Java 多线程 BlockingQueue ArrayBlockingQueue

Java 多线程 BlockingQueue ArrayBlockingQueue

package thread.queue;

import java.text.SimpleDateFormat;

public class Consumer implements Runnable {
  private String name;
  private Warehouse queue;

  public Consumer(String name, Warehouse warehouse) {
    this.name = name;
    this.queue = warehouse;
  }

  public void run() {
    SimpleDateFormat sdf = new SimpleDateFormat();
    sdf.applyPattern("ss.SSS"); //yyyy-MM-dd HH:mm:ss
    try {
      while (true) {
        System.out.println("consumer, " + this.name + " , begin " + sdf.format(System.currentTimeMillis()));
        // String cake = queue.consume();
        queue.consume();
        System.out.println("consumer, " + this.name + " , end " + sdf.format(System.currentTimeMillis()));
        Thread.sleep(200);
      }
    } catch (InterruptedException ex) {
    }
  }
}
package thread.queue;

import java.text.SimpleDateFormat;

public class Producer implements Runnable {
  private String name;
  private Warehouse queue;

  public Producer(String name, Warehouse warehouse) {
    this.name = name;
    this.queue = warehouse;
  }

  public void run() {
    SimpleDateFormat sdf = new SimpleDateFormat();
    sdf.applyPattern("ss.SSS"); //yyyy-MM-dd HH:mm:ss
    try {
      while (true) {
        System.out.println("producer, " + this.name + " , begin " + sdf.format(System.currentTimeMillis()));
        queue.produce("cake");
        System.out.println("producer, " + this.name + " , end " + sdf.format(System.currentTimeMillis()));
        Thread.sleep(100);
      }
    } catch (InterruptedException ex) {
    }
  }
}
package thread.queue;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerConsumerTest {

	/**
	 * @author jzh add 2011-12-22
	 * @param args
	 */
	public static void main(String args[]) {
		// 建立一个仓库(用于存放产品)
		Warehouse warehouse = new Warehouse(); 
		// 创建一个线程池(创建一个可根据需要创建新线程的线程池)
		// Executors 类提供了用于此包中所提供的执行程序服务的工厂方法
		ExecutorService pool = Executors.newCachedThreadPool();
		// 生产者 howsky
		Producer producer = new Producer("chanpinxue.cn", warehouse);
		// 消费者 jiang
		Consumer jiang = new Consumer("jiang", warehouse);
		// 消费者 jzh 
		Consumer jzh = new Consumer("jzh", warehouse);
		
		pool.submit(producer);
		pool.submit(jiang);
		pool.submit(jzh);

		try {
			Thread.sleep(1000);
		} catch (InterruptedException ex) {
		}

		pool.shutdownNow();
	}
}

package thread.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Warehouse {
  // 定义一个ArrayBlockingQueue, 指定长度为5.
  BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);

  // 生产
  public void produce(String name) throws InterruptedException {
    // put方法放入一个cake, 若queue满了, 等到queue有位置.
    queue.put(name);
  }

  // 消费
  public String consume() throws InterruptedException {
    // take方法取出一个cake, 若queue为空, 等到queue有cake为止.
    return queue.take();
  }
}

 

发表回复

您的电子邮箱地址不会被公开。