In my previous article, How to think reactively and animate moving objects using RxJs, I described how to build a MobileObject class that simulates the movement of an object subject to accelerations imposed on it by an external controller.
Now I want to show you a simple distributed system that allows a Controller app to remotely control the movement of a MobileObject. A second remote app, the Monitor, shows the movement of the object on a two-dimensional plan. At the center of the system lays a MobileObjectServer, which is the place where the MobileObjects live.
The goal of this article is to explain how Reactive thinking can progressively produce a design which maps the requirements very naturally and produces a neat solution. We will end up solving the problem subscribing to just ONE Observable.
We’ll focus on the server part, which is the most intriguing from this standpoint.
For the implementation, we’ll use RxJs and TypeScript. The server runs on Node. All the components communicate using Web-Sockets.
The full code base, comprised of the Server Controller and Monitor, can be found here.
Schema of the distributed system
The logical schema of the distributed system is represented in the following diagram:

At the center lays the MobileObjectServer where the instances of the MobileObjets run. Each MobileObject is controlled by its Controller, that is a Web app through which we can issue commands (like accelerate, brake) to the MobileObject. The movement of all MobileObjects can be seen on one or more Monitors. Each Monitor is again a Web app.
The following diagram shows a sample interaction flow between one Controller, one Monitor, and the MobileObjectServer.

The Server Requirements in terms of events
We can express the requirements for the server part of our distributed system in terms of events:
- Event1 — when a Controller connects => create a MobileObject
- Event2 — when a Controller receives a command => forward the command to the MobileObject controlled by the Controller
- Event3 — when a Controller disconnects => delete the MobileObject controlled by the Controller
- Event4 — when a Monitor connects => start sending dynamics data of all running MobileObjects to the newly connected Monitor
- Event5 — when a MobileObject is added => start sending its dynamics data to all the Monitors connected
- Event6 — when a Monitor disconnects => stop sending the streams of dynamics data for all MobileObjects to that Monitor
Reactive thinking will produce a design which naturally maps the requirements expressed in this way.
The elements composing the server
The server component of the distributed application is made up of two main elements:
- the MobileObject class, which implements the dynamic movement logic using RxJs Observables — this has been described in detail here
- the MobileObjectServer class, which manages the web-socket protocol, receiving commands from the Controller and sending out to the Monitors all information about the dynamics of MobileObject. This implementation has been inspired by this article from Luis Aviles.
MobileObject APIs
Let’s have a brief overview of the MobileObject class — all details can be found here while the code can be found in this repository.
The MobileObject offers two families of APIs.
The first one is the set of methods through which an external Controller can issue commands that affect the dynamics of the object (for example, accelerate, brake).
The second are streams of readonly data which communicate to external clients, the Monitors, the relevant data about the dynamic behaviour of the object (that is, its position and velocity over time).

In order to move an instance of a MobileObject, a Controller has to turn it on (with the turnOn()
method), apply the desired acceleration (with the methods accelerateX(acc: number)
and accelerateY(acc: number)
), and then maybe brake (with the method brake()
).
When a Monitor connects to the MobileObjectServer, the MobileObjectServer subscribes to the dynamicsObs
and the observable of the MobileObjects running in the server. It then starts sending the data related to their movement to the connected Monitors.
For the purpose of this article, this is all you need to know about the MobileObject.
Sockets as Observables
The MobileObjectServer starts doing something when a client, either a Controller or a Monitor, opens a websocket connection. Over the course of time, the MobileObjectServer can receive many requests to open a connection from many clients.

This looks like an Observable of sockets. This is how to obtain it using the socket.io
library:
import { Server } from 'http';
import { Observable } from 'rxjs';
import { Observer } from 'rxjs';
import * as socketIoServer from 'socket.io';
import {SocketObs} from './socket-obs';
export function sockets(httpServer: Server, port) {
httpServer.listen(port, () => {
console.log('Running server on port %s', port);
});
return new Observable<SocketObs>(
(subscriber: Observer<SocketObs>) => {
socketIoServer(httpServer).on('connect',
socket => {
console.log('client connected');
subscriber.next(new SocketObs(socket));
}
);
}
);
}
Via the function sockets
, we create an Observable of SocketObs
(we will see the implementation of this class later). Any time the websocket server receives a connect request and creates a new socket, the Observable returned by this function emits an instance of SocketObs
which wraps the socket just created.
Messages over sockets as Observables
Sockets can be used to send messages from the client to the server and vice versa. With the socket.io
library, we can send messages using the emit
method.
SocketIO.Socket.emit(event: string, …args: any[]): SocketIO.Socket
The parameter event
can be seen as an identifier of the type of message we want to send. The …args
parameters can be used to send data specific to a single message.
Whoever is interested in a certain type of message (or event, to use the socket.io
terminology) can start listening on the socket using the method on
.
SocketIO.Emitter.on(event: string, fn: Function): SocketIO.Emitter

