`
Franciswmf
  • 浏览: 779383 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

异步批量执行任务与回滚

阅读更多
java异步方式(结合@Async和CompletableFuture)处理批量任务执行和回滚(只回滚执行失败的批次,执行成功的批次不回滚)

    @Autowired
    @Qualifier(value = "customThreadPoolTaskExecutor")
    private ThreadPoolTaskExecutor customThreadPoolTaskExecutor;
    @Autowired
    private Environment environment;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;
    /**
     * 测试异步批量审批与回滚
     * @param orderList
     * @param traceId
     * @throws Exception
     */
    @Async(value = "customThreadPoolTaskExecutor")
    public void batchAuditDemo(List<Order> orderList, String traceId){
        log.info("异步开始...traceId={}", traceId);
        long start = System.currentTimeMillis();
        //批量任务拆分成多个批次执行,每个批次跑固定数量的任务
        List<String> orderIdList = new ArrayList<>();
        for (Order temp : orderList) {
            orderIdList.add(temp.getId());
        }
        Integer size = Integer.parseInt(environment.getProperty(Constant.SIZE));
        //拆分
        Map<Integer, List<String>> map = spiltListFun(orderIdList, size);
        //批次执行结果
        boolean batchResult = false;
        for (Integer batchNo : map.keySet()) {
            //单批次异步审批(手动控制事务)
            batchResult = batchAsyncFun(batchNo, map.get(batchNo),traceId);
            if(batchResult){
                //...
                log.info("第{}批次执行失败,已回滚", batchNo);
            }
        }
        //...
        log.info("异步结束, 总耗时:{}", (System.currentTimeMillis() - start));
        orderIdList = null;
        map = null;
    }


    /**
     * 单批次异步审批(手动控制事务)
     * @param batchNo
     * @param orderIdList
     * @param traceId 本次批量审批的标记
     * @return
     */
    public boolean batchAsyncFun(int batchNo, List<String> orderIdList, String traceId){
        log.info("本次批量审批第{}批次开始执行:orderIdList={}", batchNo, orderIdList);
		redisTemplate.delete(currentBatchKey);
		//本批次是否进行了回滚
        final Map<Integer, Boolean> returnMap = new ConcurrentHashMap<>();
        CompletableFuture<Void> all = null;
        final Map<String, TransactionStatus> transactionMap = new ConcurrentHashMap<>();
        //用于保存当前批量审批任一批次执行结果的缓存key
        String currentBatchKey = RedisConstant.LIST_BATCH_APPROVAL + traceId;
        final CountDownLatch countDownLatch = new CountDownLatch(orderIdList.size());
        for (String orderId : orderIdList) {
            CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> {
                //1.创建异步操作,支持返回值
                log.info("orderId={}, threadName={}", orderId, Thread.currentThread().getName());
                //手动控制事务
                DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                //新发起一个事务
                def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
                TransactionStatus status = platformTransactionManager.getTransaction(def);
                transactionMap.put(orderId, status);
                //审批
                boolean eachResult = false;
                try {
                    //核心业务
                    //core code...;
                }catch (Exception e){
                    log.info("捕捉到异常:batchNo={}, orderId={}", batchNo, orderId);
                    e.printStackTrace();
                    eachResult =true;
                }
                if(eachResult){
                    redisTemplate.opsForList().leftPush(currentBatchKey, "0");
                }else{
                    redisTemplate.opsForList().leftPush(currentBatchKey, "1");
                }
                //根据eachResult结果判断当前记录是否需要回滚
                return eachResult;
            }, customThreadPoolTaskExecutor).handle((s, t) -> {
                //2.执行任务完成时对结果的处理(包括异常和非异常结果)
                //第一个参数s为CompletableFuture 返回的结果, 第二个参数t为抛出的异常
                //log.info("子任务完成:batchNo={}, orderId={}, s={}", batchNo, orderId, s);
                boolean needRollback = false;
                int size = Integer.valueOf(Long.toString(redisTemplate.opsForList().size(currentBatchKey)));
                //循环等待所有任务都有结果(考虑引入超时机制)
                while(true){
                    //log.info("循环等待...realSize={}", size);
                    if(size == orderIdList.size()){
                        List<String> ls = redisTemplate.opsForList().range(currentBatchKey, 0, -1);
                        for (String str : ls) {
                            if(str.equals("0")){
                                needRollback = true;
                                break;
                            }
                        }
                        if(!returnMap.containsKey(batchNo)){
                            returnMap.put(batchNo, needRollback);
                        }
                        //根据情况,提交或者回滚事务
                        if(needRollback){
                            platformTransactionManager.rollback(transactionMap.get(orderId));
                        }else{
                            platformTransactionManager.commit(transactionMap.get(orderId));
                        }
                        countDownLatch.countDown();
                        break;
                    }else{
                        size = Integer.valueOf(Long.toString(redisTemplate.opsForList().size(currentBatchKey)));
                    }
                }
                return needRollback;
            });
            all = CompletableFuture.allOf(cf);
        }
        //CompletableFuture有任务结果返回,但事务不一定提交或者回滚
        //log.info("主线程阻塞1...");
        all.join();
        
        try {
		    //等待本批次所有任务彻底执行完
            //log.info("主线程阻塞2...");
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //本批次是否回滚:true - 是; false - 否;
        return returnMap.get(batchNo);
    }


    /**
     * 将指定集合进行拆分
     * @param list 集合
     * @param len 拆分长度,每个集合按照拆分长度进行分割
     * @return
     */
    public static Map<Integer, List<String>> spiltListFun(List<String> list, int len){
        Map<Integer, List<String>> map = new HashMap<>();
        if(null == list || len < 1){
            map.put(1, new ArrayList<>());
            return map;
        }
        int size = list.size();
        log.info("size={}, len={}", size, len);
        if(size <= len){
            map.put(1, list);
        }else{
            //分隔后的集合个数
            int count = (size + len - 1) / len;
            for (int i = 0; i < count; i++) {
                //子集合
                List<String> subList = list.subList(i * len, (len * (i + 1) > size ? size : len * (i + 1)));
                //批次号,子集合
                map.put(i+1, subList);
            }
        }
//        log.info("map={}", map.toString());
        return map;
    }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics