From 7c0a1e65b81e1b5da060d15923dddaa0c40bca14 Mon Sep 17 00:00:00 2001 From: alis-lc Date: Fri, 20 Oct 2023 10:19:54 +0000 Subject: [PATCH] =?UTF-8?q?:art:=20=E4=BB=BB=E5=8A=A1=E7=BC=96=E6=8E=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/TaskDispatcher.java | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 alis-component/src/main/java/org/alis/aliscomponent/executor/TaskDispatcher.java diff --git a/alis-component/src/main/java/org/alis/aliscomponent/executor/TaskDispatcher.java b/alis-component/src/main/java/org/alis/aliscomponent/executor/TaskDispatcher.java new file mode 100644 index 0000000..4ebf32e --- /dev/null +++ b/alis-component/src/main/java/org/alis/aliscomponent/executor/TaskDispatcher.java @@ -0,0 +1,112 @@ +package org.alis.aliscomponent.executor; + +import cn.hutool.core.thread.ThreadFactoryBuilder; +import cn.hutool.core.thread.ThreadUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.ListUtils; +import org.springframework.beans.factory.DisposableBean; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; + +/** + * 任务编排 + * + * @author lc + * @date 2023/10/19 15:52 + */ +@Slf4j +public class TaskDispatcher implements DisposableBean { + + private final Integer queueSize = 1024; + /** + * 执行器 + */ + private final ExecutorService executor; + + private Integer executionNum = 1024; + + + public TaskDispatcher() { + executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueSize), + ThreadFactoryBuilder.create().setNamePrefix("dispatch-task-").build(), + new ThreadPoolExecutor.AbortPolicy() + ); + } + + public void setExecution(Integer executionNum) { + if (executionNum > queueSize) { + throw new UnsupportedOperationException("任务数量过大请小于:" + queueSize); + } + this.executionNum = executionNum; + } + + + public TaskDispatcher(ExecutorService executor) { + this.executor = executor; + } + + public void dispatch(List list) { + if (CollectionUtils.isEmpty(list)) { + return; + } + // 开始编排任务 + // 开始分发任务 + ThreadUtil.execute(() -> { + // 创建一个任务控制器 + Semaphore semaphore = new Semaphore(executionNum); + list.forEach(task -> { + try { + semaphore.acquire(1); + executor.execute(() -> { + task.run(); + semaphore.release(1); + }); + } catch (InterruptedException e) { + log.error("执行任务线程异常中断,thread线程名:{}->", Thread.currentThread().getName(), e); + Thread.currentThread().interrupt(); + } + }); + }); + // 任务分发完成 + log.info("任务分发结束"); + } + + + public List> allOver(List> list) { + if (CollectionUtils.isEmpty(list)) { + return Collections.emptyList(); + } + List> result = new ArrayList<>(); + List>> partitions = ListUtils.partition(list, executionNum ); + CountDownLatch countDownLatch = new CountDownLatch(partitions.size()); + for (List> partition : partitions) { + ThreadUtil.execute(()->{ + try { + result.addAll(executor.invokeAll(partition)); + } catch (InterruptedException e) { + log.error("执行任务线程异常中断,thread线程名:{}->", Thread.currentThread().getName(), e); + Thread.currentThread().interrupt(); + } + countDownLatch.countDown(); + }); + } + try { + countDownLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return result; + + } + + + @Override + public void destroy() throws Exception { + executor.shutdown(); + } +}