Again, the sequences of messages received by the Receiver look like Observables. This is how we can create Observables that actually emit any time a message of a certain type is received.
The onMessageType
method is the one that does the trick. It returns an Observable, which emits any time a message of type messageType
is received.
import { Observable, Observer } from 'rxjs';
export class SocketObs {
constructor(private socket: SocketIO.Socket) {}
onMessageType(messageType): Observable<any> {
return new Observable<any>((observer: Observer<any>) => {
this.socket.on(messageType, data => observer.next(data));
});
}
}
In this way, sockets events, or messages as we call them here, have been transformed into Observables. These are going to be the foundations of our design.
Determine the nature of the Client
There are two types of clients which can connect with the MobileObjectServer. One is the Controller and one is the Monitor. The MobileObjectServer first needs to determine which type of client it is going to deal with on a specific socket.
The way we have chosen to implement such logic is to have the Controller and the Monitor send different message types as their first message.
- Controller sends a message of type BIND_CONTROLLER
- Monitor sends a message of type BIND_MONITOR
Depending on the type of the first message received on a socket, the MobileObjectServer is able to identify whether it is communicating with a Controller or a Monitor.

As soon as a socket is created, the MobileObjectServer has to start listening to both types of messages, BIND_CONTROLLER and BIND_MONITOR. The first to occur will win. It is a race
between the two Observables which map the two different types of messages.
Such logic has to be repeated any time a new socket is created, that is any time the Observable returned by the function sockets
emits. Therefore, we need to merge all the events that win the race. We need to use the mergeMap
operator, which merges all the events raised by the Observables involved, and flatten the results into a new Observable (mergeMap
was formerly know as flatMap
).

The code to obtain this result is the following:
startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR),
socket.onMessageType(MessageType.BIND_CONTROLLER)
)
)
)
.subscribe();
}
Now that we know how to differentiate Controllers and Monitors, we can focus on what to do in these two cases.
Events relevant for a Monitor
A Monitor shows the movement of all MobileObjects which are running on the MobileObjectServer. So the MobileObjectServer has to send the right information to the monitors at the right times. Let’s see first what those times are, that is which are the relevant events that the MobileObjectServer has to be aware of in order to fulfill its job.
Adding and removing MobileObjects
The first relevant events are:
- a MobileObject has been added => the MobileObject is shown on the Monitor
- a MobileObject has been removed => the MobileObject is removed from the Monitor
MobileObjects are added or removed over time, so such events can be modeled with two Observables:
- an Observable which emits when a MobileObject is added
- an Observable which emits when a MobileObject is removed
Once a Monitor is connected, the MobileObjectServer starts being interested in both of those Observables, so it has to merge
them:

Similar to what we have seen before, we need to repeat such logic any time a Monitor is added. Therefore we need to mergeMap
all the Observables which are the result of the merge
of the ‘mobile object added’ Observable with the ‘mobile object removed’ Observable.

