Sunday, September 26, 2021

Combining RxJS Observables Using the withLatestFrom and zip Operators

In Angular applications, we often rely on more than one asynchronous call to load component data, which means holding off on initializing the component until all of the responses have arrived. One way to do that is to employ some of the RxJS library’s many combination operators. In this tutorial, we’ll examine withLatestFrom and zip.

Read: RxJS Observables Primer in Angular.

withLatestFrom in RxJS

You could say that withLatestFrom is the counterpart to combineLatest. Both operators combine two or more source observables and emit values calculated from the latest values of each. What distinguishes withLatestFrom from combineLatest can be seen in its signature:

source.pipe(withLatestFrom(other: Observable, project?: Function)): Observable

As you can see, withLatestFrom acts on a source observable to combine it with other streams to emit values calculated from the latest values of each, but only when the source emits. So, while combineLatest emits a new value whenever there’s a new emission from any of the input streams, withLatestFrom emits a new value only if there’s a new emission from the source stream instance.
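The difference is easier to see when emissions are triggered by hand. The following is a minimal sketch, not taken from the original demo: the source$ and other$ subjects and the two result arrays are hypothetical names introduced purely for illustration.

```typescript
import { Subject, combineLatest } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

// Hypothetical streams that we push values into manually
const source$ = new Subject<number>();
const other$ = new Subject<string>();

const combined: string[] = []; // what combineLatest emits
const sampled: string[] = [];  // what withLatestFrom emits

combineLatest([source$, other$]).subscribe(([n, s]) => combined.push(`${n}${s}`));
source$.pipe(withLatestFrom(other$)).subscribe(([n, s]) => sampled.push(`${n}${s}`));

source$.next(1);  // neither emits: other$ has no value yet
other$.next('a'); // combineLatest emits "1a"; withLatestFrom stays silent
source$.next(2);  // both emit "2a"
other$.next('b'); // only combineLatest emits "2b"
source$.next(3);  // both emit "3b"

console.log(combined); // [ '1a', '2a', '2b', '3b' ]
console.log(sampled);  // [ '2a', '3b' ]
```

Note that the first source value (1) never appears in the withLatestFrom output, because other$ had not yet emitted when it arrived.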

Like combineLatest, it still waits for at least one emitted value from each stream and may complete without a single emission when the source stream completes. Moreover, it will never complete if the source stream doesn’t complete and will throw an error if any of the inner streams errors out.
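To illustrate the waiting and completion behavior described above, here is a small sketch under stated assumptions: clicks$ and config$ are made-up subjects, and the log array simply records what the subscriber sees.

```typescript
import { Subject } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

// Hypothetical streams: clicks$ is the source, config$ is the secondary input
const clicks$ = new Subject<string>();
const config$ = new Subject<number>();
const log: string[] = [];

clicks$.pipe(withLatestFrom(config$)).subscribe({
  next: ([c, n]) => log.push(`${c}:${n}`),
  complete: () => log.push('complete'),
});

clicks$.next('a');  // dropped: config$ has not emitted yet
config$.next(1);    // stored as the latest value, nothing is emitted
clicks$.next('b');  // emits "b:1"
config$.complete(); // the result stays open and keeps using the latest value
clicks$.next('c');  // emits "c:1"
clicks$.complete(); // completion of the source completes the result

console.log(log); // [ 'b:1', 'c:1', 'complete' ]
```

Had clicks$ completed before config$ emitted anything, the subscriber would have received only the completion notification.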

To better understand how withLatestFrom works, let’s use it to calculate BMI (Body Mass Index) from two streams of weight and height values, just as we did in the combineLatest demo:

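import { of } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const weight = of(70, 72, 76, 79, 75);
const height = of(1.76, 1.77, 1.78);

// weight is the source stream; height is only sampled for its latest value
const bmi = weight.pipe(
  withLatestFrom(height, (w, h) => {
    console.log('project values: w =', w, ', h =', h);
    return w / (h * h);
  })
);
bmi.subscribe(res => console.log('BMI is ' + res));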

// Output to console is:
// project values: w = 70 , h = 1.78
// BMI is 22.093170054286073
// project values: w = 72 , h = 1.78
// BMI is 22.724403484408533
// project values: w = 76 , h = 1.78
// BMI is 23.98687034465345
// project values: w = 79 , h = 1.78
// BMI is 24.933720489837143
// project values: w = 75 , h = 1.78
// BMI is 23.671253629592222

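In the above example, the weight observable provides the source stream. As such, withLatestFrom calculates the BMI whenever weight emits a value. In each case, the final height value of 1.78 is used, because of emits synchronously and height has already delivered all of its values by the time weight starts emitting.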

Compare that output to what was produced by combineLatest:

// Output to console is:
// project values: w = 75, h = 1.76
// BMI is 24.212293388429753
// project values: w = 75, h = 1.77
// BMI is 23.93948099205209
// project values: w = 75, h = 1.78
// BMI is 23.671253629592222

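It emitted only three BMI values, each pairing the final weight of 75 with one of the three height values. Because of emits synchronously, the weight stream had already delivered all five of its values before combineLatest subscribed to height, so every output was triggered by a height emission. With synchronous sources like these, the number of values emitted by combineLatest is determined by the stream that is subscribed to last, which here was height.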
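For reference, the combineLatest comparison can be sketched as follows. This is a reconstruction that assumes the earlier demo used the same of() streams with weight passed first; the weight$, height$, bmiValues and swapped names are illustrative only. It also shows that, with synchronous sources, reversing the argument order changes how many values come out.

```typescript
import { combineLatest, of } from 'rxjs';
import { map } from 'rxjs/operators';

const weight$ = of(70, 72, 76, 79, 75);
const height$ = of(1.76, 1.77, 1.78);

// weight$ is subscribed first and emits everything before height$ starts,
// so each result pairs the last weight (75) with one height value.
const bmiValues: number[] = [];
combineLatest([weight$, height$])
  .pipe(map(([w, h]) => w / (h * h)))
  .subscribe(bmi => bmiValues.push(bmi));

// Reversed order: height$ finishes first, so each of the five weights
// is paired with the last height (1.78).
const swapped: number[] = [];
combineLatest([height$, weight$])
  .pipe(map(([h, w]) => w / (h * h)))
  .subscribe(bmi => swapped.push(bmi));

console.log(bmiValues.length); // 3
console.log(swapped.length);   // 5
```

With asynchronous sources, such as HTTP calls, the emissions interleave and the count depends on timing rather than on argument order.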

zip in RxJS

Typically, once you’ve familiarized yourself with an operator, its name reveals enough about its functioning to remind you what it does. In the case of zip, we have an operator that combines observable streams in a way that mimics the behavior of a zipper on clothing or a bag. As such, it combines two or more stream sequences as an array of values whose length is equal to the number of input streams provided, i.e., a pair in the case of two input streams. To do that, zip waits for values to be emitted from all input streams, then emits them as an array.

It is also worth noting that zip will only emit an array once it has fresh values from each source sequence. So if one of the source observables emits values faster than the others, the rate of publishing will be dictated by the slowest stream.
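The pacing described above can be sketched with two hand-driven subjects. The fast$ and slow$ names and the pairs array are assumptions made for this illustration and do not appear in the original demo.

```typescript
import { Subject, zip } from 'rxjs';

// Hypothetical streams: fast$ gets ahead of slow$
const fast$ = new Subject<number>();
const slow$ = new Subject<string>();
const pairs: Array<[number, string]> = [];

zip(fast$, slow$).subscribe(pair => pairs.push(pair));

fast$.next(1);
fast$.next(2);
fast$.next(3);   // all three values are buffered; nothing is emitted yet
slow$.next('a'); // emits [1, 'a']
slow$.next('b'); // emits [2, 'b']; 3 is still waiting for a partner

console.log(pairs); // [ [ 1, 'a' ], [ 2, 'b' ] ]
```

One consequence worth keeping in mind is that zip buffers the unmatched values of the faster stream, so a large mismatch in emission rates can grow that buffer.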

zip completes as soon as any input stream has completed and all of that stream’s buffered values have been paired with corresponding values from the other streams; any leftover values from the other streams are discarded. Conversely, it will never complete if none of the input streams completes, and it will emit an error if any of the input streams errors out.
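The completion rule can be demonstrated with a short sketch. It assumes two hypothetical subjects, a$ and b$, and an events array that records both the values and the completion notification.

```typescript
import { Subject, zip } from 'rxjs';

const a$ = new Subject<number>();
const b$ = new Subject<string>();
const events: string[] = [];

zip(a$, b$).subscribe({
  next: ([n, s]) => events.push(`${n}${s}`),
  complete: () => events.push('complete'),
});

a$.next(1);
a$.next(2);
a$.complete(); // a$ is done, but 1 and 2 are still buffered, so zip stays open
b$.next('x');  // emits "1x"
b$.next('y');  // emits "2y"; a$ is now exhausted, so zip completes
b$.next('z');  // ignored: zip has already completed

console.log(events); // [ '1x', '2y', 'complete' ]
```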

Here is zip’s signature:

zip(observable_1[, observable_2...observable_n]): Observable

The following code shows the zip operator in action:

import { map } from 'rxjs/operators';
import { of, zip } from 'rxjs';

let age$ = of<number>(30, 29, 44);
let name$ = of<string>('Bob', 'Carol', 'Jim');
let isDev$ = of<boolean>(true, true, false, false);

zip(age$, name$, isDev$)
  .pipe(map(([age, name, isDev]) => ({ age, name, isDev })))
  .subscribe(res => console.log(res));

// outputs
// { age: 30, name: 'Bob', isDev: true }
// { age: 29, name: 'Carol', isDev: true }
// { age: 44, name: 'Jim', isDev: false }

In the above snippet, there are three streams that emit all of their values without any delay. The zip output is piped to the RxJS map operator so that each emitted array is converted to an object whose name: value pairs are logged in the subscribe function. Notice that the last boolean value of false is never emitted because the other two streams have completed by that point.

A zip Example with Delay

As this second example shows, introducing a delay to stream emissions does not change the results:

// delay also needs to be imported:
// import { delay, map } from 'rxjs/operators';

zip(
  age$.pipe(delay(2000)),
  name$.pipe(delay(1000)),
  isDev$.pipe(delay(3000))
)
  .pipe(map(([age, name, isDev]) => ({ age, name, isDev })))
  .subscribe(res => console.log(res));

// outputs
// { age: 30, name: 'Bob', isDev: true }
// { age: 29, name: 'Carol', isDev: true }
// { age: 44, name: 'Jim', isDev: false }

The only difference is that zip won’t emit anything until it has received values from each of the input streams, meaning that the subscribe function won’t receive a value until the isDev$ observable emits after a delay of 3 seconds. All values are subsequently emitted in quick succession.
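One way to check that timing without waiting on real timers is to run the same streams on a virtual clock. The sketch below swaps in RxJS’s VirtualTimeScheduler, which is an assumption of this illustration and not part of the original demo; the emittedAt array records the virtual time, in milliseconds, of each zip emission.

```typescript
import { of, zip, VirtualTimeScheduler } from 'rxjs';
import { delay } from 'rxjs/operators';

// Virtual clock: time only advances when flush() is called
const scheduler = new VirtualTimeScheduler();
const emittedAt: number[] = [];

zip(
  of(30, 29, 44).pipe(delay(2000, scheduler)),
  of('Bob', 'Carol', 'Jim').pipe(delay(1000, scheduler)),
  of(true, true, false, false).pipe(delay(3000, scheduler))
).subscribe(() => emittedAt.push(scheduler.now()));

// Run all scheduled work, advancing the virtual clock as needed
scheduler.flush();

console.log(emittedAt); // [ 3000, 3000, 3000 ]
```

All three results land at the 3000 ms mark, which is when the slowest stream finally delivers its values.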

RxJS Combination Operators

In this tutorial, we learned how to use two of the RxJS library’s most powerful combination operators: withLatestFrom and zip.

All of the above code snippets are included in the stackblitz.com demo so that you can play with the code and observe the results.

Rob Gravelle
Rob Gravelle resides in Ottawa, Canada, and has been an IT guru for over 20 years. In that time, Rob has built systems for intelligence-related organizations such as Canada Border Services and various commercial businesses. In his spare time, Rob has become an accomplished music artist with several CDs and digital releases to his credit.
