package thread.barrier;
import java.util.concurrent.ConcurrentHashMap;
public interface MasterDao {
public void count(ConcurrentHashMap<String, SaleReport> map);
}
package thread.barrier;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
public class MasterDaoImpl implements MasterDao {
@Override
public void count(ConcurrentHashMap<String, SaleReport> map) {
double sales; // 营业额
long tc; // 交易次数
sales = 0;
tc = 0;
Iterator<SaleReport> iter = map.values().iterator();
System.out.println("各slave数据");
while (iter.hasNext()) {
SaleReport obj = iter.next();
sales = sales + obj.sales;
tc = tc + obj.tc;
System.out.println("slave: " + obj.code + ", sales: " + obj.sales + ", tc: "+ obj.tc);
}
System.out.println("总sales: " + sales);
System.out.println("总tc: " + tc);
}
}
package thread.barrier;
import java.util.concurrent.ConcurrentHashMap;
public class MasterTask implements Runnable {
private MasterDao dao;
private ConcurrentHashMap<String, SaleReport> map;
MasterTask(MasterDao dao, ConcurrentHashMap<String, SaleReport> map) {
this.dao = dao;
this.map = map;
}
public void run() {
System.out.println("-------------------------");
System.out.println("开始汇总数据");
// 汇总各个slave的数据
dao.count(this.map);
System.out.println("汇总数据完成");
}
}
package thread.barrier;
public class SaleReport {
String code; // 编码
double sales; // 营业额
long tc; // 交易次数
public long getTc() {
return tc;
}
public void setTc(long tc) {
this.tc = tc;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public double getSales() {
return sales;
}
public void setSales(double sales) {
this.sales = sales;
}
}
package thread.barrier;
public interface SlaveDao {
// 查询各个salve db 的数据
public SaleReport qryReport(String code);
}
package thread.barrier;
public class SlaveDaoImpl implements SlaveDao {
/**
* 查询各个salve db 的数据
* 这里仅仅模拟了一些数据
* add by jzh 2011-12-27
*/
@Override
public SaleReport qryReport(String code) {
SaleReport rpt = new SaleReport();
switch (Integer.valueOf(code)) {
case 1001: {
rpt.code = String.valueOf(1001);
rpt.sales = 1;
rpt.tc = 2;
break;
}
case 1002: {
rpt.code = String.valueOf(1002);
rpt.sales = 2;
rpt.tc = 4;
break;
}
case 1003: {
rpt.code = String.valueOf(1003);
rpt.sales = 3;
rpt.tc = 6;
break;
}
case 1004: {
rpt.code = String.valueOf(1004);
rpt.sales = 4;
rpt.tc = 8;
break;
}
case 1005: {
rpt.code = String.valueOf(1005);
rpt.sales = 5;
rpt.tc = 10;
break;
}
default: {
rpt.code = "";
rpt.sales = 0;
rpt.tc = 0;
break;
}
}
return rpt;
}
}
package thread.barrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
public class SlaveTask extends Thread {
private SlaveDao dao;
private CyclicBarrier barrier;
private String code;
private ConcurrentHashMap<String, SaleReport> map;
SlaveTask(SlaveDao dao, CyclicBarrier barrier, String code,
ConcurrentHashMap<String, SaleReport> map) {
this.dao = dao;
this.barrier = barrier;
this.code = code;
this.map = map;
}
public void run() {
System.out.println("开始分析: " + code + "数据");
SaleReport rpt = dao.qryReport(code);
// 存入map
map.put(code, rpt);
System.out.println(code + "已经分析完成,并通知Master");
try {
// 通知barrier已经完成
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
package thread.barrier;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
/**
* @author jzh 2011-12-27
* @param args
*/
public static void main(String[] args) {
ConcurrentHashMap<String, SaleReport> map = new ConcurrentHashMap<String, SaleReport>();
MasterDao daoMaster = new MasterDaoImpl();
CyclicBarrier barrier = new CyclicBarrier(5,
new MasterTask(daoMaster, map));
// 创建一个线程池(创建一个可根据需要创建新线程的线程池)
// Executors 类提供了用于此包中所提供的执行程序服务的工厂方法
ExecutorService pool = Executors.newCachedThreadPool();
pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1001", map));
pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1002", map));
pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1003", map));
pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1004", map));
pool.execute(new SlaveTask(new SlaveDaoImpl(), barrier, "1005", map));
pool.shutdown();
}
}