Java 多线程 Lock ReentrantLock Condition

Java 多线程 Lock ReentrantLock Condition

package thread.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A fixed-capacity (10 slot) FIFO buffer shared between producer and consumer threads.
 *
 * <p>All state is guarded by a single {@link ReentrantLock}. Two {@link Condition}s
 * created from that lock let producers wait while the buffer is full and consumers
 * wait while it is empty. The backing array is used as a ring: the put and take
 * indices wrap back to 0 when they reach the end of the array.
 */
public class BoundedBuffer {
  // Reentrant lock that guards every field below.
  final Lock lock = new ReentrantLock();
  // Producers wait on this while the buffer is full; take() signals it.
  final Condition notFull = lock.newCondition();
  // Consumers wait on this while the buffer is empty; put() signals it.
  final Condition notEmpty = lock.newCondition();
  // Backing storage, used as a ring buffer.
  final Object[] items = new Object[10];
  // Ring-buffer bookkeeping.
  int putcount;  // index of the slot the next put() writes to
  int takecount; // index of the slot the next take() reads from
  int count;     // number of elements currently stored

  /**
   * Appends an element, blocking while the buffer is full. Called by producers.
   *
   * @param x the element to store
   * @throws InterruptedException if the calling thread is interrupted while waiting
   *     for free space
   */
  public void put(Object x) throws InterruptedException {
    lock.lock();
    try {
      // Loop rather than "if": the condition must be re-checked after every wake-up.
      while (count == items.length) {
        notFull.await();
      }

      items[putcount] = x;

      // Wrap the write index around the end of the array.
      if (++putcount == items.length) {
        putcount = 0;
      }

      ++count;

      // Wake one waiting consumer: there is now something to take.
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removes and returns the oldest element, blocking while the buffer is empty.
   * Called by consumers.
   *
   * @return the element that has been in the buffer the longest
   * @throws InterruptedException if the calling thread is interrupted while waiting
   *     for an element
   */
  public Object take() throws InterruptedException {
    lock.lock();
    try {
      // Loop rather than "if": the condition must be re-checked after every wake-up.
      while (count == 0) {
        notEmpty.await();
      }

      Object x = items[takecount];
      // Clear the slot so the buffer does not keep the consumed element reachable.
      items[takecount] = null;

      // Wrap the read index around the end of the array.
      if (++takecount == items.length) {
        takecount = 0;
      }

      --count;

      // Wake one waiting producer: a slot has been freed.
      notFull.signal();
      return x;
    } finally {
      lock.unlock();
    }
  }

}
package thread.lock;

/**
 * Task that takes five elements from a shared {@link BoundedBuffer}, printing each
 * one and pausing 200 ms between takes.
 *
 * <p>If the thread is interrupted while waiting for an element or while pausing, the
 * interrupt status is restored and the task stops early.
 */
public class Consumer implements Runnable {
  private final String name;
  private final BoundedBuffer buffer;

  /**
   * @param name label printed with every consumed element
   * @param buffer the buffer to take elements from
   */
  public Consumer(String name, BoundedBuffer buffer) {
    this.name = name;
    this.buffer = buffer;
  }

  @Override
  public void run() {
    for (int i = 0; i < 5; i++) {
      try {
        // Take one element, blocking until one is available.
        Object c = buffer.take();
        System.out.println("consumer, " + this.name + " , " + c);
        Thread.sleep(200);
      } catch (InterruptedException e) {
        // Interruption is a request to stop: keep the flag set for the caller and
        // exit instead of reporting an element that was never taken.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}
package thread.lock;

/**
 * Task that puts fifteen random upper-case letters into a shared
 * {@link BoundedBuffer}, printing each one and pausing 200 ms between puts.
 *
 * <p>If the thread is interrupted while waiting for free space or while pausing, the
 * interrupt status is restored and the task stops early.
 */
public class Producer implements Runnable {
  private final String name;
  private final BoundedBuffer buffer;

  /**
   * @param name label printed with every produced element
   * @param buffer the buffer to put elements into
   */
  public Producer(String name, BoundedBuffer buffer) {
    this.name = name;
    this.buffer = buffer;
  }

  @Override
  public void run() {
    for (int i = 0; i < 15; i++) {
      // Random letter in 'A'..'Z'.
      char c = (char) (Math.random() * 26 + 'A');
      try {
        // Store the letter, blocking while the buffer is full.
        buffer.put(c);
        System.out.println("producer, " + this.name + " , " + c);
        Thread.sleep(200);
      } catch (InterruptedException e) {
        // Interruption is a request to stop: keep the flag set for the caller and
        // exit instead of reporting a letter that was never stored.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}
package thread.lock;

/**
 * Demo entry point: one producer and two consumers working on the same
 * {@link BoundedBuffer}.
 */
public class ProducerConsumerTest {

  /**
   * Creates the shared buffer, then starts the producer thread followed by the two
   * consumer threads.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    // Buffer shared by all three threads.
    BoundedBuffer sharedBuffer = new BoundedBuffer();

    // Producer "chanpinxue.cn" first, then consumers "jiang" and "jzh".
    Thread[] workers = {
      new Thread(new Producer("chanpinxue.cn", sharedBuffer)),
      new Thread(new Consumer("jiang", sharedBuffer)),
      new Thread(new Consumer("jzh", sharedBuffer)),
    };

    // Start in the same order the threads were created.
    for (Thread worker : workers) {
      worker.start();
    }
  }
}

 

发表回复

您的电子邮箱地址不会被公开。