package thread.barrier;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Master-side operation that works on the reports collected from the slaves.
 */
public interface MasterDao {

    /**
     * Processes the collected sale reports.
     *
     * @param map sale reports keyed by a string; the map is shared with the
     *            tasks that fill it
     */
    void count(ConcurrentHashMap<String, SaleReport> map);
}
package thread.barrier; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; public class MasterDaoImpl implements MasterDao { @Override public void count(ConcurrentHashMap<String, SaleReport> map) { double sales; // 营业额 long tc; // 交易次数 sales = 0; tc = 0; Iterator<SaleReport> iter = map.values().iterator(); System.out.println("各slave数据"); while (iter.hasNext()) { SaleReport obj = iter.next(); sales = sales + obj.sales; tc = tc + obj.tc; System.out.println("slave: " + obj.code + ", sales: " + obj.sales + ", tc: "+ obj.tc); } System.out.println("总sales: " + sales); System.out.println("总tc: " + tc); } }
package thread.barrier; import java.util.concurrent.ConcurrentHashMap; public class MasterTask implements Runnable { private MasterDao dao; private ConcurrentHashMap<String, SaleReport> map; MasterTask(MasterDao dao, ConcurrentHashMap<String, SaleReport> map) { this.dao = dao; this.map = map; } public void run() { System.out.println("-------------------------"); System.out.println("开始汇总数据"); // 汇总各个slave的数据 dao.count(this.map); System.out.println("汇总数据完成"); } }
package thread.barrier; public class SaleReport { String code; // 编码 double sales; // 营业额 long tc; // 交易次数 public long getTc() { return tc; } public void setTc(long tc) { this.tc = tc; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } public double getSales() { return sales; } public void setSales(double sales) { this.sales = sales; } }
package thread.barrier; public interface SlaveDao { // 查询各个salve db 的数据 public SaleReport qryReport(String code); }
package thread.barrier;

/**
 * {@link SlaveDao} implementation that returns canned data instead of
 * querying a real slave database.
 *
 * <p>Added by jzh, 2011-12-27.
 */
public class SlaveDaoImpl implements SlaveDao {

    /**
     * Returns the simulated report for the given slave code.
     *
     * <p>Codes 1001 through 1005 yield fixed sample values. Every other code,
     * including {@code null}, the empty string and text that is not a decimal
     * integer, yields an empty report (code {@code ""}, sales 0, tc 0).
     * Previously an unparseable code raised {@link NumberFormatException}
     * instead of reaching the default case.
     *
     * @param code slave code, expected to be a decimal integer string
     * @return a freshly created report; never {@code null}
     */
    @Override
    public SaleReport qryReport(String code) {
        final int slaveId;
        try {
            // parseInt rejects null as well, so one catch covers every bad input.
            slaveId = Integer.parseInt(code);
        } catch (NumberFormatException e) {
            return newReport("", 0, 0);
        }

        switch (slaveId) {
            case 1001:
                return newReport(String.valueOf(1001), 1, 2);
            case 1002:
                return newReport(String.valueOf(1002), 2, 4);
            case 1003:
                return newReport(String.valueOf(1003), 3, 6);
            case 1004:
                return newReport(String.valueOf(1004), 4, 8);
            case 1005:
                return newReport(String.valueOf(1005), 5, 10);
            default:
                return newReport("", 0, 0);
        }
    }

    /** Builds a report populated with the given values. */
    private static SaleReport newReport(String code, double sales, long tc) {
        SaleReport rpt = new SaleReport();
        rpt.code = code;
        rpt.sales = sales;
        rpt.tc = tc;
        return rpt;
    }
}
package thread.barrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; public class SlaveTask extends Thread { private SlaveDao dao; private CyclicBarrier barrier; private String code; private ConcurrentHashMap<String, SaleReport> map; SlaveTask(SlaveDao dao, CyclicBarrier barrier, String code, ConcurrentHashMap<String, SaleReport> map) { this.dao = dao; this.barrier = barrier; this.code = code; this.map = map; } public void run() { System.out.println("开始分析: " + code + "数据"); SaleReport rpt = dao.qryReport(code); // 存入map map.put(code, rpt); System.out.println(code + "已经分析完成,并通知Master"); try { // 通知barrier已经完成 barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
package thread.barrier; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test { /** * @author jzh 2011-12-27 * @param args */ public static void main(String[] args) { ConcurrentHashMap<String, SaleReport> map = new ConcurrentHashMap<String, SaleReport>(); MasterDao daoMaster = new MasterDaoImpl(); CyclicBarrier barrier = new CyclicBarrier(5, new MasterTask(daoMaster, map)); // 创建一个线程池(创建一个可根据需要创建新线程的线程池) // Executors 类提供了用于此包中所提供的执行程序服务的工厂方法 ExecutorService pool = Executors.newCachedThreadPool(); pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1001", map)); pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1002", map)); pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1003", map)); pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1004", map)); pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1005", map)); pool.shutdown(); } }