@Reactivejs - will you take a ride on the bus?

Ben Mizrahi
4 min read · Oct 28, 2021

What is @reactivejs? reactivejs provides an event bus implementation for communication between Node.js microservices. The implementation uses a simple but powerful technique that lets your services communicate over a Kafka-based communication layer. It also lets you, as a developer, follow domain-driven design while using an event-driven architecture through a simple and useful API. If you follow the pattern, your services can scale horizontally, which improves the performance and scalability of the whole system.

The package is most useful for three things:

  1. React to a single event or a batch of events (batch/streaming).
  2. Produce an async event between domains.
  3. Produce a sync (non-blocking) message and wait for the response.

Source code can be found here: https://github.com/providersjs/reactivejs
Dependencies: KafkaJs, Redis, TypeScript

@EventBus: the one that consolidates it all

The EventBus instance is the single point of communication between the bus and the client. The bus instance contains several entry points that allow you to communicate between services. The main idea to keep in mind is that every service should have a domain, and the service should react to events on that domain and either respond with an answer or produce the next event needed in the chain. You can find a full example under the example section in the GitHub repository.

@ReactAction: the decorator for action definitions

Besides the event bus instance, the package contains a decorator that allows you (as a client) to bind a function to specific events in the domain. The idea is very similar to routes in HTTP packages: you define a “route” and bind it to a function that handles it. The decorator injects the message into your function, including the JSON data provided in the message and other identifiers and methods for your use (such as idempotency protections).
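To make the route analogy concrete, here is a minimal sketch of what such a binding might do underneath. It is not the package's actual implementation: `SketchMessage`, `bindAction`, and `dispatch` are hypothetical names, and a plain function is used instead of decorator syntax so the example does not depend on compiler decorator settings.

```typescript
// Hypothetical message shape; the package's real IEventBusMessage may differ.
interface SketchMessage {
  domain: string;
  action: string;
  payload: Record<string, unknown>;
}

type Handler = (message: SketchMessage) => Promise<unknown>;

// Registry keyed by "domain:action", much like an HTTP router keyed by path.
const handlers = new Map<string, Handler>();

// Assumption: a decorator could call something like this to remember
// which function handles which domain/action pair.
function bindAction(domain: string, action: string, handler: Handler): void {
  handlers.set(`${domain}:${action}`, handler);
}

// Called for each incoming message: look up the bound handler and invoke it.
async function dispatch(message: SketchMessage): Promise<unknown> {
  const handler = handlers.get(`${message.domain}:${message.action}`);
  if (!handler) {
    throw new Error(`No handler bound for ${message.domain}:${message.action}`);
  }
  return handler(message);
}

// Bind a handler to the "get" action on the "orders" domain.
bindAction("orders", "get", async (message) => {
  return { orderId: message.payload.orderId, status: "found" };
});
```

Calling `dispatch({ domain: "orders", action: "get", payload: { orderId: 42 } })` would then run the bound handler, the same way an HTTP router runs the function registered for a path.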

Two-way, Async communication

Working with an event bus and producing messages is easy! But what do you do when your code needs to get some information from another service before proceeding with the operation? Let’s take a look:

class OrderService {
  // Reads the order from this service's own database.
  getOrderDetails = async (orderId) => {
    const results = dbConnection.getOrderDetails(orderId);
    return results;
  }
}

class ShippingService {
  handleShipment = async (orderId) => {
    const orderDetails = ? // how do we get this from OrderService?
    await externalAPI(orderDetails)
  }
}

In this example we have two services (described here as classes). Remember that each service is isolated and they cannot call each other directly, so don’t be confused: it’s just an example. The next question is how ShippingService can get the order details from OrderService. This is two-way communication: handleShipment needs to get the results back from OrderService and then process the shipment. Of course, we could create a route and call between services via HTTP, but we want a single communication layer: the event bus. So let’s see how to make this code more reactive and event-driven and get the desired result:

class OrderService {
  // Runs whenever a 'get' action is published on the 'orders' domain.
  @ActionReact({ action: 'get', domain: 'orders' })
  getOrderDetails = async (message: IEventBusMessage) => {
    const { orderId } = message.payload
    const results = dbConnection.getOrderDetails(orderId);
    return results;
  }
}

class ShippingService {
  handleShipment = async (orderId) => {
    // Ask the 'orders' domain for the details and wait for the reply.
    const orderDetails = await EventBus.getAsync('orders', 'get', { payload: { orderId } })
    await externalAPI(orderDetails)
  }
}

In the example above we made our method react to events: every time a get action is published on the orders domain, the getOrderDetails method is executed. The shipping service only has to call the EventBus and wait for the promise to resolve.

Under the hood, this “magic” uses Redis Pub/Sub to get results back to the specific service. The GitHub repo has a diagram that describes exactly how it happens:

  1. The getAsync method calls the event bus with the event that needs to be triggered and a params object.
  2. The event bus returns a promise and creates a UUID for this message.
  3. The event bus stores this unique key in Redis and listens for events on the key's channel using the Redis subscribe method.
  4. The event bus publishes the event to the target domain and action.
  5. The event bus on the target domain handles the message and returns a value to the event bus instance (or an exception, which is also handled by the package).
  6. The result is then stored under the unique key in Redis.
  7. The modification of the key triggers the subscriber event.
  8. The subscriber parses the message result and passes it to the callback function to resolve the promise created in step 2.
  9. The event bus deletes the key and removes the subscription on the channel.
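The steps above can be sketched in a few lines. This is only an illustration of the request/reply pattern, not the package's code: `replyChannels` is an in-memory stand-in for Redis Pub/Sub, `domainHandlers` is a stand-in for the Kafka consumers, a counter replaces the UUID, and the timeout is an addition of this sketch that the steps above do not mention. All names here are hypothetical.

```typescript
// In-memory stand-ins (assumptions): replyChannels plays the role of Redis
// Pub/Sub, domainHandlers plays the role of consumers on the target domain.
type Reply = { ok: true; value: unknown } | { ok: false; error: string };
type DomainHandler = (payload: unknown) => Promise<unknown>;

const replyChannels = new Map<string, (reply: Reply) => void>();
const domainHandlers = new Map<string, DomainHandler>();
let nextId = 0;

function getAsyncSketch(
  domain: string,
  action: string,
  payload: unknown,
  timeoutMs = 1000
): Promise<unknown> {
  // Step 2: create a unique id for the message (a counter stands in for a UUID).
  const correlationId = `msg-${++nextId}`;
  return new Promise((resolve, reject) => {
    // Not in the original steps: give up if nobody ever replies.
    const timer = setTimeout(() => {
      replyChannels.delete(correlationId);
      reject(new Error(`No reply for ${correlationId} within ${timeoutMs} ms`));
    }, timeoutMs);

    // Step 3: subscribe to the channel for this unique key.
    replyChannels.set(correlationId, (reply) => {
      // Step 9: clean up the key and the subscription.
      clearTimeout(timer);
      replyChannels.delete(correlationId);
      // Step 8: settle the promise created in step 2.
      if (reply.ok) resolve(reply.value);
      else reject(new Error(reply.error));
    });

    // Step 4: publish the event to the target domain and action.
    void publishToDomain(domain, action, payload, correlationId);
  });
}

async function publishToDomain(
  domain: string,
  action: string,
  payload: unknown,
  correlationId: string
): Promise<void> {
  const handler = domainHandlers.get(`${domain}:${action}`);
  if (!handler) return; // nobody is listening; the caller will time out
  let reply: Reply;
  try {
    // Step 5: the target domain handles the message and returns a value...
    reply = { ok: true, value: await handler(payload) };
  } catch (err) {
    // ...or an exception, which is passed back to the caller as well.
    reply = { ok: false, error: err instanceof Error ? err.message : String(err) };
  }
  // Steps 6-7: storing the result notifies the subscriber, if still present.
  replyChannels.get(correlationId)?.(reply);
}

// A handler on the "orders" domain, as in the OrderService example.
domainHandlers.set("orders:get", async (payload) => {
  const { orderId } = payload as { orderId: string };
  return { orderId, items: ["book"] };
});
```

With this in place, `await getAsyncSketch("orders", "get", { orderId: "A1" })` resolves with the value returned by the handler. Because the reply is routed by the unique key rather than by service name, it reaches exactly the caller that is waiting on that key.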

This technique gives us the ability to respond to the callback action on any replica of the service if needed.

For more examples, issues, and PRs, please go to the GitHub repository and feel free to contribute:

https://github.com/providersjs/reactivejs
