|
|
|
@ -5,6 +5,7 @@ import cn.hutool.core.thread.ThreadUtil;
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
|
|
import java.util.concurrent.Executor;
|
|
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -17,6 +18,9 @@ public abstract class AbstractConsumer<T> {
|
|
|
|
|
|
|
|
|
|
private final Executor executor;
|
|
|
|
|
|
|
|
|
|
private final ScheduledThreadPoolExecutor scheduledExecutorService;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public AbstractConsumer(BlockingQueue<T> taskQueue) {
|
|
|
|
|
this(taskQueue, ThreadUtil.newExecutor());
|
|
|
|
|
}
|
|
|
|
@ -24,10 +28,11 @@ public abstract class AbstractConsumer<T> {
|
|
|
|
|
public AbstractConsumer(BlockingQueue<T> taskQueue, Executor executor) {
|
|
|
|
|
this.taskQueue = taskQueue;
|
|
|
|
|
this.executor = executor;
|
|
|
|
|
this.scheduledExecutorService = ThreadUtil.createScheduledExecutor(1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void startConsumer() {
|
|
|
|
|
executor.execute(() -> {
|
|
|
|
|
scheduledExecutorService.scheduleWithFixedDelay(() -> {
|
|
|
|
|
try {
|
|
|
|
|
T e;
|
|
|
|
|
while (Objects.nonNull(e = taskQueue.poll(30L, TimeUnit.SECONDS))) {
|
|
|
|
@ -38,7 +43,7 @@ public abstract class AbstractConsumer<T> {
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
}, 1L, 1L, TimeUnit.SECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|