The early days of GUI applications ushered in a new paradigm known as event-driven architecture. Its main feature is that the flow of the program is determined by events such as user actions (mouse clicks, key presses), sensor outputs, or messages passed from other programs or threads. Visual Basic, Java, and JavaScript (JS) all support it. In all of these languages, the process is much the same: an event loop or dispatcher waits for events and then triggers a callback function when one of those events is detected.
In today’s responsive web apps, event-driven programming has reached new heights of flexibility and complexity. With libraries such as RxJS, events may be triggered by almost anything: AJAX responses, elements loading, or the results of complex calculations. To handle this vast array of event types, RxJS provides the Observable. In this tutorial, we’ll learn what an Observable is, and how to use it within the context of your Angular applications.
Streams and Observables in RxJS
If you’ve ever worked in Java, you may have some familiarity with streams already. In any event, we’ll recap what a stream is here so that we’re all on the same page. A stream is a sequence of data values that arrive over time. The values may be of any type, including primitives such as numbers and strings, built-in JS objects such as Dates, as well as instances of custom classes and plain objects. The key takeaway is that the values are not all available at once; they are collected over time.
An Observable is an object that works much like a function, except that it can return a stream of values to an observer as they become available, either synchronously or asynchronously. Over the life of an application, the stream may emit no values at all, a finite number of values, or an unbounded number of them. The observer subscribes to the Observable and provides a callback function that executes whenever the underlying stream emits a value.
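To make the observer/callback relationship concrete, here is a minimal sketch. It assumes RxJS's of() creation function, which emits its arguments synchronously; the prices$ name and its values are made up for illustration.

```typescript
import { of } from 'rxjs';

// A small, made-up stream of three numbers.
const prices$ = of(10, 20, 30);

const received: number[] = [];

// The observer's callback runs once for each value the stream emits.
prices$.subscribe((price: number) => {
  received.push(price);
});

// of() emits synchronously, so every value has arrived by this point.
console.log(received); // [ 10, 20, 30 ]
```

An asynchronous source would call the same callback, only at some later time.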
The Observable Lifecycle
Because an application can have many Observables emitting values, and many observers listening for them, they can take up a lot of resources if not managed carefully. Every Observable instance passes through these four stages during its lifetime:
- Creation
- Subscription
- Execution
- Destruction
The next several sections cover each lifecycle stage in more detail.
Creation and the new Keyword
Observables are objects; hence, they can be instantiated using the new keyword:
    import { Observable, Subscriber } from 'rxjs';

    const observable = new Observable<string>((subscriber: Subscriber<string>) => {
      subscriber.next('Hello World!');
    });
You need to pass in a subscriber function. This is the function that is executed when a consumer calls the subscribe() method. The subscriber function defines how to obtain or generate values or messages to be published. Your function will receive a subscriber that implements the Observer interface and extends the Subscription class. The Observer interface includes three useful methods that handle the different types of notifications that an observable can send. These are:
- next: Required. A handler for each delivered value. Called zero or more times after execution starts.
- error: Optional. A handler for an error notification. An error halts execution of the observable instance.
- complete: Optional. A handler for the execution-complete notification. Once execution is complete, no further values are delivered to the next handler.
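As a rough illustration of how an error notification halts execution, the sketch below uses a hypothetical failing$ source that emits one value, reports an error, and then tries to emit again. The log array exists only to record which handlers ran.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// Hypothetical source that fails partway through.
const failing$ = new Observable<string>((subscriber) => {
  subscriber.next('a');
  subscriber.error(new Error('boom'));
  subscriber.next('b'); // ignored: execution already ended with an error
});

failing$.subscribe({
  next: (value: string) => {
    log.push(`next: ${value}`);
  },
  error: (err: Error) => {
    log.push(`error: ${err.message}`);
  },
  complete: () => {
    log.push('complete');
  },
});

console.log(log); // [ 'next: a', 'error: boom' ]
```

Note that the complete handler never runs here: an execution ends with either an error or a completion, not both.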
An Observable instance will only begin publishing values once someone subscribes to it.
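One way to see this laziness is to count how often the subscriber function runs. In this sketch, executions and lazy$ are illustrative names, not part of RxJS.

```typescript
import { Observable } from 'rxjs';

let executions = 0;
const greetings: string[] = [];

const lazy$ = new Observable<string>((subscriber) => {
  executions += 1; // runs once per subscribe() call
  subscriber.next('Hello World!');
});

// Creating the Observable did not run the subscriber function.
const beforeSubscribe = executions;

lazy$.subscribe((message: string) => {
  greetings.push(message);
});
const afterFirst = executions;

// A second subscription triggers a second, independent execution.
lazy$.subscribe((message: string) => {
  greetings.push(message);
});
const afterSecond = executions;

console.log(beforeSubscribe, afterFirst, afterSecond); // 0 1 2
```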
Subscription and the subscribe() method
You subscribe by calling the subscribe() method of the instance, passing an observer object to receive the notifications. subscribe() accepts an object with the three handlers mentioned above, of which error and complete are optional. Here is the general shape of the call, with the optional handlers in square brackets:
source$.subscribe({ next: doSomething, [error: handleTheError,] [complete: wrapItUp] })
Here’s some code that implements all three handlers:
    observable.subscribe({
      next: (message: string) => {
        this.messages.push(message);
        this.message = this.messages.join(', ');
      },
      error: (error: string) => {
        console.log(error);
        this.errMsg = error;
      },
      complete: () => console.log('All done.')
    });
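For comparison, here is a minimal sketch of subscribing without the optional handlers. It also shows the shorthand form in which the next handler is passed directly as a function. The source$ values and the messages array are assumptions made for the example.

```typescript
import { of } from 'rxjs';

const messages: string[] = [];
const source$ = of('first', 'second');

// Observer object with only a next handler.
source$.subscribe({
  next: (message: string) => {
    messages.push(message);
  },
});

// Shorthand: pass the next handler directly as a function.
source$.subscribe((message: string) => {
  messages.push(message);
});

console.log(messages.join(', ')); // first, second, first, second
```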
Execution
Once an observer subscribes, the subscriber function runs and pushes notifications to it: data via next(), failures via error(), and the end-of-stream signal via complete(). After complete() has been called, no further values are delivered to the observer. In the following example, the third value is emitted after a one-second delay, immediately before complete() is called:
    import { Observable, Subscriber } from 'rxjs';

    const observable = new Observable<string>((subscriber: Subscriber<string>) => {
      subscriber.next('I am number 1');
      subscriber.next('I am number 2');
      setTimeout(() => {
        subscriber.next('I am number 3');
        subscriber.complete();
      }, 1000);
    });
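To see the timing of that execution, the sketch below subscribes to the same observable and records what arrives. The received array is an illustrative helper.

```typescript
import { Observable, Subscriber } from 'rxjs';

const observable = new Observable<string>((subscriber: Subscriber<string>) => {
  subscriber.next('I am number 1');
  subscriber.next('I am number 2');
  setTimeout(() => {
    subscriber.next('I am number 3');
    subscriber.complete();
  }, 1000);
});

const received: string[] = [];

observable.subscribe({
  next: (message: string) => {
    received.push(message);
  },
  // Runs about one second later, after the third value has arrived.
  complete: () => {
    console.log('All done.', received.length); // All done. 3
  },
});

// Only the two synchronous values have arrived when subscribe() returns.
console.log(received.length); // 2
```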
Destruction and the complete() Method
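It’s vitally important that every subscription ends when you are done with it, either because the Observable completes or because the observer is unsubscribed. Otherwise, you could wind up with memory leaks and code executing when you don’t expect it to. This is especially true in Angular applications, where a subscription can outlive the component that created it. Here’s a popular Angular design pattern that uses the takeUntil operator with a notifier Subject: when the component is destroyed, the notifier emits, takeUntil completes the piped stream, and the observer receives no more values:
    import { Subject } from 'rxjs';
    import { takeUntil } from 'rxjs/operators';

    // Fragments of a component class, not a complete file.

    // Notifier that emits once, when the component is destroyed.
    ngUnsubscribe = new Subject<void>();

    observable
      .pipe(takeUntil(this.ngUnsubscribe))
      .subscribe({
        next: (message: string) => {
          this.messages.push(message);
          this.message = this.messages.join(', ');
        },
        error: (error: string) => {
          console.log(error);
          this.errMsg = error;
        },
        complete: () => console.log('All done.')
      });

    ngOnDestroy() {
      // Emitting ends every stream piped through takeUntil(this.ngUnsubscribe).
      this.ngUnsubscribe.next();
      this.ngUnsubscribe.complete();
    }
A Subject is both an Observable and an Observer, which is why you can call next(), error(), and complete() on it directly.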
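The same pattern can be tried outside of Angular. In this sketch, destroy$ stands in for ngUnsubscribe, and the two manual calls on it simulate what ngOnDestroy() would do. The source$ and events names are assumptions made for the example.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>();
const destroy$ = new Subject<void>(); // plays the role of ngUnsubscribe

const events: string[] = [];

source$.pipe(takeUntil(destroy$)).subscribe({
  next: (value: number) => {
    events.push(`next: ${value}`);
  },
  complete: () => {
    events.push('complete');
  },
});

source$.next(1);
source$.next(2);

// Simulate the component being destroyed.
destroy$.next();
destroy$.complete();

source$.next(3); // not delivered: takeUntil has already completed the stream

console.log(events); // [ 'next: 1', 'next: 2', 'complete' ]
```

The source Subject itself is still alive after the notifier fires; only the piped subscription has ended.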
Conclusion
In this tutorial, we learned what an Observable is, and how to use it to broadcast asynchronous messages to subscribers within your Angular applications.
You’ll find a demo of today’s code on stackblitz.com.