This is the code to obtain an Observable which emits any time a MobileObject has to be added to or removed from every Monitor:
import {sockets} from './socket-io-observable';
import {SocketObs} from './socket-obs';
class MobileObjectServer {
private mobileObjectAdded = new Subject<{mobObj: MobileObject, mobObjId: string}>();
private mobileObjectRemoved = new Subject<string>();
startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (socketObs: SocketObs) => this.handleMonitorObs(socketObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
// something will be added here soon to make this logic work
)
.pipe(
mergeMap(handler => handler(socket))
)
)
)
.subscribe();
}
handleMonitorObs(socket: SocketObs) {
const mobObjAdded = this.mobileObjectAdded;
const mobObjRemoved = this.mobileObjectRemoved;
return merge(mobObjAdded, mobObjRemoved);
}
}
We have introduced a few things with this code which are worth commenting on here.
We have created the MobileObjectServer
class, which will be the place where we will code all our server logic from now on.
The method handleMonitorsObs
, which we are going to enrich later on, returns simply the merge
of two Observables, mobileObjectAdded
and mobileObjectRemoved
, which are Subjects. This is the “inner” merge
shown in the picture above.
Subjects are Observables, and therefore can be merged as we do here. But Subjects are also Observers, so we can emit events through them. As we will see later in the code, there will be a time when we will use these Subjects to emit the events their names suggest.
The last point is related to the code we have added in the startSocketServer method:
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (sObs: SocketObs) => this.handleMonitorObs(sObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
// something will be added here soon to make this logic work
)
.pipe(
mergeMap(handler => handler(socket))
)
This is basically a way to say: any time a BIND_MONITOR message is received, return the function
(socketObs: SocketObs) => this.handleMonitorObs(socketObs)
which will be executed within the mergeMap
operator piped into the result of the race
function. This mergeMap
operator is the external mergeMap
shown in the picture above.
Another way to read the code is the following: any event corresponding to a message of type BIND_MONITOR gets transformed by the logic of
mergeMap(() => this.handleMonitorObs(socket))
where socket
is the instance of type SocketsObs
emitted by the race
function.
Soon we will add something similar for the BIND_CONTROLLER case to make this whole logic work.
Handle MobileObject dynamics Observables
Let’s consider one Monitor which connects to the MobileObjectServer. After the connection, a couple of MobileObjects are added to the MobileObjectServer.

Now for each MobileObject, we have to start considering the dynamics Observables they offer as part of their APIs. These Observables emit, at regular intervals of time, data about the dynamics (position and velocity) of the MobileObject. If mobileObject
stores a reference to a MobileObject, we can obtain its dynamics Observable via mobileObject.dynamicsObs
(see MobileObject APIs).
First we have to transform each event representing the fact that a MobileObject has been added into the series of events emitted by its dynamicsObs
. Then we mergeMap
all these series into a new single Observable which emits all dynamic events for all MobileObjects which are added.

Then we apply all this jazz to all the Monitors which connect to the MobileObjectServer. So we end up with a new Observable which emits dynamics data for all Monitors and all MobileObjects (plus all events related to the fact that a MobileObject has been removed).

Per each time interval, we have groups of four events related to the emission of data about the dynamics of our MobileObjects. Why? This makes sense if we think that we have two Monitors and two MobileObjects. Each MobileObject has to send its dynamics data to each Monitor per every time interval. Therefore it is correct to see four events per each time interval.
Once this is clear, the code is very simple:
import {sockets} from './socket-io-observable';
import {SocketObs} from './socket-obs';
class MobileObjectServer {
private mobileObjectAdded = new Subject<{mobObj: MobileObject, mobObjId: string}>();
private mobileObjectRemoved = new Subject<string>();
startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (socketObs: SocketObs) => this.handleMonitorObs(socketObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
// something will be added here soon to make this logic work
)
.pipe(
mergeMap(handler => handler(socket))
)
)
)
.subscribe();
}
handleMonitorObs(socket: SocketObs) {
const mobObjAdded = this.mobileObjectAdded
.pipe(
mergeMap(data => data.mobileObject.dynamicsObs)
);
const mobObjRemoved = this.mobileObjectRemoved;
return merge(mobObjAdded, mobObjRemoved);
}
}
We have just introduced one simple change. We changed the handleMonitorObs
method to add the mergeMap
operator. This transforms the mobileObjectAdded
Observable so that the new Observable emits the dynamics data we are looking for.
The rest has remained untouched.
Summary so far
What have we done so far? We have just transformed Observables to obtain new Observables which emit all the events MobileObjectServer is interested in when it has to deal with a Monitor. Nothing else.

You can see how these transformations are reflected in the code in the following image:

