package thread.queue;
import java.text.SimpleDateFormat;
/**
 * Runnable that repeatedly takes one item from a shared {@link Warehouse}.
 *
 * <p>Each iteration prints a timestamped "begin" line, calls
 * {@link Warehouse#consume()}, prints a timestamped "end" line and then
 * sleeps for 200 ms. The loop only ends when the executing thread is
 * interrupted.
 */
public class Consumer implements Runnable {
    /** Label printed in every log line of this consumer. */
    private final String name;
    /** Shared warehouse the items are taken from. */
    private final Warehouse queue;

    /**
     * @param name      label used in this consumer's output
     * @param warehouse warehouse to take items from
     */
    public Consumer(String name, Warehouse warehouse) {
        this.name = name;
        this.queue = warehouse;
    }

    /**
     * Consumes items in an endless loop until the thread is interrupted.
     * On interruption the loop is left and the thread's interrupt status is
     * restored so that the code running this task can still observe it.
     */
    @Override
    public void run() {
        // SimpleDateFormat is not thread-safe, so every run() uses its own instance.
        // Pattern prints seconds and milliseconds only.
        SimpleDateFormat sdf = new SimpleDateFormat("ss.SSS");
        try {
            while (true) {
                System.out.println("consumer, " + this.name + " , begin " + sdf.format(System.currentTimeMillis()));
                // The taken item itself is not needed here; only the timing is reported.
                queue.consume();
                System.out.println("consumer, " + this.name + " , end " + sdf.format(System.currentTimeMillis()));
                Thread.sleep(200);
            }
        } catch (InterruptedException ex) {
            // Interruption is the stop signal; re-assert the flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
package thread.queue;
import java.text.SimpleDateFormat;
/**
 * Runnable that repeatedly puts the item {@code "cake"} into a shared
 * {@link Warehouse}.
 *
 * <p>Each iteration prints a timestamped "begin" line, calls
 * {@link Warehouse#produce(String)}, prints a timestamped "end" line and
 * then sleeps for 100 ms. The loop only ends when the executing thread is
 * interrupted.
 */
public class Producer implements Runnable {
    /** Label printed in every log line of this producer. */
    private final String name;
    /** Shared warehouse the items are put into. */
    private final Warehouse queue;

    /**
     * @param name      label used in this producer's output
     * @param warehouse warehouse to put items into
     */
    public Producer(String name, Warehouse warehouse) {
        this.name = name;
        this.queue = warehouse;
    }

    /**
     * Produces items in an endless loop until the thread is interrupted.
     * On interruption the loop is left and the thread's interrupt status is
     * restored so that the code running this task can still observe it.
     */
    @Override
    public void run() {
        // SimpleDateFormat is not thread-safe, so every run() uses its own instance.
        // Pattern prints seconds and milliseconds only.
        SimpleDateFormat sdf = new SimpleDateFormat("ss.SSS");
        try {
            while (true) {
                System.out.println("producer, " + this.name + " , begin " + sdf.format(System.currentTimeMillis()));
                queue.produce("cake");
                System.out.println("producer, " + this.name + " , end " + sdf.format(System.currentTimeMillis()));
                Thread.sleep(100);
            }
        } catch (InterruptedException ex) {
            // Interruption is the stop signal; re-assert the flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
package thread.queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Small demo driver: runs one {@link Producer} and two {@link Consumer}s
 * against a shared {@link Warehouse} for about one second.
 *
 * @author jzh (added 2011-12-22)
 */
public class ProducerConsumerTest {
    /**
     * Submits the producer and both consumers to a cached thread pool, waits
     * one second and then shuts the pool down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Warehouse that holds the produced items.
        Warehouse warehouse = new Warehouse();
        // Cached pool: creates new threads as needed (Executors factory method).
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // One producer ...
            pool.submit(new Producer("chanpinxue.cn", warehouse));
            // ... and two consumers, "jiang" and "jzh".
            pool.submit(new Consumer("jiang", warehouse));
            pool.submit(new Consumer("jzh", warehouse));
            // Let the tasks run for a while before stopping them.
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            // Keep the interrupt visible to whoever runs main(); shutdown still happens below.
            Thread.currentThread().interrupt();
        } finally {
            // Always stop the pool, even on an unexpected failure, so its threads
            // do not keep the JVM alive. shutdownNow() attempts to stop the running
            // tasks (typically by interrupting their threads), which ends their loops.
            pool.shutdownNow();
        }
    }
}
package thread.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Bounded buffer of items, backed by an {@link ArrayBlockingQueue}.
 *
 * <p>{@link #produce(String)} blocks while the buffer is full and
 * {@link #consume()} blocks while it is empty.
 */
public class Warehouse {
    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_CAPACITY = 5;

    /** Backing queue; visibility left unchanged from the original declaration. */
    final BlockingQueue<String> queue;

    /** Creates a warehouse with the default capacity of 5 items. */
    public Warehouse() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a warehouse holding at most {@code capacity} items.
     *
     * @param capacity maximum number of items stored at once
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     *         (raised by the {@code ArrayBlockingQueue} constructor)
     */
    public Warehouse(int capacity) {
        this.queue = new ArrayBlockingQueue<String>(capacity);
    }

    /**
     * Puts one item into the warehouse, waiting for free space if it is full.
     *
     * @param name the item to store
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void produce(String name) throws InterruptedException {
        queue.put(name);
    }

    /**
     * Takes one item out of the warehouse, waiting until one is available
     * if it is empty.
     *
     * @return the item at the head of the queue
     * @throws InterruptedException if interrupted while waiting for an item
     */
    public String consume() throws InterruptedException {
        return queue.take();
    }
}