Implementing an Event Loop in Java for Fun and Profit by@raffaeleflorio

Implementing an Event Loop in Java for Fun and Profit

An event loop waits and reacts to events. It's so simple, but so powerful. It always fascinated me. Here I present a multithreaded Java implementation inspired by a real world scenario. Specifically I used it to wait and react to messages coming from different Azure queues.
image
Raffaele Florio HackerNoon profile picture

Raffaele Florio

πŸͺ Abstractions explorer in love with object-oriented programming

github social iconlinkedin social icontwitter social icon


The event loop pattern always fascinated me. I found it interesting, useful, and compatible with object-oriented programming. A lot of us know it thanks to Node.js.


For some reason, recently, I have been looking to implement it and experiment with it. And I finally got the opportunity to do so at work.

Event Loop Concepts

An event loop doesn’t do too much. We can ask it to start and stop itself. And once started it waits and reacts to events. The events could signal a new message in a queue; an expired timeout and so on. Events trigger an action or a script once they occur.


With this in mind, we can already define these interfaces. They’re evoking the aforesaid behaviors:


interface EventLoop {
  void start();

  void stop();
}

interface Events {
  Optional<Event> next();
}

interface Event {
  void trigger(Script script);
}

interface Script {
  void run(JsonObject properties, Consumer<Instant> onSuccess, Consumer<Throwable> onFailure);
}


The Script interface deserves more attention. We can ask it to run with given properties and with two callbacks. One for success and one for failure. The first one sends the script end time. The second one sends the exception that has occured.

Simple event loop

The simplest event loop that comes to mind uses busy waiting:


final class BusyWaitingEventLoop implements EventLoop {
  private final Events events;
  private final Script script;
  private final AtomicBoolean alive;

  BusyWaitingEventLoop(final Events events, final Script script) {
    this(events, script, new AtomicBoolean(true));
  }
  
  BusyWaitingEventLoop(final Events events, final Script script, final AtomicBoolean alive) {
    this.events = events;
    this.script = script;
    this.alive = alive;
  }

  @Override
  public void start() {
    while (alive.get()) {
      events.next().ifPresent(event -> event.trigger(script));
    }
  }

  @Override
  public void stop() {
    alive.set(false);
  }
}


Once started it checks repeatedly for a new event. Once the event is found, it’ll trigger the script according to its own rules.


But this approach has a major drawback: the event handling blocks the event loop. This means that we can handle only one event at a time.

Multithreaded Event Loop

An approach to resolve the aforesaid issue is to start the event loop two or more times. Each time in a different thread.


So, here is a potential multithreaded event loop implementation exploiting the ExecutorService:

The ExecutorService responsibility is to run tasks. It can run them in different threads or in a single thread. Java provides both implementations thanks to a bunch of factory methods.


final class MultithreadedEventLoop implements EventLoop {
  private final EventLoop origin;
  private final Integer nThreads;
  private final ExecutorService executorService;

  MultithreadedEventLoop(final EventLoop origin, final Integer nThreads, final ExecutorService executorService) {
    this.origin = origin;
    this.nThreads = nThreads;
    this.executorService = executorService;
  }

  @Override
  public void start() {
    for (var i = 0; i < nThreads; i++) {
      executorService.execute(origin::start);
    }
  }

  @Override
  public void stop() {
    origin.stop();
    shutdownExecutorService();
  }

  private void shutdownExecutorService() {
    // Java specific code
  }
}

We can start a MultithreadedEventLoop with eight threads in this way:

public final class Main {
  public static void main(String[] args) {
    var nThreads = 8;
    new MultithreadedEventLoop(
      new BusyWaitingEventLoop(
        anEventsObject,
        aScriptObject
      ),
      nThreads,
      Executors.newFixedThreadPool(nThreads)
    );
  }
}


Now we can handle eight events in parallels. An improvement, but we can do it even better once we have resolved the main issue: the event loop and the script running in the same thread.

Asynchronous Script

The solution to the aforesaid issue is to run scripts and event loops in different threads. In this way, we can have threads dedicated to the scripts and threads dedicated to the event loop.


We already have a multithreaded event loop but we are missing the other piece: an asynchronous script.


Here is an implementation that exploits the ExecutorService again:


final class AsyncScript implements Script {
  private final Script origin;
  private final ExecutorService executorService;

  AsyncScript(final Script origin, final ExecutorService executorService) {
    this.origin = origin;
    this.executorService = executorService;
  }

  @Override
  public void run(final JsonObject properties, final Consumer<Instant> onSuccess, final Consumer<Throwable> onFailure) {
    if (!executorService.isShutdown()) {
      executorService.execute(() -> origin.run(properties, onSuccess, onFailure));
    }
  }
}


AsyncScript is a Script decorator. Using this, we can turn a synchronous Script to an asynchronous one.


Indeed, we can easily integrate AsyncScript in the previous example:


public final class Main {
  public static void main(String[] args) {
    var eventLoopThreads = 2;
    var scriptThreads = 14;
    var executorService = Executors.newFixedThreadPool(eventLoopThreads + scriptThreads);
    new MultithreadedEventLoop(
      new BusyWaitingEventLoop(
        anEventsObject,
        new AsyncScript(
          aScriptObject,
          executorService
        )
      ),
      eventLoopThreads,
      executorService
    ).start();
  }
}


Now we have an ExecutorService with sixteen threads. Two threads were assigned to the event loop, with fourteen available to the scripts. So the event loop will receive new events in two threads. While the scripts will run in others fourteen threads.

Java Tip

A possible improvement specific to Java is to use generics. Something like:


interface EventLoop {
  void start();

  void stop();
}

interface Events<I, O> {
  Optional<Event<I, O>> next();
}

interface Event<I, O> {
  void trigger(Script<I, O> script);
}

interface Script<I, O> {
  void run(I in, Consumer<O> onSuccess, Consumer<Throwable> onFailure);
}


In this way we can handle generic input/output in the Script.

Real-World Scenario

While the AggregateEvents aggregates Events objects, in my case, it aggregates multiple Azure Queues Events that emit AzureQueueMessage. The latter triggers the scripts with the message body as properties. And on completion they remove themselves from the queue.

Conclusion

We can improve the presented implementation in various ways. For example, we can add more capabilities to the event loop, such as restarting. But, as always, keep in mind the modeled domain. Furthermore there could be more opportunities to catch. For example I can guess a prioritised Events implementation, or I can guess event loops with different ExecutorService implementations. In this way, we could have finer-grained control over threads or priorities.

Currently, I’m investigating a broader and more consistent way to apply this pattern.


So, stay tuned and happy looping to you all!

react to story with heart
react to story with light
react to story with boat
react to story with money
L O A D I N G
. . . comments & more!