侧边栏壁纸
博主头像
过去的,未来的博主等级

来日可期!

  • 累计撰写 280 篇文章
  • 累计创建 43 个标签
  • 累计收到 38 条评论

guava的eventBus和disruptor异步队列入门

过去的,未来的
2022-07-13 / 0 评论 / 0 点赞 / 37 阅读 / 1,019 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-07-23,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

一、guava的eventBus代码实现

1、加入pom
     <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
      </dependency>
2、定义一个消费者的注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EventConsume {

    /**
     * 事件标识
     */
    String identifier() default "default";

}
3、定义消费着接口
public interface EventHandler<T> {

    @Subscribe
    boolean process(T data);

}
4、定义一个线程池
@Slf4j
public class EventThreadPoolFactory {

    private static final int DEFAULT_CORE_SIZE = 0;
    private static final int DEFAULT_MAX_SIZE = 2;
    private static final int DEFAULT_TIMEOUT = 1;
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.HOURS;
    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final BlockingQueue<Runnable> DEFAULT_WORK_QUEUE = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);

    public static Executor buildDefaultExecutor(String identifier) {
        return new ThreadPoolExecutor(DEFAULT_CORE_SIZE,
                DEFAULT_MAX_SIZE,
                DEFAULT_TIMEOUT,
                DEFAULT_TIME_UNIT,
                DEFAULT_WORK_QUEUE,
                new ThreadFactoryBuilder().setNameFormat(String.format("%s-", identifier)).build(),
                new RejectedExecutionHandler(){
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        log.error("线程池[ {} ]等待队列已满,正在执行阻塞等待", executor.toString());
                        if (!executor.isShutdown()) {
                            try {
                                executor.getQueue().put(r);
                            } catch (Exception e) {
                                log.error("阻塞策略异常", e);
                            }
                        }
                    }
                });
    }

}
5、实现消费这注册
@Component
@Slf4j
public class EventHub {

    private final Map<String, AsyncEventBus> eventBusMap = new ConcurrentHashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    @PostConstruct
    public void onStart() {
        this.loadEventHandler();
    }

    public void loadEventHandler() {
        Map<String, EventHandler> eventHandlerMap = applicationContext.getBeansOfType(EventHandler.class);
        for (EventHandler eventHandler : eventHandlerMap.values()) {
            this.register(eventHandler);
        }
    }

    private void register(EventHandler eventHandler) {
        EventConsume eventConsume = eventHandler.getClass().getAnnotation(EventConsume.class);
        if (eventConsume == null || StringUtils.isBlank(eventConsume.identifier())) {
            log.error("EventHandler[ {} ]没有配置 identifier ,注册失败", eventHandler.getClass().getSimpleName());
            return;
        }
        String identifier = eventConsume.identifier();
        AsyncEventBus eventBus = eventBusMap.get(identifier);
        if (eventBus == null) {
            AsyncEventBus asyncEventBus = new AsyncEventBus(identifier, EventThreadPoolFactory.buildDefaultExecutor(identifier));
            eventBus = ObjectUtils.defaultIfNull(eventBusMap.putIfAbsent(identifier, asyncEventBus), asyncEventBus);
        }
        eventBus.register(eventHandler);
    }

    public EventBus getEventBus(String identifier) {
        return eventBusMap.get(identifier);
    }
}

6、实现生产者
@Component
@Slf4j
public class EventProducer {

    @Autowired
    private EventHub eventHub;

    public <T> void post(String identifier, T data) {
        EventBus eventBus = eventHub.getEventBus(identifier);
        if (eventBus == null) {
            log.error("identifier [ {} ] 没有事件监听者", identifier);
            return;
        }
        eventBus.post(data);
    }

}

7、简单使用
7.1 定义dto
@Data
public class Order {
    private String name;
    private Long id;
}
7.1定义消费者
@Service
@EventConsume(identifier="order")
public class OrderEventHandlerImpl implements EventHandler<Order> {
    @Override
    public boolean process(Order data) {
         System.out.println(data.getId());
        return true;
    }
}

7.1 定义一个controller
@RestController
public class EventController {


    @Autowired
    private EventProducer eventProducer;

    @RequestMapping("/order")
    public void test(@RequestBody Order order){
          //如果要发布其他的事件,只需要改变事件类型。
         eventProducer.post("order",order);
         System.out.println("发送order消息结束");
    }

二、disruptor异步队列

1、增加pom
 <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>
2、配置Disruptor
@Configuration
public class DisruptorConfiguration {
    private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = new ThreadFactoryBuilder()
            .setNameFormat("disruptor-%d")
            .build();

    /**
     * 指定ringBuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
     */
    private static final int BUFFER_SIZE = 1024*1024;

    @Bean("disruptorEventHandler")
    public EventHandler nodeEventHandler() {
        return new DisruptorEventHandler();
    }

    @Bean("disruptorRingBuffer")
    public RingBuffer<DisruptorMsgEvent> nodeEventRingBuffer(@Qualifier("disruptorEventHandler") EventHandler eventHandler) {
        //指定事件工厂
        DisruptorMsgEvent.Factory factory = new DisruptorMsgEvent.Factory();

        //单线程模式,获取额外的性能
        Disruptor<DisruptorMsgEvent> disruptor = new Disruptor<>(factory, BUFFER_SIZE, DISRUPTOR_THREAD_FACTORY, ProducerType.MULTI, new YieldingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(eventHandler);

        // 启动disruptor线程
        disruptor.start();

        //获取ringBuffer环,用于接取生产者生产的事件
        return disruptor.getRingBuffer();
    }
}
3、定义事件对象
@Data
public class DisruptorMsgEvent {
    private String value;
    public static class Factory implements EventFactory<DisruptorMsgEvent> {

        @Override
        public DisruptorMsgEvent newInstance() {
            return new DisruptorMsgEvent();
        }
    }
}

5、定义消费者
@Slf4j
public class DisruptorEventHandler implements EventHandler<DisruptorMsgEvent> {
    @Override
    public void onEvent(DisruptorMsgEvent disruptorMsgEvent, long l, boolean b) throws Exception {
         System.out.println(disruptorMsgEvent.getValue());
    }
}
6、简单测试
@RestController
@Slf4j
public class DisruptorController {

    @Autowired
    @Qualifier("disruptorRingBuffer")
    private RingBuffer<DisruptorMsgEvent> ringBuffer;

    @RequestMapping("sendDisruptor")
    public String sendDisruptor(@RequestParam("value") String value){
       //获取下一个Event槽的下标
        long sequence = ringBuffer.next();
        log.info("sequence={}",sequence);
        try {
            //给Event填充数据
            DisruptorMsgEvent event = ringBuffer.get(sequence);
            event.setValue(value);

        } catch (Exception e) {
            log.error("failed to add event to ringBuffer for : e = {}", e.getMessage(), e);
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            ringBuffer.publish(sequence);
        }
        return null;

    }
}
0

评论区