package thread.lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
// 可重入锁
final Lock lock = new ReentrantLock();
// 两个条件对象
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
// 缓冲区
final Object[] items = new Object[10];
// 计数器
int putcount; // 放数据计数
int takecount; // 取数据计数
int count; // 缓冲区计数
// 放数据操作,生产者调用该方法
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// 如果缓冲区满了,则线程等待
while (count == items.length) {
notFull.await();
}
items[putcount] = x;
if (++putcount == items.length) {
putcount = 0;
}
++count;
// 向消费者线程发送通知
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 取数据操作,消费者线程调用该方法
public Object take() throws InterruptedException {
lock.lock();
try {
// 如果缓冲区空,则线程等待
while (count == 0) {
notEmpty.await();
}
Object x = items[takecount];
if (++takecount == items.length) {
takecount = 0;
}
--count;
// 通知其他生产者线程
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
package thread.lock;
public class Consumer implements Runnable {
private String name;
private BoundedBuffer buffer;
public Consumer(String name, BoundedBuffer buffer) {
this.name = name;
this.buffer = buffer;
}
public void run() {
Object c = null;
for (int i = 0; i < 5; i++) {
try {
// 取数据
c = buffer.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumer, " + this.name + " , " + c);
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
}
}
}
}
package thread.lock;
public class Producer implements Runnable {
private String name;
private BoundedBuffer buffer;
public Producer(String name, BoundedBuffer buffer) {
this.name = name;
this.buffer = buffer;
}
public void run() {
char c;
for (int i = 0; i < 15; i++) {
c = (char) (Math.random() * 26 + 'A');
try {
// 向缓冲区放入数据
buffer.put(c);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("producer, " + this.name + " , " + c);
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
}
}
}
}
package thread.lock;
public class ProducerConsumerTest {
/**
* @author jzh add 2011-12-23
* @param args
*/
public static void main(String args[]) {
// 创建一个缓冲区
BoundedBuffer stack = new BoundedBuffer();
// 生产者 chanpinxue.cn
Thread producer = new Thread(new Producer("chanpinxue.cn", stack));
// 消费者 jiang
Thread jiang = new Thread(new Consumer("jiang", stack));
// 消费者 jzh
Thread jzh = new Thread(new Consumer("jzh", stack));
producer.start();
jiang.start();
jzh.start();
}
}