Thursday, December 9, 2021

Combining RxJS Observables Using combineLatest and forkJoin

One of the hallmarks of complex applications is the sourcing of data from numerous sources. These can be external data providers, applications, or a local database. RxJS provides a variety of operators for combining these into one observable based on order, time, and/or structure of emitted values. In this article, we’ll learn to subscribe to multiple streams simultaneously by employing two of the most popular RxJS combination operators: combineLatest and forkJoin.

RxJS combineLatest Operator

One of the most common reasons for combining observables is to perform some type of calculation or determination using data from each. The combineLatest operator emits an item whenever any of the source observables emits a value, but only once each of the source observables has emitted at least one value. As such, whenever any of the source observables emits a value, combineLatest:

  1. combines the most recently emitted items from each of the other source observables, and
  2. if you provided a project function, emits the return value from that function.

Here is the combineLatest signature:

combineLatest(observables: ...Observable [, project: function]): Observable

The following code snippet shows a typical usage scenario of combineLatest usage. You’ll notice that the weight and height observables are passed to combineLatest as an array (of arrays). The second (optional) parameter is the project function that calculates the BMI using the emitted values from the weight and height observables. Finally, the results of the calculation are returned as a new observable:

import { combineLatest, of } from 'rxjs';

const weight = of(70, 72, 76, 79, 75);
const height = of(1.76, 1.77, 1.78);
const bmi = combineLatest([weight, height], (w, h) => {
  console.log('project values: w = ', w, ', h = ', h);
  return w / (h * h);
});
bmi.subscribe(res => console.log('BMI is ' + res));

// Output to console is:
// project values: w = 75, h = 1.76
// BMI is 24.212293388429753
// project values: w = 75, h = 1.77
// BMI is 23.93948099205209
// project values: w = 75, h = 1.78
// BMI is 23.671253629592222

The console.log() output from the project function shows that the last emitted weight value is used in all calculations. It is combined with each of the height values to produce the BMI results. This happens because the weight observable values are emitted immediately without a delay. Hence:

if one observable emits values before the others do, then those values are lost.

The number of values emitted from combineLatest is determined by the stream that emits the least number of values, in this case, height.

RxJS forkJoin Operator

Like combineLatest, forkJoin also takes an array of observables. However, it emits an array of values in the exact same order as the passed array. The returned observable will emit the last values emitted from each input stream.

The signature of forkJoin is nearly identical to that of combineLatest:

forkJoin(...args [, selector : function]): Observable

If we rerun our previous example, substituting forkJoin for combineLatest, we can see that only one BMI value is produced:

const bmi = forkJoin([weight, height], (w, h) => {
  console.log('selector values: w =', w, ', h =', h);
  return w / (h * h);
});
bmi.subscribe(res => console.log('BMI is ' + res));

// Output to console is:
// selector values: w = 75, h = 1.78
// BMI is 3.671253629592222

How Combination Operators Know When All Observables Have Completed

In order to emit the last values emitted from each input stream, forkJoin needs to know when all streams have completed. This begs the question, how does it know when a stream is done emitting values. The answer lies is in the Subscription class, which implements the Observer interface. In the recent RxJS Observables Primer in Angular article, we learned about the next(), error(), and complete() handlers of the Observer interface. The Combination Operators wait for the complete() signal to combine streams.

The of() function takes care of calling complete() for us, but it’s our responsibility when creating our own observables. We can see that both observables below call complete() when done emitting values:

const weight = 
  new Observable<number>((subscriber: Subscriber<number>) => {
    subscriber.next(70);
    subscriber.next(72);
    setTimeout(() => {
      subscriber.next(76);
      subscriber.next(79);
      subscriber.next(75);
      subscriber.complete();
    }, 1200);
});
const height = 
  new Observable<number>((subscriber: Subscriber<number>) => {
    subscriber.next(1.76);
    subscriber.next(1.77);
    setTimeout(() => {
      subscriber.next(1.78);
      subscriber.complete();
    }, 1000);
});
const bmi = forkJoin([weight2, height2], (w, h) => {
  console.log('selector values: w =', w, ', h =', h);
  return w / (h * h);
});
bmi.subscribe(res => console.log('BMI is ' + res));

// Output to console is:
// selector values: w = 75, h = 1.78
// BMI is 3.671253629592222

Once again, forkJoin returns 3.671253629592222.

All of the above code snippets are included in the stackblitz.com demo.

Conclusion

In this article, we learned to subscribe to multiple streams simultaneously by employing two of the most popular RxJS combination operators: combineLatest and forkJoin. Not quite what you were looking for? Fear not, there are more combination operators where they came from. We’ll explore some of these in the coming weeks.

Rob Gravelle
Rob Gravelle resides in Ottawa, Canada, and has been an IT guru for over 20 years. In that time, Rob has built systems for intelligence-related organizations such as Canada Border Services and various commercial businesses. In his spare time, Rob has become an accomplished music artist with several CDs and digital releases to his credit.

Popular Articles

Featured