Let’s say your web application communicates with a third-party service, and this service is somewhat fragile. Either because of its fragility or for some other reason, under no circumstances do you want your application to bombard this service with concurrent requests. Whenever a component needs to make a request to this service while another request is still waiting for a response, the new request should wait until the previous one completes.
Normally, in a web application that uses RxJS (e.g. a modern Angular application), an asynchronous request is represented by an instance of Observable. It’s a “single-use” Observable that emits a single value, the response, before it completes. Once the Observable is built, the actual request is sent only when the Observable is subscribed to, and this is where we need to introduce the new logic: the client code should not care whether there are any active requests. It should be able to simply create a service request Observable and subscribe to it. However, if there is already an active request against the service, the new request should not be sent immediately but delayed until the other request completes. All of that should happen transparently to the client code. How can it be done?
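The “sent only when subscribed” behavior can be seen in a small sketch. It assumes a hypothetical fakeRequest function standing in for a real HTTP call; it is not part of any service API and only records when the request would actually go out.

```typescript
import { Observable, defer, of } from 'rxjs';

const log: string[] = [];

// Hypothetical stand-in for a real HTTP call: defer() runs its factory
// only on subscription, which is the moment the "request" goes out.
function fakeRequest(): Observable<string> {
  return defer(() => {
    log.push('request sent');
    return of('response');
  });
}

const request$ = fakeRequest();
log.push('observable created'); // nothing has been sent yet

request$.subscribe(response => log.push(`received ${response}`));

console.log(log);
// → ['observable created', 'request sent', 'received response']
```

Because the work starts at subscription time, that is the natural place to hook in any “wait for your turn” logic.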
To implement the above logic with RxJS, we can model a process found in many places, such as a DMV office, where people have to wait for their turn to go to a window and talk to the person behind it. At the entrance, every visitor gets a ticket with a sequential number from a ticket-dispensing machine. A display in the waiting room shows the number that will be served next. The person whose ticket number matches the number on the display can proceed to the window and be served. Once that person has been served, the clerk behind the window pushes a button and the display shows the next ticket number. This is exactly what we need, so let’s model it using RxJS:
    import { Observable, Subject, defer, of } from 'rxjs';
    import { switchMap, finalize, map, shareReplay, delayWhen, filter, startWith } from 'rxjs/operators';

    class MyService {
      // the subject, to which we will be sending next ticket number that can be served
      // this models the button the clerk pushes to show the next number on the display
      private readonly _serveNextTicket = new Subject<void>();

      // the observable that multicasts the current ticket number that can be served
      // this models the display that shows the next ticket number
      private readonly _currentTicket: Observable<number>;

      // the ticket number for the next request
      // this models the ticket dispenser
      private _nextTicket = 0;

      constructor() {
        // let’s construct our “display” and attach the “button” to it
        this._currentTicket = this._serveNextTicket.pipe(
          startWith(0),     // start with “showing” ticket number 0
          map((_, i) => i), // count “button” events, increment ticket number
          shareReplay(1)    // multicast it for subscribers
        );
      }

      callService(): Observable<Response> {
        return this._queueUp(
          // here we create the call to the service and return it as an observable
          // ...
        );
      }

      private _queueUp<T>(call: Observable<T>): Observable<T> {
        // get the ticket at the “dispenser”; defer() takes it on subscription,
        // so an observable that is built but never subscribed holds no ticket
        return defer(() => of(this._nextTicket++)).pipe(
          // delay until the “display” shows our ticket number
          delayWhen(ticket => this._currentTicket.pipe(
            filter(currentTicket => currentTicket === ticket)
          )),
          // proceed with our service call
          // (switchMap replaces switchMapTo, which is deprecated in RxJS 7)
          switchMap(() => call.pipe(
            // push the “button” to “display” next ticket
            finalize(() => {
              this._serveNextTicket.next();
            })
          ))
        );
      }
    }
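The “display” is the least obvious part of the code above, so here is a sketch of it in isolation. The names button, display$, early and late are made up for this illustration; the pipeline itself is the same one built in the constructor.

```typescript
import { Subject } from 'rxjs';
import { map, shareReplay, startWith } from 'rxjs/operators';

// The "button" and the "display", built the same way as in the constructor.
const button = new Subject<void>();
const display$ = button.pipe(
  startWith(0),     // emit once right away so the counter starts at 0
  map((_, i) => i), // ignore the value, use the emission index as the ticket number
  shareReplay(1)    // remember the latest number for late subscribers
);

const early: number[] = [];
display$.subscribe(n => early.push(n)); // sees 0 immediately

button.next(); // display shows 1
button.next(); // display shows 2

const late: number[] = [];
display$.subscribe(n => late.push(n)); // only gets the replayed latest value

console.log(early); // → [0, 1, 2]
console.log(late);  // → [2]
```

The replay is what lets a request that arrives while the display already shows its number proceed without waiting for another button push.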
Now we can call callService() from anywhere, in any order, and always be sure that all requests to the service are sent sequentially: the next one starts only once the previous one completes.
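To see the sequencing in action, here is a minimal sketch. It assumes a condensed, stand-alone copy of the queue-up logic (the hypothetical RequestQueue class, which takes the ticket on subscription via defer and uses switchMap) and fake requests driven by Subjects, so that we control when each “response” arrives. A real application would pass in the actual service call instead.

```typescript
import { Observable, Subject, defer, of } from 'rxjs';
import {
  delayWhen, filter, finalize, map, shareReplay, startWith, switchMap, take
} from 'rxjs/operators';

// Condensed, hypothetical copy of the ticket queue, for illustration only.
class RequestQueue {
  private readonly serveNext = new Subject<void>();
  private readonly currentTicket$ = this.serveNext.pipe(
    startWith(0),
    map((_, i) => i),
    shareReplay(1)
  );
  private nextTicket = 0;

  queueUp<T>(call: Observable<T>): Observable<T> {
    // defer: take the ticket when subscribed, not when the observable is built
    return defer(() => of(this.nextTicket++)).pipe(
      delayWhen(ticket =>
        this.currentTicket$.pipe(filter(current => current === ticket))
      ),
      switchMap(() => call.pipe(finalize(() => this.serveNext.next())))
    );
  }
}

const log: string[] = [];
const queue = new RequestQueue();

// Hypothetical fake request: logs when it is actually "sent" and completes
// once a single response is pushed into its Subject.
function fakeRequest(name: string, response: Subject<string>): Observable<string> {
  return defer(() => {
    log.push(`${name} sent`);
    return response.pipe(take(1));
  });
}

const a = new Subject<string>();
const b = new Subject<string>();

queue.queueUp(fakeRequest('A', a)).subscribe(v => log.push(`A got ${v}`));
queue.queueUp(fakeRequest('B', b)).subscribe(v => log.push(`B got ${v}`));
// At this point only A has been sent; B is still waiting for its ticket.

a.next('ok'); // A completes, the "button" is pushed, and B is sent
b.next('ok');

console.log(log);
// → ['A sent', 'A got ok', 'B sent', 'B got ok']
```

One caveat of this design: a caller that unsubscribes while still waiting for its turn never pushes the “button”, so every later ticket would wait forever. Handling that case is left out of the sketch.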
Here’s a question for the RxJS experts out there: Is there a better way? (And I’m sure there is — such is the world of RxJS, isn’t it?)