Java 多线程 CyclicBarrier

Java 多线程 CyclicBarrier

package thread.barrier;

import java.util.concurrent.ConcurrentHashMap;

public interface MasterDao {
  public void count(ConcurrentHashMap<String, SaleReport> map);
}
package thread.barrier;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

public class MasterDaoImpl implements MasterDao {

  @Override
  public void count(ConcurrentHashMap<String, SaleReport> map) {

    double sales; // 营业额
    long tc; // 交易次数
    
    sales = 0;
    tc = 0;
    Iterator<SaleReport> iter = map.values().iterator();
    System.out.println("各slave数据");
    while (iter.hasNext()) {
      SaleReport obj = iter.next();
      sales = sales + obj.sales;
      tc = tc + obj.tc;
      System.out.println("slave: " + obj.code + ", sales: " + obj.sales + ", tc: "+ obj.tc);
    }
    System.out.println("总sales: " + sales);
    System.out.println("总tc: " + tc);
  }

}
package thread.barrier;

import java.util.concurrent.ConcurrentHashMap;

public class MasterTask implements Runnable {
  private MasterDao dao;
  private ConcurrentHashMap<String, SaleReport> map;

  MasterTask(MasterDao dao, ConcurrentHashMap<String, SaleReport> map) {
    this.dao = dao;
    this.map = map;
  }

  public void run() {

    System.out.println("-------------------------");
    System.out.println("开始汇总数据");
    
    // 汇总各个slave的数据
    dao.count(this.map);
    System.out.println("汇总数据完成");
    
  }

}
package thread.barrier;

public class SaleReport {
  String code; // 编码
  double sales; // 营业额
  long tc; // 交易次数

  public long getTc() {
    return tc;
  }

  public void setTc(long tc) {
    this.tc = tc;
  }

  public String getCode() {
    return code;
  }

  public void setCode(String code) {
    this.code = code;
  }

  public double getSales() {
    return sales;
  }

  public void setSales(double sales) {
    this.sales = sales;
  }
}
package thread.barrier;

public interface SlaveDao {
  // 查询各个salve db 的数据
  public SaleReport qryReport(String code); 
}
package thread.barrier;

public class SlaveDaoImpl implements SlaveDao {

  /**
   * 查询各个salve db 的数据
   * 这里仅仅模拟了一些数据
   * add by jzh 2011-12-27
   */
  @Override
  public SaleReport qryReport(String code) {
    
    SaleReport rpt = new SaleReport();

    switch (Integer.valueOf(code)) {
    case 1001: {
      rpt.code = String.valueOf(1001);
      rpt.sales = 1;
      rpt.tc = 2;
      break;
    }
    case 1002: {
      rpt.code = String.valueOf(1002);
      rpt.sales = 2;
      rpt.tc = 4;
      break;
    }
    case 1003: {
      rpt.code = String.valueOf(1003);
      rpt.sales = 3;
      rpt.tc = 6;
      break;
    }
    case 1004: {
      rpt.code = String.valueOf(1004);
      rpt.sales = 4;
      rpt.tc = 8;
      break;
    }
    case 1005: {
      rpt.code = String.valueOf(1005);
      rpt.sales = 5;
      rpt.tc = 10;
      break;
    }
    default: {
      rpt.code = "";
      rpt.sales = 0;
      rpt.tc = 0;
      break;
    }
    }
    return rpt;
  }

}
package thread.barrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

public class SlaveTask extends Thread {
  private SlaveDao dao;
  private CyclicBarrier barrier;
  private String code;
  private ConcurrentHashMap<String, SaleReport> map;

  SlaveTask(SlaveDao dao, CyclicBarrier barrier, String code,
      ConcurrentHashMap<String, SaleReport> map) {
    this.dao = dao;
    this.barrier = barrier;
    this.code = code;
    this.map = map;
  }

  public void run() {
    System.out.println("开始分析: " + code + "数据");

    SaleReport rpt = dao.qryReport(code);
    // 存入map
    map.put(code, rpt);

    System.out.println(code + "已经分析完成,并通知Master");
    try {
      // 通知barrier已经完成
      barrier.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      e.printStackTrace();
    }
  }

}
package thread.barrier;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {

  /**
   * @author jzh 2011-12-27
   * @param args
   */
  public static void main(String[] args) {
    ConcurrentHashMap<String, SaleReport> map = new ConcurrentHashMap<String, SaleReport>();
    
    MasterDao daoMaster = new MasterDaoImpl();
    CyclicBarrier barrier = new CyclicBarrier(5,
        new MasterTask(daoMaster, map));
    
    // 创建一个线程池(创建一个可根据需要创建新线程的线程池)
    // Executors 类提供了用于此包中所提供的执行程序服务的工厂方法
    ExecutorService pool = Executors.newCachedThreadPool();
    pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1001", map));
    pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1002", map));
    pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1003", map));
    pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1004", map));
    pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1005", map));
    
        pool.shutdown();
  }
}

 

发表回复

您的电子邮箱地址不会被公开。