package thread.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BoundedBuffer { // 可重入锁 final Lock lock = new ReentrantLock(); // 两个条件对象 final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); // 缓冲区 final Object[] items = new Object[10]; // 计数器 int putcount; // 放数据计数 int takecount; // 取数据计数 int count; // 缓冲区计数 // 放数据操作,生产者调用该方法 public void put(Object x) throws InterruptedException { lock.lock(); try { // 如果缓冲区满了,则线程等待 while (count == items.length) { notFull.await(); } items[putcount] = x; if (++putcount == items.length) { putcount = 0; } ++count; // 向消费者线程发送通知 notEmpty.signal(); } finally { lock.unlock(); } } // 取数据操作,消费者线程调用该方法 public Object take() throws InterruptedException { lock.lock(); try { // 如果缓冲区空,则线程等待 while (count == 0) { notEmpty.await(); } Object x = items[takecount]; if (++takecount == items.length) { takecount = 0; } --count; // 通知其他生产者线程 notFull.signal(); return x; } finally { lock.unlock(); } } }
package thread.lock; public class Consumer implements Runnable { private String name; private BoundedBuffer buffer; public Consumer(String name, BoundedBuffer buffer) { this.name = name; this.buffer = buffer; } public void run() { Object c = null; for (int i = 0; i < 5; i++) { try { // 取数据 c = buffer.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("consumer, " + this.name + " , " + c); try { Thread.sleep(200); } catch (InterruptedException ex) { } } } }
package thread.lock; public class Producer implements Runnable { private String name; private BoundedBuffer buffer; public Producer(String name, BoundedBuffer buffer) { this.name = name; this.buffer = buffer; } public void run() { char c; for (int i = 0; i < 15; i++) { c = (char) (Math.random() * 26 + 'A'); try { // 向缓冲区放入数据 buffer.put(c); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("producer, " + this.name + " , " + c); try { Thread.sleep(200); } catch (InterruptedException ex) { } } } }
package thread.lock; public class ProducerConsumerTest { /** * @author jzh add 2011-12-23 * @param args */ public static void main(String args[]) { // 创建一个缓冲区 BoundedBuffer stack = new BoundedBuffer(); // 生产者 chanpinxue.cn Thread producer = new Thread(new Producer("chanpinxue.cn", stack)); // 消费者 jiang Thread jiang = new Thread(new Consumer("jiang", stack)); // 消费者 jzh Thread jzh = new Thread(new Consumer("jzh", stack)); producer.start(); jiang.start(); jzh.start(); } }