package thread.linkedqueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
/**
 * Task that drains a shared {@link ConcurrentLinkedQueue}, printing every element it
 * removes as {@code name,value}, and then counts down a {@link CountDownLatch} once.
 *
 * <p>Several workers may be given the same queue; each element is removed (and printed)
 * by exactly one of them because removal is done with a single {@code poll()} call.
 */
public class Worker implements Runnable {
    /** Latch that is counted down exactly once when {@link #run()} finishes. */
    private final CountDownLatch downLatch;
    /** Label printed in front of every element this worker removes. */
    private final String name;
    /** Queue to drain; supplied by the caller and possibly shared with other workers. */
    private final ConcurrentLinkedQueue<Integer> queue;

    /**
     * @param downLatch latch to count down when this worker is done
     * @param name      label printed in front of every removed element
     * @param queue     queue to drain
     */
    public Worker(CountDownLatch downLatch, String name, ConcurrentLinkedQueue<Integer> queue) {
        this.downLatch = downLatch;
        this.name = name;
        this.queue = queue;
    }

    /**
     * Drains the queue and then counts the latch down.
     */
    @Override
    public void run() {
        try {
            this.doPoll();
        } finally {
            // Count down even if draining fails, so a thread blocked in await() is released.
            this.downLatch.countDown();
        }
    }

    /**
     * Removes and prints elements until the queue reports that it is empty.
     */
    private void doPoll() {
        Integer value;
        // poll() returns null when the queue is empty. Using its result as the loop
        // condition avoids the isEmpty()/poll() check-then-act gap in which another
        // worker could take the last element and this one would print "null".
        while ((value = queue.poll()) != null) {
            System.out.println(name + "," + value);
        }
    }
}
package thread.linkedqueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
/**
* @author jzh add 2012-01-06
* @param args
*/
@SuppressWarnings("unused")
public static void main(String[] args) {
// 初始队列
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
for (int i = 0; i < 100000; i++) {
queue.offer(i);
}
// 用于计算时间
long timer = System.currentTimeMillis();
// 创建一个线程池(创建一个可根据需要创建新线程的线程池)
// Executors 类提供了用于此包中所提供的执行程序服务的工厂方法
ExecutorService pool = Executors.newCachedThreadPool();
// 初始化一个4次的CountDownLatch
CountDownLatch latch = new CountDownLatch(4);
Worker wroker1 = new Worker(latch, "chanpinxue.cn", queue);
Worker wroker2 = new Worker(latch, "jiang", queue);
Worker wroker3 = new Worker(latch, "jzh", queue);
Worker wroker4 = new Worker(latch, "jiangzhihao", queue);
pool.execute(wroker1);
pool.execute(wroker2);
pool.execute(wroker3);
pool.execute(wroker4);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 输出耗时
System.out.println("time is " + (System.currentTimeMillis() - timer) + " ms");
pool.shutdown();
}
}