One of the first challenges you’ll face when working with RxJS Observables is getting them to execute in a specific order. On the surface, this seems like a straightforward goal, but, when it comes to asynchronous processes, nothing is ever simple. Even if observables are executed in succession, they won’t necessarily emit values in the same order. Hence, “order” can apply to either invocation or subscription.
To complicate matters further, you will usually want to pass on emitted values to the next observable in the chain as well as to subscribers. Rest assured, RxJS provides the perfect operator for every occasion; the question is which one(s) to use for the task at hand. Today’s blog will hopefully demystify some tasks for choosing the right operator for chaining observables together.
Before we begin, if you need a refresher on JavaScript Observables – or you missed our first article in this series, check out our RxJS Observables primer.
JavaScript Observables and concat
Sometimes, we want two or more observables to complete in succession. For that, there’s concat. It treats your observables like a checkout line at the local grocery store; the next person (subscription) in line can’t pay for their groceries until the previous customer is done paying (completes)!
Here’s a JavaScript code snippet that displays several messages in succession. The concat operator ensures that messages appear in order, no matter how long each observable takes to complete:
// helper method const delayedMessage = (message: string, delayedTime: number) => EMPTY.pipe(startWith(message), delay(delayedTime)); const concatMessage = 'Concat message '; concat( delayedMessage(concatMessage + 1, 1000), delayedMessage(concatMessage + 2, 3000), delayedMessage(concatMessage + 3, 2000), delayedMessage(concatMessage + 4, 1000), delayedMessage(concatMessage + 5, 4000), // clear the screen delayedMessage('', 2000) ) .subscribe((message: any) => userMessage.innerHTML = message);
Note that, because concat waits for observables to complete before moving on to the next one, should one of them fail to complete, then subsequent observables won’t run!
Collecting Observables with concatAll
In situations where you have an observable source that produces other streams (observables) rather than a collection of streams, you can use concatAll to combine them and sequentially emit all values from every given input stream. Just like concat, once the current active stream completes, concatAll subscribes to the next observable in the sequence. As values from any combined sequence are produced, those values are emitted as part of the resulting sequence. This process is often referred to as flattening.
In the following code, the of and map operators are combined to produce a number of delayed observables. Feeding them to concatAll then merges all emitted values and broadcasts them to subscribers in the order in which they execute:
const randomDelay = (min: number, max: number) => Math.floor( Math.random() * max ) + min; const msg = 'ConcatAll message '; const observable = of(1, 2, 3, 4, 5).pipe( map((num) => of(msg + num).pipe(delay(randomDelay(1, 4) * 1000))), //merge values from inner observable concatAll() ); observable.subscribe((message: string) => userMessage.innerHTML = message);
JavaScript Observables and the merge Operator
If, rather than display messages in the order in which their observables are executed, we wanted to display them as they complete, we could use the merge operator. It’s more efficient than concat in that merge creates all of the observables right away and then emits their output to subscribers as soon as they complete. In that sense, observables are still executed in the order listed, but their output is passed along once emitted. Here is the same code snippet as we saw previously, using merge:
// helper method const delayedMessage = (message: string, delayedTime: number) => EMPTY.pipe(startWith(message), delay(delayedTime)); const mergeMessage = 'Merge message '; merge( delayedMessage(mergeMessage + 1, 1000), delayedMessage(mergeMessage + 2, 3000), delayedMessage(mergeMessage + 3, 2000), delayedMessage(mergeMessage + 4, 1000), delayedMessage(mergeMessage + 5, 4000), // clear the screen delayedMessage('', 6000) ) .subscribe((message: any) => userMessage.innerHTML = message);
Now, instead of seeing messages in creation order of 1, 2, 3, 4, 5, they appear in order of execution time, i.e. 1000 (1), 1000 (4), 2000 (3), 3000 (2), 4000 (5). Another ramification of using merge is that the clear screen message has to have a longer execution time than the other messages.
Combining Streams as They Are Emitted Using mergeAll
The merge operator also has an equivalent mergeAll for combining several inner observable streams and concurrently emitting all input stream values. Like concatAll, mergeAll takes inner streams (observables) and emits them as a new stream, which we would refer to as higher-order observables.
Here is the previous concatAll example updated to employ mergeAll:
// helper method const delayedMessage = (message: string, delayedTime: number) => EMPTY.pipe(startWith(message), delay(delayedTime)); btnMergeAll.addEventListener("click", (ev: MouseEvent) => { const msg = 'MergeAll message '; const observable = of(1, 2, 3, 4, 5).pipe( map((num) => of(msg + num).pipe(delay(randomDelay(1, 4) * 1000))), //merge values from inner observable mergeAll() ); observable.subscribe((message: string) => userMessage.innerHTML = message); });
Due to the random delay, the above five streams may emit in any order. Likewise, successive streams that have the same delay will overlap so that the first is immediately eclipsed by the second.
Like to see the above code snippets in action? There’s a demo on stackblitz.com.
Chaining RxJS Observables
This blog presented a few options for chaining observables to execute in a specific order. Once you’ve got the hang of that, you can combine them with transformation operators such as map, mergeMap, switchMap, and flatMap to produce higher-order observables that are better suited to your specific requirements.