🎨 任务编排

master
alis-lc 1 year ago
parent 1b6049f02f
commit 7c0a1e65b8

@ -0,0 +1,112 @@
package org.alis.aliscomponent.executor;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.springframework.beans.factory.DisposableBean;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
/**
*
*
* @author lc
* @date 2023/10/19 15:52
*/
@Slf4j
public class TaskDispatcher implements DisposableBean {
private final Integer queueSize = 1024;
/**
*
*/
private final ExecutorService executor;
private Integer executionNum = 1024;
public TaskDispatcher() {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize),
ThreadFactoryBuilder.create().setNamePrefix("dispatch-task-").build(),
new ThreadPoolExecutor.AbortPolicy()
);
}
public void setExecution(Integer executionNum) {
if (executionNum > queueSize) {
throw new UnsupportedOperationException("任务数量过大请小于:" + queueSize);
}
this.executionNum = executionNum;
}
public TaskDispatcher(ExecutorService executor) {
this.executor = executor;
}
public void dispatch(List<Runnable> list) {
if (CollectionUtils.isEmpty(list)) {
return;
}
// 开始编排任务
// 开始分发任务
ThreadUtil.execute(() -> {
// 创建一个任务控制器
Semaphore semaphore = new Semaphore(executionNum);
list.forEach(task -> {
try {
semaphore.acquire(1);
executor.execute(() -> {
task.run();
semaphore.release(1);
});
} catch (InterruptedException e) {
log.error("执行任务线程异常中断,thread线程名{}->", Thread.currentThread().getName(), e);
Thread.currentThread().interrupt();
}
});
});
// 任务分发完成
log.info("任务分发结束");
}
public <T> List<Future<T>> allOver(List<Callable<T>> list) {
if (CollectionUtils.isEmpty(list)) {
return Collections.emptyList();
}
List<Future<T>> result = new ArrayList<>();
List<List<Callable<T>>> partitions = ListUtils.partition(list, executionNum );
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
for (List<Callable<T>> partition : partitions) {
ThreadUtil.execute(()->{
try {
result.addAll(executor.invokeAll(partition));
} catch (InterruptedException e) {
log.error("执行任务线程异常中断,thread线程名{}->", Thread.currentThread().getName(), e);
Thread.currentThread().interrupt();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return result;
}
@Override
public void destroy() throws Exception {
executor.shutdown();
}
}
Loading…
Cancel
Save