一、guava的eventBus代码实现
1、加入pom
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
2、定义一个消费者的注解
/**
 * Marks a class as a consumer of events and names the event channel it listens on.
 *
 * <p>Retained at runtime so the annotation can be read reflectively, and
 * applicable to types only.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EventConsume {

    /**
     * Event identifier; {@code "default"} when not specified.
     */
    String identifier() default "default";
}
3、定义消费者接口
/**
 * Contract implemented by every event consumer.
 *
 * @param <T> type of the event payload handed to {@link #process(Object)}
 */
public interface EventHandler<T> {

    /**
     * Handles one event. The method carries {@code @Subscribe} so that an
     * event bus the implementation is registered on can discover it.
     *
     * @param data the event payload
     * @return a boolean result; NOTE(review): the meaning of the flag is not
     *         defined in this file - presumably "handled successfully", confirm
     */
    @Subscribe
    boolean process(T data);
}
4、定义一个线程池
/**
 * Builds the executors used to dispatch events asynchronously.
 *
 * <p>Every call to {@link #buildDefaultExecutor(String)} returns an independent
 * pool with its own bounded work queue, so pools created for different
 * identifiers never see each other's tasks.
 */
@Slf4j
public class EventThreadPoolFactory {

    private static final int DEFAULT_CORE_SIZE = 0;
    private static final int DEFAULT_MAX_SIZE = 2;
    private static final int DEFAULT_TIMEOUT = 1;
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.HOURS;
    private static final int DEFAULT_QUEUE_SIZE = 1;

    /**
     * Creates a thread pool whose threads are named {@code <identifier>-<n>}.
     *
     * <p>When the pool is saturated, the submitting thread blocks until the
     * queue has room instead of the task being discarded.
     *
     * @param identifier event identifier, used as the thread-name prefix
     * @return a new executor; never shared with other identifiers
     */
    public static Executor buildDefaultExecutor(String identifier) {
        // The name format is fed to String.format by ThreadFactoryBuilder, so a
        // literal '%' in the identifier must be escaped; "%d" is the thread index.
        String nameFormat = String.valueOf(identifier).replace("%", "%%") + "-%d";

        // A fresh queue per executor: a ThreadPoolExecutor must own its work
        // queue exclusively, otherwise tasks leak between pools.
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(DEFAULT_QUEUE_SIZE);

        return new ThreadPoolExecutor(DEFAULT_CORE_SIZE,
                DEFAULT_MAX_SIZE,
                DEFAULT_TIMEOUT,
                DEFAULT_TIME_UNIT,
                workQueue,
                new ThreadFactoryBuilder().setNameFormat(nameFormat).build(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        log.error("线程池[ {} ]等待队列已满,正在执行阻塞等待", executor.toString());
                        if (executor.isShutdown()) {
                            // Nothing will drain the queue any more; drop the task.
                            return;
                        }
                        try {
                            // Block the producer until a slot frees up.
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt for the caller instead of swallowing it.
                            Thread.currentThread().interrupt();
                            log.error("阻塞策略异常", e);
                        } catch (RuntimeException e) {
                            log.error("阻塞策略异常", e);
                        }
                    }
                });
    }
}
5、实现消费者注册
/**
 * Registry that maps an event identifier to the asynchronous event bus serving it.
 *
 * <p>After construction it looks up every {@code EventHandler} bean in the
 * application context and registers it on the bus named by the handler's
 * {@code @EventConsume} identifier.
 */
@Component
@Slf4j
public class EventHub {

    /** One bus per identifier, created on first registration. */
    private final Map<String, AsyncEventBus> eventBusMap = new ConcurrentHashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    /** Lifecycle hook: triggers handler discovery. */
    @PostConstruct
    public void onStart() {
        this.loadEventHandler();
    }

    /** Registers every {@code EventHandler} bean found in the application context. */
    public void loadEventHandler() {
        Map<String, EventHandler> handlersByBeanName = applicationContext.getBeansOfType(EventHandler.class);
        for (EventHandler handler : handlersByBeanName.values()) {
            this.register(handler);
        }
    }

