package thread.linkedqueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; public class Worker implements Runnable { private CountDownLatch downLatch; private String name; private ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); public Worker(CountDownLatch downLatch, String name, ConcurrentLinkedQueue<Integer> queue) { this.downLatch = downLatch; this.name = name; this.queue = queue; } public void run() { this.doPoll(); this.downLatch.countDown(); } /** * 取数据 */ private void doPoll() { while (!queue.isEmpty()) { System.out.println(name + "," + queue.poll()); } } }
package thread.linkedqueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test { /** * @author jzh add 2012-01-06 * @param args */ @SuppressWarnings("unused") public static void main(String[] args) { // 初始队列 ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); for (int i = 0; i < 100000; i++) { queue.offer(i); } // 用于计算时间 long timer = System.currentTimeMillis(); // 创建一个线程池(创建一个可根据需要创建新线程的线程池) // Executors 类提供了用于此包中所提供的执行程序服务的工厂方法 ExecutorService pool = Executors.newCachedThreadPool(); // 初始化一个4次的CountDownLatch CountDownLatch latch = new CountDownLatch(4); Worker wroker1 = new Worker(latch, "chanpinxue.cn", queue); Worker wroker2 = new Worker(latch, "jiang", queue); Worker wroker3 = new Worker(latch, "jzh", queue); Worker wroker4 = new Worker(latch, "jiangzhihao", queue); pool.execute(wroker1); pool.execute(wroker2); pool.execute(wroker3); pool.execute(wroker4); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // 输出耗时 System.out.println("time is " + (System.currentTimeMillis() - timer) + " ms"); pool.shutdown(); } }