package thread.queue; import java.text.SimpleDateFormat; public class Consumer implements Runnable { private String name; private Warehouse queue; public Consumer(String name, Warehouse warehouse) { this.name = name; this.queue = warehouse; } public void run() { SimpleDateFormat sdf = new SimpleDateFormat(); sdf.applyPattern("ss.SSS"); //yyyy-MM-dd HH:mm:ss try { while (true) { System.out.println("consumer, " + this.name + " , begin " + sdf.format(System.currentTimeMillis())); // String cake = queue.consume(); queue.consume(); System.out.println("consumer, " + this.name + " , end " + sdf.format(System.currentTimeMillis())); Thread.sleep(200); } } catch (InterruptedException ex) { } } }
package thread.queue; import java.text.SimpleDateFormat; public class Producer implements Runnable { private String name; private Warehouse queue; public Producer(String name, Warehouse warehouse) { this.name = name; this.queue = warehouse; } public void run() { SimpleDateFormat sdf = new SimpleDateFormat(); sdf.applyPattern("ss.SSS"); //yyyy-MM-dd HH:mm:ss try { while (true) { System.out.println("producer, " + this.name + " , begin " + sdf.format(System.currentTimeMillis())); queue.produce("cake"); System.out.println("producer, " + this.name + " , end " + sdf.format(System.currentTimeMillis())); Thread.sleep(100); } } catch (InterruptedException ex) { } } }
package thread.queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ProducerConsumerTest { /** * @author jzh add 2011-12-22 * @param args */ public static void main(String args[]) { // 建立一个仓库(用于存放产品) Warehouse warehouse = new Warehouse(); // 创建一个线程池(创建一个可根据需要创建新线程的线程池) // Executors 类提供了用于此包中所提供的执行程序服务的工厂方法 ExecutorService pool = Executors.newCachedThreadPool(); // 生产者 howsky Producer producer = new Producer("chanpinxue.cn", warehouse); // 消费者 jiang Consumer jiang = new Consumer("jiang", warehouse); // 消费者 jzh Consumer jzh = new Consumer("jzh", warehouse); pool.submit(producer); pool.submit(jiang); pool.submit(jzh); try { Thread.sleep(1000); } catch (InterruptedException ex) { } pool.shutdownNow(); } }
package thread.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Warehouse { // 定义一个ArrayBlockingQueue, 指定长度为5. BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5); // 生产 public void produce(String name) throws InterruptedException { // put方法放入一个cake, 若queue满了, 等到queue有位置. queue.put(name); } // 消费 public String consume() throws InterruptedException { // take方法取出一个cake, 若queue为空, 等到queue有cake为止. return queue.take(); } }