    /**
     * Attaches a handler to the bus for its identifier, creating the bus if needed.
     * Handlers without a usable {@code @EventConsume} identifier are logged and skipped.
     */
    private void register(EventHandler eventHandler) {
        EventConsume marker = eventHandler.getClass().getAnnotation(EventConsume.class);
        if (marker == null || StringUtils.isBlank(marker.identifier())) {
            log.error("EventHandler[ {} ]没有配置 identifier ,注册失败", eventHandler.getClass().getSimpleName());
            return;
        }

        String identifier = marker.identifier();
        AsyncEventBus bus = eventBusMap.get(identifier);
        if (bus == null) {
            AsyncEventBus candidate =
                    new AsyncEventBus(identifier, EventThreadPoolFactory.buildDefaultExecutor(identifier));
            // If another thread won the race, use its bus rather than ours.
            AsyncEventBus existing = eventBusMap.putIfAbsent(identifier, candidate);
            bus = (existing != null) ? existing : candidate;
        }
        bus.register(eventHandler);
    }

    /**
     * @param identifier event identifier
     * @return the bus registered under {@code identifier}, or {@code null} if none exists
     */
    public EventBus getEventBus(String identifier) {
        return eventBusMap.get(identifier);
    }
}
6、实现生产者
/**
 * Entry point for publishing events: looks up the bus for an identifier and posts to it.
 */
@Component
@Slf4j
public class EventProducer {

    @Autowired
    private EventHub eventHub;

    /**
     * Posts {@code data} to the bus registered under {@code identifier}.
     * If no bus exists for that identifier, an error is logged and the event is dropped.
     *
     * @param identifier event identifier
     * @param data       event payload
     * @param <T>        payload type
     */
    public <T> void post(String identifier, T data) {
        EventBus targetBus = eventHub.getEventBus(identifier);
        if (targetBus != null) {
            targetBus.post(data);
            return;
        }
        log.error("identifier [ {} ] 没有事件监听者", identifier);
    }
}
7、简单使用
7.1 定义dto
/**
 * Order payload carried by the sample "order" event.
 */
@Data
public class Order {

    /** Order name. */
    private String name;

    /** Order id. */
    private Long id;
}
7.2 定义消费者
/**
 * Sample consumer bound to the "order" identifier; prints the id of each order received.
 */
@Service
@EventConsume(identifier = "order")
public class OrderEventHandlerImpl implements EventHandler<Order> {

    /**
     * Prints the order id to standard output.
     *
     * @param data the received order
     * @return always {@code true}
     */
    @Override
    public boolean process(Order data) {
        Long orderId = data.getId();
        System.out.println(orderId);
        return true;
    }
}
7.3 定义一个controller
@RestController
public class EventController {
@Autowired
private EventProducer eventProducer;
@RequestMapping("/order")
public void test(@RequestBody Order order){
//如果要发布其他的事件,只需要改变事件类型。
eventProducer.post("order",order);
System.out.println("发送order消息结束");
}
二、disruptor异步队列
1、增加pom
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
2、配置Disruptor
/**
 * Spring configuration that creates the Disruptor, attaches its consumer and
 * exposes the resulting ring buffer as a bean.
 */
@Configuration
public class DisruptorConfiguration {

    /** Names consumer threads {@code disruptor-0}, {@code disruptor-1}, ... */
    private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = new ThreadFactoryBuilder()
            .setNameFormat("disruptor-%d")
            .build();

    /**
     * Ring buffer capacity, counted in event slots (not bytes). It must be a
     * power of two: the Disruptor relies on this to turn the modulo on the
     * sequence into a bit mask, and rejects other sizes.
     */
    private static final int BUFFER_SIZE = 1024 * 1024;

    /**
     * @return the consumer that handles events taken from the ring buffer
     */
    @Bean("disruptorEventHandler")
    public EventHandler<DisruptorMsgEvent> nodeEventHandler() {
        return new DisruptorEventHandler();
    }

