问题描述
下载数据excel时,发现数据中有一列只有部分数据
伪代码
//获取Customer
CustomerListModel customerListModel = new CustomerListModel();
List<Callable<CustomerListModel>> bonusGetDetailTasks = Lists.newArrayList();
for (GetCustomerListRequest customerListRequest : requests) {
bonusGetDetailTasks.add(() -> {
CustomerListModel model = this.getCustomerList(customerListRequest);
if (model.getList() != null) {
customerListModel.getList().addAll(model.getList());
}
return customerListModel;
});
}
AsyncExecutor.executeTasks(bonusGetDetailTasks);
//load bonus
List<Callable<CustomerModel>> bonusGetDetailTasks = Lists.newArrayList();
for (CustomerModel customerModel : customerListModel.getList()) {
bonusGetDetailTasks.add(() -> {
BonusBaseModel bonusBaseModel = bonusOperationService.queryBonus(customerModel);
customerModel.setBonus(bonusBaseModel.getBonus());
return customerModel;
});
}
AsyncExecutor.executeTasks(bonusGetDetailTasks);
AsyncExecutor
public class AsyncExecutor {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 20L, TimeUnit.SECONDS, new ArrayBlockingQueue(50000));
private static ListeningExecutorService executorService = MoreExecutors
.listeningDecorator(executor);
public static <T> List<T> executeTasks(List<Callable<T>> tasks) throws InterruptedException {
try {
return executeTasks(tasks, 5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptedException();
}
}
public static <T> List<T> executeTasks(List<Callable<T>> tasks, int time, TimeUnit timeUnit)
throws InterruptedException {
final List<T> results = new CopyOnWriteArrayList<>();
log.warn("begin exec tasks");
final List<ListenableFuture<T>> futureTasks = new ArrayList<>(tasks.size());
tasks.forEach(task -> {
final ListenableFuture<T> futureTask = executorService.submit(task);
futureTasks.add(futureTask);
});
final CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
for (ListenableFuture<T> futureTask : futureTasks) {
Futures.addCallback(futureTask, new DownFutureCallback(countDownLatch, results),MoreExecutors.directExecutor());
}
countDownLatch.await(time, timeUnit);
return results;
}
}
排查
由于数据中一部分客户是有积分数据的,一部分没有,故猜测问题点是AsyncExecutor.executeTasks
时超时,导致未等待api响应setBonus,调整获取积分中的AsyncExecutor.executeTasks(bonusGetDetailTasks);
为executeTasks(tasks, 50000, TimeUnit.MILLISECONDS)
调整后发现线上仍然存在无bonus的数据,经添加task执行日志、customer请求日志,setBonus等日志后发现在请求customer时,未获取完成所有customer即await timeout,所以在后续获取积分时,只会获取之前请求完成的customer对应的积分,而在获取积分的await中,会将剩下的customer获取完成,故调整获取customer时的time
结论
使用countDownLatch要注意timeout设置