provider and consumer

master
robin 3 years ago
parent 8a516a5321
commit 7486f84265

@ -58,6 +58,14 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
</dependencies>

@ -0,0 +1,19 @@
package org.alis.aliscomponent.annotation;
import org.alis.aliscomponent.config.consumer.ConsumerRegister;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
/**
*
* @author robin
* @date 2022/5/6 17:09
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(ConsumerRegister.class)
public @interface EnableConsumerModel {
}

@ -0,0 +1,51 @@
package org.alis.aliscomponent.component.config;
import cn.hutool.core.thread.ThreadUtil;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
* @author lc
* @date 2022/5/6 17:26
*/
public abstract class AbstractConsumer<T> {
private final BlockingQueue<T> taskQueue;
private final Executor executor;
public AbstractConsumer(BlockingQueue<T> taskQueue) {
this(taskQueue, ThreadUtil.newExecutor());
}
public AbstractConsumer(BlockingQueue<T> taskQueue, Executor executor) {
this.taskQueue = taskQueue;
this.executor = executor;
}
public void startConsumer() {
executor.execute(() -> {
try {
T e;
while (Objects.nonNull(e = taskQueue.poll(30L, TimeUnit.SECONDS))) {
consume(e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
/**
*
*
* @param e
*/
protected abstract void consume(T e);
}

@ -0,0 +1,50 @@
package org.alis.aliscomponent.component.config;
import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author lc
* @date 2022/5/6 17:26
*/
public abstract class AbstractProvider<T> {
private final BlockingQueue<T> taskQueue;
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService;
public AbstractProvider(BlockingQueue<T> taskQueue) {
this(taskQueue, ThreadUtil.newExecutor());
}
public AbstractProvider(BlockingQueue<T> taskQueue, Executor executor) {
this.taskQueue = taskQueue;
this.executor = executor;
this.scheduledExecutorService = ThreadUtil.createScheduledExecutor(1);
}
public void startProvide() {
scheduledExecutorService.scheduleAtFixedRate(() -> executor.execute(() -> {
try {
taskQueue.offer(provide(), 30L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}), 0L, 1L, TimeUnit.SECONDS);
}
/**
*
*
* @return
*/
protected abstract T provide();
}

@ -0,0 +1,30 @@
package org.alis.aliscomponent.component.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
/**
* @author lc
* @date 2022/5/6 18:01
*/
public class DefaultConsumer extends AbstractConsumer<String> {
private static final Logger log = LoggerFactory.getLogger(DefaultConsumer.class);
public DefaultConsumer(BlockingQueue<String> taskQueue) {
super(taskQueue);
}
public DefaultConsumer(BlockingQueue<String> taskQueue, Executor executor) {
super(taskQueue, executor);
}
@Override
protected void consume(String e) {
log.info("consumer {} consume {}",hashCode(),e);
}
}

@ -0,0 +1,28 @@
package org.alis.aliscomponent.component.config;
import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
/**
* @author lc
* @date 2022/5/6 17:58
*/
public class DefaultProvider extends AbstractProvider<String>{
public DefaultProvider(BlockingQueue<String> taskQueue) {
super(taskQueue);
}
public DefaultProvider(BlockingQueue<String> taskQueue, Executor executor) {
super(taskQueue, executor);
}
@Override
protected String provide() {
// 每隔一秒生产消息
ThreadUtil.sleep(1000);
return this.toString()+" : provide ok";
}
}

@ -0,0 +1,30 @@
package org.alis.aliscomponent.config.consumer;
import org.alis.aliscomponent.component.config.DefaultConsumer;
import org.alis.aliscomponent.component.config.DefaultProvider;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import java.util.stream.Stream;
/**
* @author lc
* @date 2022/5/6 17:11
*/
public class ConsumerRegister implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
Stream.of(DefaultConsumer.class, DefaultProvider.class)
//转化为 BeanDefinitionBuilder 对象
.map(BeanDefinitionBuilder::genericBeanDefinition)
//转化为 BeanDefinition 对象
.map(BeanDefinitionBuilder::getBeanDefinition)
//注册 BeanDefinition 到 BeanDefinitionRegistry
.forEach(beanDefinition -> BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry));
}
}

@ -0,0 +1,45 @@
package org.alis.aliscomponent.aliscomponent;
import cn.hutool.core.thread.ThreadUtil;
import org.alis.aliscomponent.component.config.AbstractConsumer;
import org.alis.aliscomponent.component.config.AbstractProvider;
import org.alis.aliscomponent.component.config.DefaultConsumer;
import org.alis.aliscomponent.component.config.DefaultProvider;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
/**
* @author lc
* @date 2022/5/6 18:05
*/
public class ProviderTest {
@Test
public void testConsumer() {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
Executor executor = ThreadUtil.newExecutor();
// 4个生产者
AbstractProvider<String> provider = new DefaultProvider(queue, executor);
AbstractProvider<String> provider1 = new DefaultProvider(queue, executor);
AbstractProvider<String> provider2 = new DefaultProvider(queue, executor);
AbstractProvider<String> provider3 = new DefaultProvider(queue, executor);
// 1个消费者
AbstractConsumer<String> consumer = new DefaultConsumer(queue, executor);
provider.startProvide();
provider1.startProvide();
provider2.startProvide();
provider3.startProvide();
consumer.startConsumer();
ThreadUtil.sleep(10L * 1000);
}
}

@ -37,7 +37,7 @@
<mybatis.plus.version>3.4.1</mybatis.plus.version>
<druid.version>1.2.4</druid.version>
<xxl.job.version>2.2.0</xxl.job.version>
<jackson.version>2.13.2.2</jackson.version>
<jackson.version>2.13.2</jackson.version>
<knife4j.version>3.0.2</knife4j.version>
<telesign.version>2.3.0</telesign.version>
<jwt.version>8.20.1</jwt.version>

Loading…
Cancel
Save