Implementing an Event-Driven Model

Introduction

event-driven

An EventDispatcher distributes events to the individual EventChannels, where an EventProcessor then handles them.

Concepts

  • Event: the event itself
  • EventQueue: the event queue
  • EventChannel: the event channel
  • EventDispatcher: the event dispatcher
  • EventHandler (EventListener): the event handler

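Implementation

Source code: https://github.com/Erzbir/dispatcher

The code follows an api-core architecture: the api module defines the interfaces and contracts, and the core module provides the implementations.

The Interceptor-related code is not shown here; those parts can be removed.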

Modes

push

In this mode the EventDispatcher acts as the distributor of Events and pushes each Event to the individual EventChannels. Once an EventChannel receives an Event, it can broadcast it directly to all of its EventHandlers.
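
The push flow can be sketched as follows. This is a minimal illustration under simplifying assumptions, not the repository's code: `PushChannel` and `PushDispatcher` are hypothetical names, events are plain strings, and everything runs on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of push mode; the class names are illustrative only.
class PushModeSketch {
    public static void main(String[] args) {
        PushChannel a = new PushChannel();
        PushChannel b = new PushChannel();
        a.register(e -> System.out.println("a got " + e));
        b.register(e -> System.out.println("b got " + e));
        PushDispatcher dispatcher = new PushDispatcher(List.of(a, b));
        dispatcher.dispatch("started");
    }
}

// A channel broadcasts every event it receives to all registered handlers.
class PushChannel {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void register(Consumer<String> handler) {
        handlers.add(handler);
    }

    void broadcast(String event) {
        for (Consumer<String> handler : handlers) {
            handler.accept(event);
        }
    }
}

// The dispatcher pushes each event to every channel as soon as it arrives.
class PushDispatcher {
    private final List<PushChannel> channels;

    PushDispatcher(List<PushChannel> channels) {
        this.channels = channels;
    }

    void dispatch(String event) {
        for (PushChannel channel : channels) {
            channel.broadcast(event);
        }
    }
}
```

Nothing is buffered in this sketch: an event is either handled immediately or not at all, which is what distinguishes push from pull.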

pull

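In this mode the EventDispatcher can act as an EventContainer: calling the dispatch interface stores the Event in an internally maintained BlockingQueue, where it waits for an EventChannel to "pull" it. After pulling an event, the EventChannel can broadcast it directly to its EventHandlers, or proceed as in push mode above.

Of course, events do not have to be stored in the EventDispatcher; storing them there actually muddles its responsibilities. Events can instead be dispatched to an EventChannel and stored there, but that requires abstracting out an additional EventProcessor that pulls events from the EventChannel and invokes the EventHandler callbacks.

Pull mode can only work by polling, but it makes event priorities easier to design.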
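
A minimal sketch of pull mode with priorities might look like this. The names `PullDispatcher` and `PrioritizedEvent` are hypothetical, and the sketch assumes that a smaller priority number means "more urgent"; the consumer is expected to call `pull()` repeatedly, which is the polling mentioned above.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sketch of pull mode; the class names are illustrative only.
class PullModeSketch {
    public static void main(String[] args) {
        PullDispatcher dispatcher = new PullDispatcher();
        dispatcher.dispatch(new PrioritizedEvent("low", 10));
        dispatcher.dispatch(new PrioritizedEvent("high", 1));
        // The consumer side polls until nothing is left.
        PrioritizedEvent next;
        while ((next = dispatcher.pull()) != null) {
            System.out.println(next.name()); // prints "high", then "low"
        }
    }
}

// Assumption: a smaller priority value is dequeued first.
record PrioritizedEvent(String name, int priority) {}

// The dispatcher only stores events; consumers decide when to fetch them.
class PullDispatcher {
    private final PriorityBlockingQueue<PrioritizedEvent> queue =
            new PriorityBlockingQueue<>(16, Comparator.comparingInt(PrioritizedEvent::priority));

    void dispatch(PrioritizedEvent event) {
        queue.add(event);
    }

    // Non-blocking pull: returns the most urgent queued event, or null if none is waiting.
    PrioritizedEvent pull() {
        return queue.poll();
    }
}
```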

push / pull 混合

A hybrid can be built by pulling a limited number of queued events each time an event is pushed.
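
One way to read this is sketched below: every push also drains at most a fixed number of queued events. `HybridDispatcher`, `enqueue`, and `batchSize` are hypothetical names introduced for illustration, and the sketch is single-threaded on purpose.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch of a push/pull hybrid; not thread-safe, for illustration only.
class HybridModeSketch {
    public static void main(String[] args) {
        HybridDispatcher dispatcher =
                new HybridDispatcher(2, e -> System.out.println("handled " + e));
        dispatcher.enqueue("a");
        dispatcher.enqueue("b");
        dispatcher.enqueue("c");
        dispatcher.push("d"); // handles "a" and "b"; "c" and "d" stay queued
        System.out.println("pending: " + dispatcher.pending()); // pending: 2
    }
}

class HybridDispatcher {
    private final Queue<String> queue = new ArrayDeque<>();
    private final int batchSize;
    private final Consumer<String> handler;

    HybridDispatcher(int batchSize, Consumer<String> handler) {
        this.batchSize = batchSize;
        this.handler = handler;
    }

    // Pull-style entry point: store the event without handling it.
    void enqueue(String event) {
        queue.add(event);
    }

    // Push-style entry point: store the event, then pull at most batchSize events and handle them.
    void push(String event) {
        queue.add(event);
        for (int i = 0; i < batchSize; i++) {
            String next = queue.poll();
            if (next == null) {
                break;
            }
            handler.accept(next);
        }
    }

    int pending() {
        return queue.size();
    }
}
```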

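The api module

Event

The element passed around in an event-driven model. It is produced by an event source and usually carries the subject that produced it and the time at which it was produced.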

Event

/**
 * @author Erzbir
 * @Data: 2023/12/6 10:46
 */
public interface Event {
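    /**
     * @return the time at which the event was produced
     */
    long timestamp();

    /**
     * @return the subject that published the event
     */
    Object getSource();

    /**
     * @return whether the event has been intercepted
     */
    boolean isIntercepted();

    /**
     * Intercepts this event. Once an event is intercepted, the dispatcher no longer delivers it to any event channel.
     * <p>
     * This method is called when an event interceptor returns false.
     */
    void intercepted();

    /**
     * @return the priority
     */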
    int getPriority();
}

AbstractEvent

/**
 * @author Erzbir
 * @Data: 2023/12/6 10:47
 */
public abstract class AbstractEvent implements Event {
    private final AtomicBoolean canceled;
    protected Object source;
    protected long timestamp;
    protected final AtomicBoolean intercepted;

    public AbstractEvent(Object source) {
        this.source = source;
        intercepted = new AtomicBoolean(false);
        timestamp = System.currentTimeMillis();
        canceled = new AtomicBoolean(false);
    }

    @Override
    public long timestamp() {
        return timestamp;
    }

    @Override
    public boolean isIntercepted() {
        return intercepted.get();
    }

    @Override
    public void intercepted() {
        intercepted.set(true);
    }
    
    @Override
    public int getPriority() {
        return Integer.MAX_VALUE;
    }


    public void cancel() {
        if (!(this instanceof CancelableEvent)) return;
        canceled.set(true);
    }

    public boolean isCanceled() {
        if (!(this instanceof CancelableEvent)) throw new UnsupportedOperationException();
        return canceled.get();
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "{" +
                "timestamp=" + timestamp +
                ", intercepted=" + intercepted +
                ", canceled=" + canceled +
                '}';
    }
}

EventHandler (EventListener)

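The logic to run once an Event is received. When an Event meets the conditions, the corresponding EventHandler is triggered.

Handlers live in an EventChannel, and each EventChannel can register its EventHandlers independently.

Listener<E extends Event>

The interface is generic so that, when a Listener<E extends Event> is built from a lambda at registration time, the parameter of the onEvent(E event) callback has the same type as the registered event. Another benefit is that it works together with EventChannel to restrict, at compile time, which listener types can be registered.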

/**
 * @author Erzbir
 */
public interface Listener<E extends Event> {
    ListenerResult onEvent(E event);
}
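
The effect of the type parameter on lambdas can be shown with a reduced, stand-alone sketch. The types below (`SketchEvent`, `SketchStartEvent`, `SketchListener`) are hypothetical stand-ins, and the callback returns a plain String instead of a ListenerResult to keep the example short.

```java
// Hypothetical stand-ins for Event and Listener, reduced to what the example needs.
class GenericListenerSketch {
    public static void main(String[] args) {
        // The lambda parameter is inferred as SketchStartEvent, so serviceName() needs no cast.
        SketchListener<SketchStartEvent> listener = event -> "started " + event.serviceName();
        System.out.println(listener.onEvent(new SketchStartEvent("db"))); // started db
    }
}

interface SketchEvent {}

record SketchStartEvent(String serviceName) implements SketchEvent {}

interface SketchListener<E extends SketchEvent> {
    String onEvent(E event);
}
```

With a non-generic listener the callback would receive the base event type and every handler would have to check and cast it by hand.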

ListenerResult

/**
 * The result returned when a listener finishes
 *
 * @author Erzbir
 * @Data: 2024/2/3 11:44
 */
public interface ListenerResult {
    // Whether the listener was truncated
    boolean isTruncated();

    // Whether to keep listening
    boolean isContinue();
}


StandardListenerResult

/**
 * @author Erzbir
 * @Data: 2024/2/7 05:26
 */
public enum StandardListenerResult implements ListenerResult {
    CONTINUE(false, true),
    STOP(false, false),
    TRUNCATED(true, true);

    private final boolean isContinue;
    private final boolean isTruncated;


    StandardListenerResult(boolean isTruncated, boolean isContinue) {
        this.isContinue = isContinue;
        this.isTruncated = isTruncated;
    }

    @Override
    public boolean isTruncated() {
        return isTruncated;
    }

    @Override
    public boolean isContinue() {
        return isContinue;
    }

}

EventChannel

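The channel for events

It is used to register EventHandlers; Events dispatched by the EventDispatcher trigger the EventHandlers registered in it.

An EventChannel can provide filtering methods that derive a new channel from an existing one by event type or by other conditions.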
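
The idea of deriving a filtered channel can be sketched independently of the real classes. In this hypothetical example (`FilterableChannel` and `SimpleChannel` are invented names, and events are plain Objects), the filtered channel is a view: handlers subscribed through it are stored in the parent, wrapped in a guard that checks the predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of channel filtering; the names are illustrative only.
class FilterChannelSketch {
    public static void main(String[] args) {
        SimpleChannel root = new SimpleChannel();
        FilterableChannel numbers = root.filter(e -> e instanceof Integer);
        numbers.subscribe(e -> System.out.println("number: " + e));
        root.subscribe(e -> System.out.println("any: " + e));
        root.broadcast(42);      // reaches both handlers
        root.broadcast("hello"); // reaches only the unfiltered handler
    }
}

interface FilterableChannel {
    void subscribe(Consumer<Object> handler);

    // Returns a view of this channel whose subscribers only see events matching the predicate.
    default FilterableChannel filter(Predicate<Object> predicate) {
        FilterableChannel parent = this;
        return handler -> parent.subscribe(event -> {
            if (predicate.test(event)) {
                handler.accept(event);
            }
        });
    }
}

class SimpleChannel implements FilterableChannel {
    private final List<Consumer<Object>> handlers = new ArrayList<>();

    @Override
    public void subscribe(Consumer<Object> handler) {
        handlers.add(handler);
    }

    void broadcast(Object event) {
        for (Consumer<Object> handler : handlers) {
            handler.accept(event);
        }
    }
}
```

Because each view wraps its parent, calling `filter` on a view stacks the predicates instead of replacing them.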

EventChannel<E extends Event>

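The generic design keeps an invalid event out of a channel at compile time. For example, an EventChannel<StartEvent> does not allow a StopEvent to flow through it, and a Listener<StopEvent> cannot be registered on it; both are rejected by the compiler.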

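/**
 * Event channel. Provides the registration interface and the container for listeners.
 * The broadcast method broadcasts an event to all listeners
 * <p></p>
 * All requests are delegated to `EventChannelDispatcher`
 *
 * @author Erzbir
 * @Data: 2023/12/6 10:46
 */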
public abstract class EventChannel<E extends Event> implements ListenerContainer<E>, Cancelable, AutoCloseable {
    protected Class<E> baseEventClass;
    protected List<ListenerInterceptor> listenerInterceptors = new ArrayList<>();
    protected AtomicBoolean activated = new AtomicBoolean(true);

    public EventChannel(Class<E> baseEventClass) {
        this.baseEventClass = baseEventClass;
    }

    public Class<E> getBaseEventClass() {
        return baseEventClass;
    }

    public abstract void broadcast(EventContext eventContext);

    protected abstract ListenerHandle registerListener(Class<E> eventType, Listener<E> listener);

    public abstract <T extends E> ListenerHandle subscribe(Class<T> eventType, Function<T, ListenerResult> handle);

    public abstract <T extends E> ListenerHandle subscribeOnce(Class<T> eventType, Consumer<T> handle);

    public abstract <T extends E> ListenerHandle subscribeAlways(Class<T> eventType, Consumer<T> handle);

    public abstract Listener<E> createListener(Function<E, ListenerResult> handle);

    public abstract EventChannel<E> filter(Predicate<Event> predicate);

    public abstract <T extends E> EventChannel<T> filterInstance(Class<T> eventType);

    public void addInterceptor(ListenerInterceptor listenerInterceptor) {
        listenerInterceptors.add(listenerInterceptor);
    }

    public void close() {
        activated.set(false);
    }

    public void close(Runnable hook, boolean isAsync) {
        if (isAsync) {
            CompletableFuture.runAsync(hook);
        } else {
            hook.run();
        }
    }

    public void open() {
        activated.set(true);
    }

    @Override
    public void cancel() {
        close();
    }

    @Override
    public boolean isCanceled() {
        return !activated.get();
    }
}
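
The compile-time guarantee can be reproduced with a much smaller channel. The sketch below uses hypothetical types (`TypedChannel`, `StartedEvent`, `StoppedEvent`) rather than the classes above; the two commented-out lines are the calls the compiler would reject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a type-bounded channel; the names are illustrative only.
class TypedChannelSketch {
    public static void main(String[] args) {
        TypedChannel<StartedEvent> channel = new TypedChannel<>();
        channel.subscribe(StartedEvent.class, e -> System.out.println("started: " + e.name()));
        channel.broadcast(new StartedEvent("db"));
        // channel.broadcast(new StoppedEvent("db"));       // does not compile: not a StartedEvent
        // channel.subscribe(StoppedEvent.class, e -> {});  // does not compile: violates T extends E
    }
}

record StartedEvent(String name) {}

record StoppedEvent(String name) {}

class TypedChannel<E> {
    // Pairs a handler with the event type it was registered for.
    private record Subscription<T>(Class<T> type, Consumer<T> handler) {
        void deliver(Object event) {
            if (type.isInstance(event)) {
                handler.accept(type.cast(event));
            }
        }
    }

    private final List<Subscription<? extends E>> subscriptions = new ArrayList<>();

    // T extends E: only listeners for the channel's own event type (or a subtype) are accepted.
    <T extends E> void subscribe(Class<T> type, Consumer<T> handler) {
        subscriptions.add(new Subscription<>(type, handler));
    }

    // Only values of type E can be broadcast.
    void broadcast(E event) {
        for (Subscription<? extends E> subscription : subscriptions) {
            subscription.deliver(event);
        }
    }
}
```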

EventDispatcher

The event dispatcher

/**
 * Event dispatcher
 *
 * @author Erzbir
 * @Data: 2024/2/7 02:52
 */
public interface EventDispatcher {

    default void dispatch(Event event) {
        dispatch(event, GlobalEventChannel.INSTANCE);
    }

    <E extends Event> void dispatch(E event, EventChannel<E> channel);

    void addInterceptor(EventDispatchInterceptor eventDispatchInterceptor);

    void start();

    boolean isActive();
    
    void cancel();

    boolean isCanceled();

}

The core module

EventChannel implementation

Implemented by delegation

EventChannelImpl

/**
 * @author Erzbir
 * @Data: 2023/12/6 13:39
 */
@Slf4j
public class EventChannelImpl<E extends Event> extends EventChannel<E> {
    private final ListenerInvoker listenerInvoker = new InterceptorInvoker(listenerInterceptors);
    // Tracks the thread running each listener, so that a listener blocked in its own logic can still be cancelled
    private final Map<Listener<?>, Thread> taskMap = new WeakHashMap<>();
    protected List<ListenerDescription> listeners = new ArrayList<>();


    public EventChannelImpl(Class<E> baseEventClass) {
        super(baseEventClass);
    }

    @SuppressWarnings({"unchecked"})
    @Override
    public void broadcast(EventContext eventContext) {
        Event event = eventContext.getEvent();
        if (!(event instanceof AbstractEvent)) throw new IllegalArgumentException("Event must extend AbstractEvent");
        if (event.isIntercepted()) {
            log.debug("Event: {} was truncated, cancel broadcast", event);
            return;
        }
        callListeners((E) event);
    }

    @SuppressWarnings({"unchecked"})
    @Override
    protected ListenerHandle registerListener(Class<E> eventType, Listener<E> listener) {
        Listener<E> safeListener = createSafeListener(listener);
        listeners.add(new ListenerDescription((Class<Event>) eventType, safeListener));
        // Hand the wrapped listener to the handle: dispose() removes entries by equality,
        // and the list stores safeListener rather than the raw listener
        return new WeakReferenceListenerHandle(safeListener, listeners, createHandleHook(safeListener));
    }

    @SuppressWarnings({"unchecked"})
    @Override
    public <T extends E> ListenerHandle subscribe(Class<T> eventType, Function<T, ListenerResult> handle) {
        Listener<E> listener = createListener((Function<E, ListenerResult>) handle);
        return registerListener((Class<E>) eventType, listener);
    }

    @Override
    public <T extends E> ListenerHandle subscribeOnce(Class<T> eventType, Consumer<T> handle) {
        return subscribe(eventType, event -> {
            handle.accept(event);
            return StandardListenerResult.STOP;
        });
    }

    @Override
    public <T extends E> ListenerHandle subscribeAlways(Class<T> eventType, Consumer<T> handle) {
        return subscribe(eventType, event -> {
            handle.accept(event);
            return StandardListenerResult.CONTINUE;
        });
    }


    @Override
    public Listener<E> createListener(Function<E, ListenerResult> handle) {
        return createSafeListener(handle::apply);
    }

    private Listener<E> createSafeListener(Listener<E> listener) {
        if (listener instanceof SafeListener) {
            return listener;
        }
        return new SafeListener(listener);
    }


    @Override
    public EventChannel<E> filter(Predicate<Event> predicate) {
        return new FilterEventChannel<>(this, predicate);
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public <T extends E> EventChannel<T> filterInstance(Class<T> eventType) {
        FilterEventChannel filterEventChannel = new FilterEventChannel(this, eventType);
        filterEventChannel.baseEventClass = eventType;
        return filterEventChannel;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public Iterable<Listener<E>> getListeners() {
        return (Iterable) listeners.stream().map(ListenerDescription::listener).toList();
    }

    private void callListeners(E event) {
        for (ListenerDescription listenerDescription : listeners) {
            if (!listenerDescription.eventType().isInstance(event)) {
                continue;
            }
            process(listenerDescription, event);
        }
    }

    private void process(ListenerDescription listenerDescription, E event) {
        Listener<?> listener = listenerDescription.listener();
        log.debug("Broadcasting event: {} to listener: {}", event, listener.getClass().getSimpleName());
        Thread invokeThread = Thread.ofVirtual()
                .name("Listener-Invoke-Thread")
                .unstarted(createInvokeRunnable(event, listener));
        invokeThread.start();
        taskMap.put(listener, invokeThread);
    }

    private Runnable createInvokeRunnable(E event, Listener<?> listener) {
        return () -> {
            try {
                if (!activated.get()) {
                    Thread.currentThread().interrupt();
                    return;
                }
                ListenerResult listenerResult = listenerInvoker.invoke(new DefaultListenerContext(new DefaultEventContext(event), listener));
                if (!listenerResult.isContinue()) {
                    Thread.currentThread().interrupt();
                }
            } catch (Throwable e) {
                log.error("Calling listener error: {}", e.getMessage());
                Thread.currentThread().interrupt();
            }
        };
    }

    private Runnable createHandleHook(Listener<E> listener) {
        return () -> {
            Thread thread = taskMap.get(listener);
            if (thread == null) {
                return;
            }
            thread.interrupt();
        };
    }

    public class SafeListener implements Listener<E> {
        private final Listener<E> delegate;

        public SafeListener(Listener<E> delegate) {
            this.delegate = delegate;
        }

        @Override
        public ListenerResult onEvent(E event) {
            try {
                return delegate.onEvent(event);
            } catch (Exception e) {
                return StandardListenerResult.STOP;
            }
        }
    }
}

FilterEventChannel

/**
 * @author Erzbir
 * @Data: 2023/12/6 16:41
 */
public class FilterEventChannel<E extends Event> extends EventChannel<E> {
    private final EventChannel<E> delegate;
    private final Predicate<Event> filter;

    public FilterEventChannel(EventChannel<E> delegate, Predicate<Event> filter) {
        super(delegate.getBaseEventClass());
        this.delegate = delegate;
        this.filter = filter;
    }

    public FilterEventChannel(EventChannel<E> delegate, Class<E> eventType) {
        this(delegate, eventType::isInstance);
    }


    @Override
    public void broadcast(EventContext eventContext) {
        Event event = eventContext.getEvent();
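        // Drop the event unless it has the base type and passes the filter (same condition as intercept())
        if (!(baseEventClass.isInstance(event) && filter.test(event))) {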
            return;
        }
        delegate.broadcast(eventContext);
    }

    @Override
    protected ListenerHandle registerListener(Class<E> eventType, Listener<E> listener) {
        return delegate.registerListener(eventType, intercept(listener));
    }

    @Override
    public <T extends E> ListenerHandle subscribe(Class<T> eventType, Function<T, ListenerResult> handler) {
        return delegate.subscribe(eventType, intercept(handler));
    }

    @Override
    public <T extends E> ListenerHandle subscribeOnce(Class<T> eventType, Consumer<T> handler) {
        return delegate.subscribeOnce(eventType, intercept(handler));
    }

    @Override
    public <T extends E> ListenerHandle subscribeAlways(Class<T> eventType, Consumer<T> handler) {
        return delegate.subscribeAlways(eventType, intercept(handler));
    }

    @Override
    public Listener<E> createListener(Function<E, ListenerResult> handler) {
        return delegate.createListener(intercept(handler));
    }

    private <T extends E> Consumer<T> intercept(Consumer<T> handler) {
        return (ev) -> {
            boolean filterResult;
            filterResult = getBaseEventClass().isInstance(ev) && filter.test(ev);
            if (filterResult) {
                handler.accept(ev);
            }
        };
    }

    private <T extends E> Listener<T> intercept(Listener<T> listener) {
        return (ev) -> {
            boolean filterResult;
            filterResult = getBaseEventClass().isInstance(ev) && filter.test(ev);
            if (filterResult) {
                return listener.onEvent(ev);
            } else {
                return StandardListenerResult.TRUNCATED;
            }
        };
    }

    private <T extends E> Function<T, ListenerResult> intercept(Function<T, ListenerResult> handler) {
        return (ev) -> {
            boolean filterResult;
            filterResult = getBaseEventClass().isInstance(ev) && filter.test(ev);
            if (filterResult) {
                return handler.apply(ev);
            } else {
                return StandardListenerResult.TRUNCATED;
            }
        };
    }

    @Override
    public EventChannel<E> filter(Predicate<Event> predicate) {
        return delegate.filter(predicate);
    }

    @Override
    public <T extends E> EventChannel<T> filterInstance(Class<T> eventType) {
        return delegate.filterInstance(eventType);
    }

    @Override
    public void addInterceptor(ListenerInterceptor eventInterceptor) {
        delegate.addInterceptor(eventInterceptor);
    }

    @Override
    public Iterable<Listener<E>> getListeners() {
        return delegate.getListeners();
    }
}

Everything is ultimately delegated to EventChannelDispatcher

/**
 * Event broadcasting, subscription, and so on are all delegated here
 *
 * @author Erzbir
 * @Data: 2023/12/12 08:56
 */
public class EventChannelDispatcher<E extends Event> extends EventChannelImpl<E> {
    public static final EventChannelDispatcher<Event> INSTANCE = new EventChannelDispatcher<>(Event.class);

    private EventChannelDispatcher(Class<E> baseEventClass) {
        super(baseEventClass);
    }

    @Override
    public void broadcast(EventContext eventContext) {
        super.broadcast(eventContext);
    }
}

Invoker

ListenerInvoker

/**
 * @author Erzbir
 * @Data: 2024/2/7 02:46
 */
public sealed interface ListenerInvoker {
    ListenerResult invoke(ListenerContext listenerContext);
}

@Slf4j
final class BaseListenerInvoker implements ListenerInvoker {

    @SuppressWarnings("unchecked")
    @Override
    public ListenerResult invoke(ListenerContext listenerContext) {
        Event event = listenerContext.getEvent();
        return listenerContext.getListener().onEvent(event);
    }
}

@Slf4j
final class InterceptorInvoker implements ListenerInvoker {
    public ListenerInvoker listenerInvoker;
    public List<ListenerInterceptor> listenerInterceptors;

    public InterceptorInvoker(List<ListenerInterceptor> listenerInterceptors) {
        this.listenerInvoker = new BaseListenerInvoker();
        this.listenerInterceptors = listenerInterceptors;
    }

    @Override
    public ListenerResult invoke(ListenerContext listenerContext) {
        if (!intercept(listenerContext)) {
            return StandardListenerResult.TRUNCATED;
        }
        return listenerInvoker.invoke(listenerContext);
    }


    private boolean intercept(ListenerContext listenerContext) {
        for (ListenerInterceptor listenerInterceptor : listenerInterceptors) {
            if (!listenerInterceptor.intercept(listenerContext)) {
                Listener<?> listener = listenerContext.getListener();
                log.debug("Listener: {} was truncated", listener);
                return false;
            }
        }
        return true;
    }

}
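
Since the ListenerInterceptor interface itself is not shown, the chain logic of InterceptorInvoker can be illustrated with a stand-alone sketch. Here interceptors are modelled as plain `Predicate`s and the listener as a `Function`; `ChainInvoker` and the `TRUNCATED` string are hypothetical simplifications, not the repository's types.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of an interceptor chain in front of a listener.
class InterceptorChainSketch {
    public static void main(String[] args) {
        List<Predicate<String>> interceptors = List.of(s -> !s.isEmpty(), s -> s.length() < 10);
        ChainInvoker<String> invoker = new ChainInvoker<>(interceptors, s -> "handled " + s);
        System.out.println(invoker.invoke("ping")); // handled ping
        System.out.println(invoker.invoke(""));     // TRUNCATED
    }
}

class ChainInvoker<T> {
    static final String TRUNCATED = "TRUNCATED";

    private final List<Predicate<T>> interceptors;
    private final Function<T, String> target;

    ChainInvoker(List<Predicate<T>> interceptors, Function<T, String> target) {
        this.interceptors = interceptors;
        this.target = target;
    }

    // The first interceptor that returns false truncates the call; the target is never invoked.
    String invoke(T input) {
        for (Predicate<T> interceptor : interceptors) {
            if (!interceptor.test(input)) {
                return TRUNCATED;
            }
        }
        return target.apply(input);
    }
}
```
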

Handle

Listener handles

ListenerHandles

/**
 * @author Erzbir
 * @Data: 2024/2/13 16:08
 */
abstract class HookableHandle implements ListenerHandle {
    protected Runnable hook;

    public HookableHandle(Runnable hook) {
        this.hook = hook;
        if (this.hook == null) {
            this.hook = () -> {
            };
        }
    }
}

class IndexListenerHandle extends HookableHandle implements ListenerHandle {
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    private final int index;
    private final List<Listener<?>> listeners;

    public IndexListenerHandle(int index, List<Listener<?>> listeners) {
        this(index, listeners, null);
    }

    public IndexListenerHandle(int index, List<Listener<?>> listeners, Runnable hook) {
        super(hook);
        this.index = index;
        this.listeners = listeners;
    }

    @Override
    public void dispose() {
        if (!disposed.compareAndSet(false, true)) {
            return;
        }
        listeners.remove(index);
        hook.run();
    }

    @Override
    public boolean isDisposed() {
        return disposed.get();
    }
}

class WeakReferenceListenerHandle extends HookableHandle implements ListenerHandle {
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    private WeakReference<Listener<?>> listenerRef;
    private WeakReference<Collection<ListenerDescription>> collectionRef;

    public WeakReferenceListenerHandle(Listener<?> listener, Collection<ListenerDescription> collection) {
        this(listener, collection, null);
    }

    public WeakReferenceListenerHandle(Listener<?> listener, Collection<ListenerDescription> collection, Runnable hook) {
        super(hook);
        this.listenerRef = new WeakReference<>(listener);
        this.collectionRef = new WeakReference<>(collection);
    }


    @Override
    public void dispose() {
        if (!disposed.compareAndSet(false, true)) {
            return;
        }
        Collection<ListenerDescription> collection = collectionRef.get();
        if (collection != null) {
            Listener<?> listener = listenerRef.get();
            if (listener != null) {
                collection.removeIf(listenerDescription -> listenerDescription.listener().equals(listener));
            }
        }
        listenerRef = null;
        collectionRef = null;
        hook.run();
    }

    @Override
    public boolean isDisposed() {
        return disposed.get();
    }
}
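
Both handles rely on the same idiom: `AtomicBoolean.compareAndSet(false, true)` lets exactly one caller win, so the removal and the hook run at most once even if dispose() is called repeatedly or from several threads. A reduced sketch, with the hypothetical class name `OnceHandle`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of an idempotent dispose; the names are illustrative only.
class DisposeOnceSketch {
    public static void main(String[] args) {
        List<String> listeners = new ArrayList<>(List.of("a", "b"));
        OnceHandle handle = new OnceHandle(() -> listeners.remove("a"));
        handle.dispose();
        handle.dispose(); // second call is a no-op
        System.out.println(listeners); // [b]
    }
}

class OnceHandle {
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    private final Runnable onDispose;

    OnceHandle(Runnable onDispose) {
        this.onDispose = onDispose;
    }

    void dispose() {
        // Only the caller that flips false -> true runs the cleanup.
        if (!disposed.compareAndSet(false, true)) {
            return;
        }
        onDispose.run();
    }

    boolean isDisposed() {
        return disposed.get();
    }
}
```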

EventDispatcher

AbstractEventDispatcher

/**
 * @author Erzbir
 * @Data: 2024/2/22 00:08
 */
@Slf4j
public abstract class AbstractEventDispatcher implements EventDispatcher {
    protected final List<EventDispatchInterceptor> eventDispatchInterceptors = new ArrayList<>();
    protected final AtomicBoolean activated = new AtomicBoolean(false);

    @Override
    public <E extends Event> void dispatch(E event, EventChannel<E> channel) {
        log.debug("Received event: {}", event);
        DefaultEventContext eventContext = new DefaultEventContext(event);
        if (!intercept(eventContext)) {
            return;
        }
        if (channel.isCanceled()) {
            log.debug("EventChannel: {} is already shutdown, dispatching canceled", channel.getClass().getSimpleName());
            return;
        }
        if (!channel.getListeners().iterator().hasNext()) {
            log.debug("EventChannel: {} has no listener, dispatching canceled", channel.getClass().getSimpleName());
            return;
        }
        dispatchTo(event, channel);
    }

    protected abstract <E extends Event> void dispatchTo(E event, EventChannel<E> channel);

    private boolean intercept(EventContext eventContext) {
        for (EventDispatchInterceptor eventDispatchInterceptor : eventDispatchInterceptors) {
            if (!eventDispatchInterceptor.intercept(eventContext)) {
                Event event = eventContext.getEvent();
                event.intercepted();
                log.debug("Event : {} was truncated", event);
                return false;
            }
        }
        return true;
    }

    @Override
    public void addInterceptor(EventDispatchInterceptor eventDispatchInterceptor) {
        eventDispatchInterceptors.add(eventDispatchInterceptor);
    }

    @Override
    public void start() {
        activated.set(true);
    }

    @Override
    public boolean isActive() {
        return activated.get();
    }

    @Override
    public boolean isCanceled() {
        return !activated.get();
    }

    @Override
    public void cancel() {
        activated.set(false);
    }
}

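Polling or notification

The core module provides two default implementations: one based on polling and one based on notification.

The notification-based EventDispatcher is used by default.

Each approach has its advantages. The polling-based dispatcher can implement event priority directly with a PriorityBlockingQueue, but it is less efficient and wastes resources. The notification-based dispatcher is better suited to scenarios where events must be handled promptly.

Polling-based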

PollingEventDispatcher

/**
 * Polling-based {@link EventDispatcher}
 *
 * @author Erzbir
 * @Data: 2024/2/7 03:46
 */
@Slf4j
public class PollingEventDispatcher extends AbstractEventDispatcher implements EventDispatcher {
    // Lock for the synchronized blocks, used to park and wake the dispatcher thread
    private final Object dispatchLock = new Object();
    // Whether the dispatcher thread is currently suspended
    private volatile boolean suspended = false;
    // Event buffer queue
    private final PriorityBlockingQueue<Event> eventQueue = new PriorityBlockingQueue<>(10, Comparator.comparingInt(Event::getPriority));

    @Override
    protected <E extends Event> void dispatchTo(E event, EventChannel<E> channel) {
        eventQueue.add(event);
        // An event is waiting to be dispatched, so wake the dispatcher thread if it is suspended
        if (suspended) {
            resume();
        }
    }

    @Override
    public void start() {
        if (!activated.compareAndSet(false, true)) {
            return;
        }
        Runnable runnable = () -> {
            while (activated.get() && !Thread.currentThread().isInterrupted()) {
                // If the queue is empty, park the thread until an event arrives
                if (eventQueue.isEmpty()) {
                    suspend();
                    continue;
                }
                Event event;
                try {
                    event = eventQueue.take();
                } catch (InterruptedException e) {
                    log.error("Dispatching error: {}", e.getMessage());
                    cancel();
                    Thread.currentThread().interrupt();
                    // No event was taken, so stop here instead of dispatching null
                    break;
                }
                EventChannelDispatcher<Event> channel = EventChannelDispatcher.INSTANCE;
                Thread.ofVirtual()
                        .name("Dispatcher-Sub-Thread")
                        .start(createTask2(channel, event));
            }
        };
        // A non-daemon thread: the program keeps running after main returns and exits once cancel() is called
        new Thread(runnable, "Dispatcher-Thread").start();
    }

    private void resume() {
        synchronized (dispatchLock) {
            dispatchLock.notifyAll();
        }
    }

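    private void suspend() {
        try {
            synchronized (dispatchLock) {
                suspended = true;
                // Re-check the condition under the lock so that an event added just before
                // this point is not missed, and so that spurious wakeups are ignored
                while (eventQueue.isEmpty() && activated.get()) {
                    dispatchLock.wait();
                }
                suspended = false;
            }
        } catch (InterruptedException e) {
            log.error("Dispatching error: {}", e.getMessage());
            cancel();
            Thread.currentThread().interrupt();
        }
    }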

    private Runnable createTask2(EventChannel<Event> channel, Event event) {
        return () -> {
            try {
                if (event instanceof CancelableEvent cancelableEvent) {
                    if (cancelableEvent.isCanceled()) {
                        return;
                    }
                }
                if (!event.isIntercepted()) {
                    log.debug("Dispatching event: {} to channel: {}", event, channel.getClass().getSimpleName());
                    channel.broadcast(new DefaultEventContext(event));
                }
            } catch (Throwable e) {
                log.error("Dispatching to channel: {} error: {}", channel.getClass().getSimpleName(), e.getMessage());
                Thread.currentThread().interrupt();
            }
        };
    }

    @Override
    public void cancel() {
        if (!activated.compareAndSet(true, false)) {
            return;
        }
        eventQueue.clear();
        // Wake the waiting thread here, otherwise it would never terminate
        resume();
    }
}

Notification-based

NotificationEventDispatcher

/**
 * Notification-based {@link EventDispatcher}
 *
 * @author Erzbir
 * @Data: 2024/2/21 23:45
 */
@Slf4j
public class NotificationEventDispatcher extends AbstractEventDispatcher implements EventDispatcher {

    @Override
    protected <E extends Event> void dispatchTo(E event, EventChannel<E> channel) {
        Thread.ofVirtual()
                .name("Dispatcher-Thread")
                .start(createTask(channel, event));
    }

    private <E extends Event> Runnable createTask(EventChannel<E> channel, E event) {
        return () -> {
            try {
                if (event instanceof CancelableEvent cancelableEvent) {
                    if (cancelableEvent.isCanceled()) {
                        return;
                    }
                }
                log.debug("Dispatching event: {} to channel: {}", event, channel.getClass().getSimpleName());
                channel.broadcast(new DefaultEventContext(event));
            } catch (Throwable e) {
                log.error("Dispatching to channel: {} error: {}", channel.getClass().getSimpleName(), e.getMessage());
                Thread.currentThread().interrupt();
            }
        };
    }
}

Testing

First, implement an Event

TestEvent

/**
 * @author Erzbir
 * @Data: 2024/2/14 02:19
 */
public class TestEvent extends AbstractEvent implements Event {
    public TestEvent(Object source) {
        super(source);
    }

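    @Override
    public Object getSource() {
        return source;
    }

    // Assumption: this accessor is missing from the listing as published. The test calls
    // getStr() and its output shows "test", so a fixed value is returned here.
    public String getStr() {
        return "test";
    }
}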

NamedEvent

/**
 * @author Erzbir
 * @Data: 2024/2/22 02:32
 */
public interface NamedEvent extends Event {
    String getName();
}

class TestNamedEvent extends AbstractEvent implements NamedEvent {
    private final String name;

    public TestNamedEvent(Object source, String name) {
        super(source);
        this.name = name;
    }

    @Override
    public Object getSource() {
        return source;
    }

    @Override
    public String getName() {
        return name;
    }
}

Test directly in the core module

/**
 * @author Erzbir
 * @Data: 2024/2/20 17:07
 */
@Slf4j
public class Test {
    public static void main(String[] args) throws InterruptedException {
        EventDispatcher eventDispatcher = new NotificationEventDispatcher();
        
        // Filter out a channel that only carries `NamedEvent`
        EventChannel<NamedEvent> eventEventChannel = EventChannelDispatcher.INSTANCE.filterInstance(NamedEvent.class);

        // Start dispatching; in practice this only matters for the polling-based dispatcher
        eventDispatcher.start();

        // Register a `Listener` for `NamedEvent`
        eventEventChannel.subscribe(NamedEvent.class, event -> {
            log.info("Name: {}", event.getName());
            return StandardListenerResult.CONTINUE;
        });
        
        // Filter out a channel by predicate
        EventChannel<Event> filter = EventChannelDispatcher.INSTANCE.filter(event -> event instanceof TestEvent);
        filter.subscribe(Event.class, event -> {
            if (event instanceof TestEvent) {
                log.info(((TestEvent) event).getStr());
            }
            return StandardListenerResult.CONTINUE;
        });
        
        
        EventChannelDispatcher.INSTANCE.subscribe(TestNamedEvent.class, event -> {
            log.info("this is a TestNamedEvent");
            return StandardListenerResult.CONTINUE;
        });
        
        EventChannelDispatcher.INSTANCE.subscribe(Event.class, event -> {
            log.info("this is an Event");
            return StandardListenerResult.CONTINUE;
        });
        
        eventDispatcher.dispatch(new TestNamedEvent("this", "test1"));
        eventDispatcher.dispatch(new TestNamedEvent("this", "test2"));
        eventDispatcher.dispatch(new TestNamedEvent("this", "test3"), eventEventChannel);
        eventDispatcher.dispatch(new TestNamedEvent("this", "test4"));
        eventDispatcher.dispatch(new TestEvent("this"));
        eventDispatcher.dispatch(new TestNamedEvent("this", "test5"), eventEventChannel);
        
        Thread.sleep(2000);
        
        // The program exits after cancel()
        eventDispatcher.cancel();
    }
}

Output:

Name: test4
this is an Event
this is a TestNamedEvent
Name: test2
test
Name: test3
Name: test1
Name: test5
this is a TestNamedEvent
this is a TestNamedEvent
this is a TestNamedEvent
this is an Event
this is an Event
this is an Event
this is an Event
this is a TestNamedEvent
this is an Event