The only thing we need to do now is to add the desired side effects to the relevant events. This will eventually allow us to achieve what we want, that is to communicate to the Monitor the right information at the right time.
But before moving to side effects, let’s cover what MobileObjectServer needs to do when interacting with a Controller, the other client in our distributed system.
Events relevant for a Controller
When a Controller connects to the MobileObjectServer there are fewer things that the server needs to care about. At least there are fewer nested relevant events happening.
The things that the MobileObjectServer needs to care about are:
- A Controller has connected, which in our simple logic means that we have to create a brand new MobileObject
- The Controller has sent commands for its MobileObject
- The Controller has disconnected. In our implementation, this means that we somehow have to delete the MobileObject controlled by the Controller (we have a 1 to 1 relationship between MobileObject and its Controller)
We already know the first event: it is the one emitted by the Observable returned by socket.onMessageType(BIND_CONTROLLER)
.
Commands are sent by the Controller to the MobileObjectServer in the form of messages. So we can create an Observable of commands received over a certain socket (received from a certain Controller) since each Controller has its own socket. We do this by simply using the onMessageType
method of SocketObs
socket.onMessageType(CONTROLLER_COMMAND)
SocketObs
also offers a method, onDisconnect
, which returns an Observable that emits when the socket is disconnected. This is what we need in order to deal with the third event.

Since we are dealing with more than one Controller potentially connecting to the MobileObjectServer, it should not surprise you to learn that we need to mergeMap
the result of the merge
. This is the same type of transformation we have already done a few times.

The code should be no surprise as well.
startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (socketObs: SocketObs) => this.handleMonitorObs(socketObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
.pipe(
map(() => (socketObs: SocketObs) => this.handleControllerObs(socketObs))
),
)
.pipe(
mergeMap(handler => handler(socket))
)
)
)
.subscribe();
}
handleMonitorObs(socket: SocketObs) {
const mobObjAdded = this.mobileObjectAdded
.pipe(
mergeMap(data => data.mobileObject.dynamicsObs)
);
const mobObjRemoved = this.mobileObjectRemoved;
return merge(mobObjAdded, mobObjRemoved);
}
handleControllerObs(socket: SocketObs) {
const commands = socket.onMessageType(MessageType.CONTROLLER_COMMAND);
const disconnect = socket.onDisconnect();
return merge(commands, disconnect);
}
We have simply added an handleControllerObs
method that deals with commands received and the disconnect of a Controller. We apply the mergeMap transformation to it as we have already done with handleMonitorObs
.
Summary of the transformations applied to Controllers
The following diagram illustrates all transformations we have applied starting from the Observable that emits when a Controller connects.

The Final Observable
If we put together the transformations we have done for both the Monitors and the Controllers, what we obtain is the following final Observable.

Just by subscribing to this one final Observable, the whole tree of events gets unfolded.
Side effects
The beautiful tree of events we have created by subscribing to the Final Observable does not do anything. But it does a good job of mapping the Events we identified while describing the requirements of the Server at the beginning of this article.
Basically it tells us clearly when we have to do something.
This something is what we call a side effect.

When a Controller connects and disconnects, we respectively create or delete a MobileObject. As side effect of these actions is that we raise “MobileObject added” and “MobileObject deleted” events using the mobileObjectAdded
and mobileObjectRemoved
Subjects we introduces some paragraphs ago.
How to implement side effects
In RxJs there are different ways to implement side effects.
Observers is one. We can add Observers while we subscribe
using the tap
operator (formerly know as do
).
Another way is to inject them in any function we pass to any RxJs operator.
We are mainly going to use tap
, since it allows us to place side effects throughout the entire tree of events. But we are also going to place side effects directly inside functions we pass to RxJs operators.
The only place we do not put side effects is subscribe
. The reason is that, given how we built it, the Final Observer emits many different types of events. Therefore subscribe
, which works the same for all events, is not the right place to put behavior which depends on certain types of events.
Hopefully at this point the code sort of speaks for itself.