    /**
     * Builds and starts the Disruptor, then exposes its ring buffer to producers.
     *
     * <p>NOTE(review): the Disruptor started here is never shut down; consider
     * wiring a shutdown hook for application stop.
     *
     * @param eventHandler consumer invoked for every published event
     * @return the ring buffer producers publish into
     */
    @Bean("disruptorRingBuffer")
    public RingBuffer<DisruptorMsgEvent> nodeEventRingBuffer(
            @Qualifier("disruptorEventHandler") EventHandler<DisruptorMsgEvent> eventHandler) {
        // Factory used to pre-allocate every slot of the ring buffer.
        DisruptorMsgEvent.Factory factory = new DisruptorMsgEvent.Factory();
        // Multi-producer mode: several threads may publish concurrently.
        // ProducerType.SINGLE is faster but only safe with exactly one publishing thread.
        Disruptor<DisruptorMsgEvent> disruptor = new Disruptor<>(
                factory, BUFFER_SIZE, DISRUPTOR_THREAD_FACTORY, ProducerType.MULTI, new YieldingWaitStrategy());
        // Register the business handler, i.e. the consumer.
        disruptor.handleEventsWith(eventHandler);
        // Start the consumer thread(s).
        disruptor.start();
        // Hand out the ring buffer that receives events from producers.
        return disruptor.getRingBuffer();
    }
}
3、定义事件对象
/**
 * Event object stored in each ring buffer slot; carries a single string value.
 */
@Data
public class DisruptorMsgEvent {

    /** Message content set by the producer and read by the consumer. */
    private String value;

    /**
     * Factory that supplies new, empty {@link DisruptorMsgEvent} instances.
     */
    public static class Factory implements EventFactory<DisruptorMsgEvent> {

        /**
         * @return a new, empty event
         */
        @Override
        public DisruptorMsgEvent newInstance() {
            DisruptorMsgEvent blank = new DisruptorMsgEvent();
            return blank;
        }
    }
}
5、定义消费者
/**
 * Ring buffer consumer that prints the value of every event it receives.
 */
@Slf4j
public class DisruptorEventHandler implements EventHandler<DisruptorMsgEvent> {

    /**
     * Prints the event's value to standard output.
     *
     * @param disruptorMsgEvent the event read from the ring buffer
     * @param l second callback argument; unused here
     * @param b third callback argument; unused here
     * @throws Exception declared by the handler contract; nothing is thrown explicitly here
     */
    @Override
    public void onEvent(DisruptorMsgEvent disruptorMsgEvent, long l, boolean b) throws Exception {
        String payload = disruptorMsgEvent.getValue();
        System.out.println(payload);
    }
}
6、简单测试
/**
 * Sample REST endpoint that publishes a string value into the Disruptor ring buffer.
 */
@RestController
@Slf4j
public class DisruptorController {

    @Autowired
    @Qualifier("disruptorRingBuffer")
    private RingBuffer<DisruptorMsgEvent> ringBuffer;

    /**
     * Claims the next slot, stores {@code value} in it and publishes it.
     *
     * @param value the message content taken from the {@code value} request parameter
     * @return always {@code null}
     */
    @RequestMapping("sendDisruptor")
    public String sendDisruptor(@RequestParam("value") String value) {
        // Claim the index of the next event slot.
        final long sequence = ringBuffer.next();
        log.info("sequence={}", sequence);
        try {
            fillSlot(sequence, value);
        } finally {
            // Publishing must happen in finally: a claimed sequence that is never
            // published blocks later publishes and other producers.
            ringBuffer.publish(sequence);
        }
        return null;
    }

    /**
     * Writes {@code value} into the slot at {@code sequence}; failures are logged, not rethrown.
     */
    private void fillSlot(long sequence, String value) {
        try {
            ringBuffer.get(sequence).setValue(value);
        } catch (Exception e) {
            log.error("failed to add event to ringBuffer for : e = {}", e.getMessage(), e);
        }
    }
}
评论区