From 7486f842658e9ae6470b9990949511e23ebf3de9 Mon Sep 17 00:00:00 2001 From: robin Date: Fri, 6 May 2022 18:21:00 +0800 Subject: [PATCH] :sparkles: provider and consumer --- alis-component/pom.xml | 8 +++ .../annotation/EnableConsumerModel.java | 19 +++++++ .../component/config/AbstractConsumer.java | 51 +++++++++++++++++++ .../component/config/AbstractProvider.java | 50 ++++++++++++++++++ .../component/config/DefaultConsumer.java | 30 +++++++++++ .../component/config/DefaultProvider.java | 28 ++++++++++ .../config/consumer/ConsumerRegister.java | 30 +++++++++++ .../aliscomponent/ProviderTest.java | 45 ++++++++++++++++ pom.xml | 2 +- 9 files changed, 262 insertions(+), 1 deletion(-) create mode 100644 alis-component/src/main/java/org/alis/aliscomponent/annotation/EnableConsumerModel.java create mode 100644 alis-component/src/main/java/org/alis/aliscomponent/component/config/AbstractConsumer.java create mode 100644 alis-component/src/main/java/org/alis/aliscomponent/component/config/AbstractProvider.java create mode 100644 alis-component/src/main/java/org/alis/aliscomponent/component/config/DefaultConsumer.java create mode 100644 alis-component/src/main/java/org/alis/aliscomponent/component/config/DefaultProvider.java create mode 100644 alis-component/src/main/java/org/alis/aliscomponent/config/consumer/ConsumerRegister.java create mode 100644 alis-component/src/test/java/org/alis/aliscomponent/aliscomponent/ProviderTest.java diff --git a/alis-component/pom.xml b/alis-component/pom.xml index dfeb7b3..f4ef170 100644 --- a/alis-component/pom.xml +++ b/alis-component/pom.xml @@ -58,6 +58,14 @@ + + org.springframework.boot + spring-boot-configuration-processor + + + org.springframework.boot + spring-boot-autoconfigure + diff --git a/alis-component/src/main/java/org/alis/aliscomponent/annotation/EnableConsumerModel.java b/alis-component/src/main/java/org/alis/aliscomponent/annotation/EnableConsumerModel.java new file mode 100644 index 0000000..a3e087c --- /dev/null +++ b/alis-component/src/main/java/org/alis/aliscomponent/annotation/EnableConsumerModel.java @@ -0,0 +1,19 @@ +package org.alis.aliscomponent.annotation; + +import org.alis.aliscomponent.config.consumer.ConsumerRegister; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + +/** + * 开启消费者模型 + * @author robin + * @date 2022/5/6 17:09 + */ + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Import(ConsumerRegister.class) +public @interface EnableConsumerModel { +} diff --git a/alis-component/src/main/java/org/alis/aliscomponent/component/config/AbstractConsumer.java b/alis-component/src/main/java/org/alis/aliscomponent/component/config/AbstractConsumer.java new file mode 100644 index 0000000..31c5ad8 --- /dev/null +++ b/alis-component/src/main/java/org/alis/aliscomponent/component/config/AbstractConsumer.java @@ -0,0 +1,51 @@ +package org.alis.aliscomponent.component.config; + +import cn.hutool.core.thread.ThreadUtil; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * @author lc + * @date 2022/5/6 17:26 + */ +public abstract class AbstractConsumer { + + private final BlockingQueue taskQueue; + + private final Executor executor; + + public AbstractConsumer(BlockingQueue taskQueue) { + this(taskQueue, ThreadUtil.newExecutor()); + } + + public AbstractConsumer(BlockingQueue taskQueue, Executor executor) { + this.taskQueue = taskQueue; + this.executor = executor; + } + + public void startConsumer() { + executor.execute(() -> { + try { + T e; + while (Objects.nonNull(e = taskQueue.poll(30L, TimeUnit.SECONDS))) { + consume(e); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + }); + } + + /** + * 消费 + * + * @param e 节点 + */ + protected abstract void consume(T e); + +} diff --git a/alis-component/src/main/java/org/alis/aliscomponent/component/config/AbstractProvider.java b/alis-component/src/main/java/org/alis/aliscomponent/component/config/AbstractProvider.java new file mode 100644 index 0000000..8fd98c7 --- /dev/null +++ b/alis-component/src/main/java/org/alis/aliscomponent/component/config/AbstractProvider.java @@ -0,0 +1,50 @@ +package org.alis.aliscomponent.component.config; + +import cn.hutool.core.thread.ThreadUtil; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @author lc + * @date 2022/5/6 17:26 + */ +public abstract class AbstractProvider { + + private final BlockingQueue taskQueue; + + private final Executor executor; + + private final ScheduledExecutorService scheduledExecutorService; + + public AbstractProvider(BlockingQueue taskQueue) { + this(taskQueue, ThreadUtil.newExecutor()); + } + + public AbstractProvider(BlockingQueue taskQueue, Executor executor) { + this.taskQueue = taskQueue; + this.executor = executor; + this.scheduledExecutorService = ThreadUtil.createScheduledExecutor(1); + } + + public void startProvide() { + scheduledExecutorService.scheduleAtFixedRate(() -> executor.execute(() -> { + try { + taskQueue.offer(provide(), 30L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + }), 0L, 1L, TimeUnit.SECONDS); + } + + /** + * 生产者 + * + * @return 生产者生产商品 + */ + protected abstract T provide(); + +} diff --git a/alis-component/src/main/java/org/alis/aliscomponent/component/config/DefaultConsumer.java b/alis-component/src/main/java/org/alis/aliscomponent/component/config/DefaultConsumer.java new file mode 100644 index 0000000..353c91c --- /dev/null +++ b/alis-component/src/main/java/org/alis/aliscomponent/component/config/DefaultConsumer.java @@ -0,0 +1,30 @@ +package org.alis.aliscomponent.component.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +/** + * @author lc + * @date 2022/5/6 18:01 + */ +public class DefaultConsumer extends AbstractConsumer { + private static final Logger log = LoggerFactory.getLogger(DefaultConsumer.class); + + public DefaultConsumer(BlockingQueue taskQueue) { + super(taskQueue); + } + + public DefaultConsumer(BlockingQueue taskQueue, Executor executor) { + super(taskQueue, executor); + } + + @Override + protected void consume(String e) { + log.info("consumer {} consume {}",hashCode(),e); + } + + +} diff --git a/alis-component/src/main/java/org/alis/aliscomponent/component/config/DefaultProvider.java b/alis-component/src/main/java/org/alis/aliscomponent/component/config/DefaultProvider.java new file mode 100644 index 0000000..6226773 --- /dev/null +++ b/alis-component/src/main/java/org/alis/aliscomponent/component/config/DefaultProvider.java @@ -0,0 +1,28 @@ +package org.alis.aliscomponent.component.config; + +import cn.hutool.core.thread.ThreadUtil; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +/** + * @author lc + * @date 2022/5/6 17:58 + */ +public class DefaultProvider extends AbstractProvider{ + + public DefaultProvider(BlockingQueue taskQueue) { + super(taskQueue); + } + + public DefaultProvider(BlockingQueue taskQueue, Executor executor) { + super(taskQueue, executor); + } + + @Override + protected String provide() { + // 每隔一秒生产消息 + ThreadUtil.sleep(1000); + return this.toString()+" : provide ok"; + } +} diff --git a/alis-component/src/main/java/org/alis/aliscomponent/config/consumer/ConsumerRegister.java b/alis-component/src/main/java/org/alis/aliscomponent/config/consumer/ConsumerRegister.java new file mode 100644 index 0000000..d0f2e39 --- /dev/null +++ b/alis-component/src/main/java/org/alis/aliscomponent/config/consumer/ConsumerRegister.java @@ -0,0 +1,30 @@ +package org.alis.aliscomponent.config.consumer; + +import org.alis.aliscomponent.component.config.DefaultConsumer; +import org.alis.aliscomponent.component.config.DefaultProvider; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; +import org.springframework.core.type.AnnotationMetadata; + +import java.util.stream.Stream; + +/** + * @author lc + * @date 2022/5/6 17:11 + */ +public class ConsumerRegister implements ImportBeanDefinitionRegistrar { + + @Override + public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { + Stream.of(DefaultConsumer.class, DefaultProvider.class) + //转化为 BeanDefinitionBuilder 对象 + .map(BeanDefinitionBuilder::genericBeanDefinition) + //转化为 BeanDefinition 对象 + .map(BeanDefinitionBuilder::getBeanDefinition) + //注册 BeanDefinition 到 BeanDefinitionRegistry + .forEach(beanDefinition -> BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry)); + + } +} diff --git a/alis-component/src/test/java/org/alis/aliscomponent/aliscomponent/ProviderTest.java b/alis-component/src/test/java/org/alis/aliscomponent/aliscomponent/ProviderTest.java new file mode 100644 index 0000000..24be465 --- /dev/null +++ b/alis-component/src/test/java/org/alis/aliscomponent/aliscomponent/ProviderTest.java @@ -0,0 +1,45 @@ +package org.alis.aliscomponent.aliscomponent; + +import cn.hutool.core.thread.ThreadUtil; +import org.alis.aliscomponent.component.config.AbstractConsumer; +import org.alis.aliscomponent.component.config.AbstractProvider; +import org.alis.aliscomponent.component.config.DefaultConsumer; +import org.alis.aliscomponent.component.config.DefaultProvider; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +/** + * @author lc + * @date 2022/5/6 18:05 + */ +public class ProviderTest { + + @Test + public void testConsumer() { + BlockingQueue queue = new ArrayBlockingQueue<>(100); + Executor executor = ThreadUtil.newExecutor(); + + // 4个生产者 + AbstractProvider provider = new DefaultProvider(queue, executor); + AbstractProvider provider1 = new DefaultProvider(queue, executor); + AbstractProvider provider2 = new DefaultProvider(queue, executor); + AbstractProvider provider3 = new DefaultProvider(queue, executor); + + // 1个消费者 + AbstractConsumer consumer = new DefaultConsumer(queue, executor); + + provider.startProvide(); + provider1.startProvide(); + provider2.startProvide(); + provider3.startProvide(); + + consumer.startConsumer(); + + + ThreadUtil.sleep(10L * 1000); + } + +} diff --git a/pom.xml b/pom.xml index 8e21d74..7e25846 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,7 @@ 3.4.1 1.2.4 2.2.0 - 2.13.2.2 + 2.13.2 3.0.2 2.3.0 8.20.1