线程助手 - ThreadUtil

曾几何时,多线程是JAVA里面很少触碰的技术,而随着技术的发展,对于多线程使用越来越多,在合适的时候合理的使用多线程,对于性能有着质的提高, 但是如果自己从头写,会比较繁琐,那么我们就封装一个吧(这也是feilong的精髓思想之所在)

我们来看看 线程助手 - ThreadUtil

1.执行.

方法Description
void execute(List<T> list,int eachSize,Map<String, ?> paramsMap,PartitionRunnableBuilder<T> partitionRunnableBuilder)给定一个待解析的 list,设定每个线程执行多少条 eachSize,传入一些额外的参数 paramsMap,使用自定义的 partitionRunnableBuilder,自动构造多条线程并运行..

1.1 void execute(List<T> list,int eachSize,Map<String, ?> paramsMap,PartitionRunnableBuilder<T> partitionRunnableBuilder)

给定一个待解析的 list,设定每个线程执行多少条 eachSize,传入一些额外的参数 paramsMap,使用自定义的 partitionRunnableBuilder,自动构造多条线程并运行.

适用场景:

比如同步库存,一次从MQ或者其他接口中得到了5000条数据,如果使用单线程做5000次循环,势必会比较慢,并且影响性能; 如果调用这个方法,传入eachSize=100, 那么自动会开启5000/100=50 个线程来跑功能,大大提高同步库存的速度

其他的适用场景还有诸如同步商品主档数据,同步订单等等这类每个独立对象之间没有相关联关系的数据,能提高执行速度和效率

重构:

对于以下代码:模拟10个对象/数字,循环执行任务(可能是操作数据库等)

  1. public void testExecuteTest() throws InterruptedException{
  2. Date beginDate = new Date();
  3. List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
  4. for (Integer integer : list){
  5. //------
  6. //模拟 do something
  7. //---------
  8. Thread.sleep(1 * MILLISECOND_PER_SECONDS);
  9. }
  10. LOGGER.info("use time:{}", formatDuration(beginDate));
  11. }

统计总耗时时间 需要 use time:10秒28毫秒

此时你可以调用此方法,改成多线程执行:

  1. public void testExecuteTestUsePartitionRunnableBuilder() throws InterruptedException{
  2. Date beginDate = new Date();
  3. List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
  4. //每个线程执行2条数据, 没有自定义 paramsMap
  5. //将会自动创建 list.size()/2 =5个 线程执行
  6. //每个线程执行的,将会是 PartitionRunnableBuilder build 返回的 Runnable
  7. ThreadUtil.execute(list, 1, null, new PartitionRunnableBuilder<Integer>(){
  8. @Override
  9. public Runnable build(final List<Integer> perBatchList,PartitionThreadEntity partitionThreadEntity,Map<String, ?> paramsMap){
  10. return new Runnable(){
  11. @Override
  12. public void run(){
  13. for (Integer integer : perBatchList){
  14. //------
  15. //模拟 do something
  16. //---------
  17. try{
  18. Thread.sleep(1 * MILLISECOND_PER_SECONDS);
  19. }catch (InterruptedException e){
  20. LOGGER.error("", e);
  21. }
  22. }
  23. }
  24. };
  25. }
  26. });
  27. LOGGER.info("use time:{}", formatDuration(beginDate));
  28. }

统计总耗时时间 需要 use time:2秒36毫秒

对于上述的case,如果将 eachSize 参数由2 改成1, 统计总耗时时间 需要 use time:1秒36毫秒

可见 调用该方法,使用多线程能节省执行时间,提高效率; 但是也需要酌情考虑eachSize大小,合理的开启线程数量

说明:

  • 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性;
  • 需要注意合理的评估list 的大小和eachSize 比率;不建议list size很大,比如 20W,而eachSize值很小,比如2 ,那么会开启20W/2=10W个线程;此时建议考虑 线程池的实现方案

  • 对于参数 paramsMap 的使用:比如你需要拿到最终每条数据执行的结果,以便后续进行处理(比如对失败的操作再次执行或者发送汇报邮件等)

  1. public void testExecuteTestUsePartitionRunnableBuilderParamsMap() throws InterruptedException{
  2. Date beginDate = new Date();
  3. List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
  4. final Map<Integer, Boolean> indexAndResultMap = Collections.synchronizedSortedMap(new TreeMap());
  5. ThreadUtil.execute(list, 2, null, new PartitionRunnableBuilder<Integer>(){
  6. @Override
  7. public Runnable build(
  8. final List<Integer> perBatchList,
  9. final PartitionThreadEntity partitionThreadEntity,
  10. Map<String, ?> paramsMap){
  11. return new Runnable(){
  12. @Override
  13. public void run(){
  14. int i = 0;
  15. for (Integer integer : perBatchList){
  16. //------
  17. //模拟 do something
  18. //---------
  19. try{
  20. Thread.sleep(1 * MILLISECOND_PER_SECONDS);
  21. }catch (InterruptedException e){
  22. LOGGER.error("", e);
  23. }
  24. int indexInTotalList = getIndex(partitionThreadEntity, i);
  25. //模拟 当值是 5 或者8 的时候 操作结果是false
  26. boolean result = (integer == 5 || integer == 8) ? false : true;
  27. indexAndResultMap.put(indexInTotalList, result);
  28. i++;
  29. }
  30. }
  31. private Integer getIndex(PartitionThreadEntity partitionThreadEntity,int i){
  32. int batchNumber = partitionThreadEntity.getBatchNumber();
  33. return batchNumber * partitionThreadEntity.getEachSize() + i;
  34. }
  35. };
  36. }
  37. });
  38. LOGGER.debug(JsonUtil.format(indexAndResultMap));
  39. LOGGER.info("use time:{}", formatDuration(beginDate));
  40. }

输出结果:

  1. 29:21 DEBUG (ThreadUtilExample.java:161) [testExecuteTestUsePartitionRunnableBuilderParamsMap()] {
  2. "0": true,
  3. "1": true,
  4. "2": true,
  5. "3": true,
  6. "4": true,
  7. "5": false,
  8. "6": true,
  9. "7": true,
  10. "8": false,
  11. "9": true
  12. }
  13. 29:21 INFO (ThreadUtilExample.java:164) [testExecuteTestUsePartitionRunnableBuilderParamsMap()] use time:2181毫秒

2.创建指定数量 threadCount 的线程,并执行

方法Description
void execute(Runnable runnable,int threadCount)创建指定数量 threadCount 的线程,并执行.

2.1 void execute(Runnable runnable,int threadCount)

创建指定数量 threadCount 的线程,并执行.