Problem description

When exporting data to an Excel file, one column turned out to be filled in for only some of the rows.

Pseudocode

// Fetch customers: one task per request
CustomerListModel customerListModel = new CustomerListModel();
List<Callable<CustomerListModel>> customerListTasks = Lists.newArrayList();
for (GetCustomerListRequest customerListRequest : requests) {
    customerListTasks.add(() -> {
        CustomerListModel model = this.getCustomerList(customerListRequest);
        if (model.getList() != null) {
            // Append this request's customers to the shared model
            customerListModel.getList().addAll(model.getList());
        }
        return customerListModel;
    });
}
// Returns after at most 5 seconds (the default timeout), finished or not
AsyncExecutor.executeTasks(customerListTasks);
// Load the bonus for each customer fetched so far
List<Callable<CustomerModel>> bonusGetDetailTasks = Lists.newArrayList();
for (CustomerModel customerModel : customerListModel.getList()) {
    bonusGetDetailTasks.add(() -> {
        BonusBaseModel bonusBaseModel = bonusOperationService.queryBonus(customerModel);
        customerModel.setBonus(bonusBaseModel.getBonus());
        return customerModel;
    });
}
AsyncExecutor.executeTasks(bonusGetDetailTasks);

AsyncExecutor

public class AsyncExecutor {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 20L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50000));

    private static ListeningExecutorService executorService = MoreExecutors
            .listeningDecorator(executor);

    public static <T> List<T> executeTasks(List<Callable<T>> tasks) throws InterruptedException {
        // Default timeout: 5 seconds
        return executeTasks(tasks, 5000, TimeUnit.MILLISECONDS);
    }

    public static <T> List<T> executeTasks(List<Callable<T>> tasks, int time, TimeUnit timeUnit)
            throws InterruptedException {
        final List<T> results = new CopyOnWriteArrayList<>();
        log.warn("begin exec tasks");
        final List<ListenableFuture<T>> futureTasks = new ArrayList<>(tasks.size());
        tasks.forEach(task -> {
            final ListenableFuture<T> futureTask = executorService.submit(task);
            futureTasks.add(futureTask);
        });

        final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
        for (ListenableFuture<T> futureTask : futureTasks) {
            Futures.addCallback(futureTask, new DownFutureCallback(countDownLatch, results), MoreExecutors.directExecutor());
        }
        // await returns false if the timeout elapses first; the return value is
        // ignored here, so callers get whatever results have arrived so far
        countDownLatch.await(time, timeUnit);
        return results;
    }
}

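Investigation

Some customers in the export had bonus data and others did not, so the first guess was that AsyncExecutor.executeTasks was timing out during the bonus step and returning before the API responses arrived and setBonus was called. The bonus-loading call AsyncExecutor.executeTasks(bonusGetDetailTasks); was therefore changed to executeTasks(tasks, 50000, TimeUnit.MILLISECONDS).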

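After this change, rows without a bonus still showed up in production. After adding logs for task execution, the customer requests, and setBonus, the real cause became clear: the await timed out during the customer fetch, before all customers had been retrieved. The bonus step therefore only loaded bonuses for the customers whose requests had already completed. The remaining customer requests kept running and finished during the bonus step's await, so those customers ended up in the list without a bonus. The fix was to increase the timeout for the customer fetch.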
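
The sequence described above can be reproduced in isolation. The sketch below is a minimal illustration that uses only java.util.concurrent, not the project's AsyncExecutor; the class name LatchTimeoutDemo and the "fast" and "slow" tasks are made up for the example. It shows that a timed await returns false when the time runs out, and that the unfinished task still completes afterwards and appends to the shared list.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchTimeoutDemo {
    /**
     * Returns {completed (1 or 0), list size right after the timed await,
     * list size after every task has finished}.
     */
    static int[] run() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> customers = new CopyOnWriteArrayList<>();
        CountDownLatch allDone = new CountDownLatch(2);
        CountDownLatch fastDone = new CountDownLatch(1);
        CountDownLatch releaseSlow = new CountDownLatch(1);

        // Fast "customer request": finishes immediately.
        pool.submit(() -> {
            customers.add("fast");
            fastDone.countDown();
            allDone.countDown();
        });
        // Slow "customer request": blocked until released, standing in for a slow API.
        pool.submit(() -> {
            try {
                releaseSlow.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            customers.add("slow");
            allDone.countDown();
        });

        fastDone.await(); // only here to make the demo deterministic
        // Times out because the slow task is still blocked.
        boolean completed = allDone.await(100, TimeUnit.MILLISECONDS);
        int sizeAtTimeout = customers.size();

        // The slow task was never cancelled, so it finishes later and still
        // appends to the shared list.
        releaseSlow.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {completed ? 1 : 0, sizeAtTimeout, customers.size()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("completed=" + (r[0] == 1)
                + ", size at timeout=" + r[1] + ", final size=" + r[2]);
    }
}
```

Any step that reads the list right after the timed-out await sees only the early entries, which matches the customers that did get a bonus in the export.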

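Conclusion

When using CountDownLatch, pay attention to the timeout. await(timeout, unit) does not throw when the time runs out; it returns false. If the timeout is too short and the return value is ignored, the caller silently continues with partial results.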
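
One possible way to make a timeout visible instead of silent is to fail the whole batch when any task is still unfinished. The sketch below is an assumption about how this could be done, not the fix that was applied here (which only raised the timeout). It uses the standard ExecutorService.invokeAll with a timeout, which cancels tasks that have not completed when the time runs out; StrictExecutor and executeAllOrFail are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StrictExecutor {
    /** Runs all tasks; throws TimeoutException instead of returning a partial list. */
    static <T> List<T> executeAllOrFail(ExecutorService pool, List<Callable<T>> tasks,
                                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // invokeAll waits until all tasks complete or the timeout expires;
        // tasks that are not complete by then are cancelled.
        List<Future<T>> futures = pool.invokeAll(tasks, timeout, unit);
        List<T> results = new ArrayList<>(futures.size());
        for (Future<T> future : futures) {
            if (future.isCancelled()) {
                throw new TimeoutException(
                        "not all tasks finished within " + timeout + " " + unit);
            }
            results.add(future.get()); // already done, so this does not block
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> quick = new ArrayList<>();
            quick.add(() -> 1);
            quick.add(() -> 2);
            System.out.println(executeAllOrFail(pool, quick, 1, TimeUnit.SECONDS));

            List<Callable<Integer>> slow = new ArrayList<>();
            slow.add(() -> {
                Thread.sleep(5_000); // stands in for a slow API call
                return 3;
            });
            try {
                executeAllOrFail(pool, slow, 100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.println("timed out: " + e.getMessage());
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Whether failing the export is preferable to delivering partial data depends on the use case; the point is that the caller gets to decide, rather than the timeout deciding silently.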