Last but not least: completion of Observables
There is one thing that we still need to do to complete our design: stop the streams of events, or complete the Observables, when either a Controller or a Monitor disconnects.
When a Controller disconnects
When a Controller disconnects, we delete the MobileObject it controls. As part of the deletion, it is important to make sure that the MobileObjectServer stops sending dynamics data related to this MobileObject to the connected Monitors. This means that we must complete the following Observable:
mobObjInfo.mobObj.dynamicsObs
.pipe(
tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
)
We can easily achieve this just using the takeUntil
operator together with the mobileObjectRemoved
Observable we already know:
mobObjInfo.mobObj.dynamicsObs
.pipe(
tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
takeUntil(this.mobileObjectRemoved.pipe(
filter(id => id === mobObjInfo.mobObjId)
))
)
takeUntil
ensures that an Observable completes when the Observable passed as a parameter to takeUntil
emits.
mobileObjectRemoved
emits every time a MobileObject is removed. What we want, though, is to stop sending dynamics info when a specific MobileObject, identified by its id, is removed. So we add the filter
logic.
When a Monitor disconnects
In this case, we can also use takeUntil.
We know when a Monitor disconnects because the socket
, of type SocketObs
, associated to it emits via the socket.onDisconnect()
Observable. So what we need to do is stop sending dynamics info when socket.onDisconnect()
emits.
So the final logic to govern the completion of the Observable is
mobObjInfo.mobObj.dynamicsObs
.pipe(
tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
takeUntil(this.stopSendDynamics(socket, mobObjInfo.mobObjId))
)
where
private stopSendDynamics(socket: SocketObs, mobObjId: string){
return merge(
this.mobileObjectRemoved.pipe(
filter(id => id === mobObjId)
),
socket.onDisconnect()
);
}
And this is how the core of the code implementing our logic looks:
import {sockets} from './socket-io-observable';
import {SocketObs} from './socket-obs';
class MobileObjectServer {
private mobileObjectAdded = new Subject<{mobObj: MobileObject, mobObjId: string}>();
private mobileObjectRemoved = new Subject<string>();
public startSocketServer(httpServer: Server) {
sockets(httpServer, this.port).pipe(
mergeMap(socket =>
race(
socket.onMessageType(MessageType.BIND_MONITOR)
.pipe(
map(() => (socketObs: SocketObs) => this.handleMonitorObs(socketObs))
),
socket.onMessageType(MessageType.BIND_CONTROLLER)
.pipe(
map(() => (socketObs: SocketObs) => this.handleControllerObs(socketObs))
),
)
.pipe(
mergeMap(handler => handler(socket))
)
)
)
.subscribe();
}
private handleMonitorObs(socket: SocketObs) {
const mobObjAdded = this.mobileObjectAdded
.pipe(
tap(mobObjInfo => socket.send(MessageType.MOBILE_OBJECT, mobObjInfo.mobObjId)),
mergeMap(mobObjInfo => mobObjInfo.mobObj.dynamicsObs
.pipe(
tap(dynamics => socket.send(MessageType.DYNAMICS_INFO, dynamics)),
takeUntil(this.stopSendDynamicsInfo(socket, mobObjInfo.mobObjId))
)
)
);
const mobObjRemoved = this.mobileObjectRemoved
.pipe(
tap(mobObjId => socket.send(MessageType.MOBILE_OBJECT_REMOVED, mobObjId)),
);
return merge(mobObjAdded, mobObjRemoved);
}
private handleControllerObs(socket: SocketObs) {
const {mobObj, mobObjId} = this.newMobileObject();
this.mobileObjectAdded.next({mobObj, mobObjId});
const commands = socket.onMessageType(MessageType.CONTROLLER_COMMAND)
.pipe(
tap(command => this.execute(command, mobObj))
);
const disconnect = socket.onDisconnect()
.pipe(
tap(() => this.mobileObjectRemoved.next(mobObjId)),
);
return merge(commands, disconnect);
}
private stopSendDynamicsInfo(socket: SocketObs, mobObjId: string) {
return merge(this.mobileObjectRemoved.pipe(filter(id => id === mobObjId)), socket.onDisconnect());
}
}
Conclusion
It has been a pretty long journey. We have seen some reasoning driven by Reactive Thinking and some implementations of this reasoning.
We started transforming WebSockets events into Observables. Then, applying incremental transformations, we ended up creating a single Observable that, once subscribed, unfolds all the events we are interested in.
At this point, adding the side effects that allow us to achieve our goal has been straightforward.
This mental process of design, which is incremental in itself, is the meaning I give to “Reactive Thinking”.
The full code base, comprising Server Controller and Monitor, can be found here.