The most popular library used with the Angular framework for reactive programming is RxJS, which introduces itself on the official site as follows:

RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

https://rxjs.dev/

By subscribing to an Observable, we connect to a data source so that the handler we set is called each time a new value is emitted. Since a subscription has no fixed ‘expiration date’ by default, we need to terminate it at the right time to avoid memory leaks. Let’s take a look at some ways to end a subscription.
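As a baseline, here is a minimal sketch of ending a subscription by hand with unsubscribe(). It assumes a Subject as the data source so that the example runs synchronously; the names source$ and received are illustrative only.

```typescript
import { Subject } from 'rxjs';

// Illustrative source; any Observable behaves the same way here.
const source$ = new Subject<number>();
const received: number[] = [];

// subscribe() returns a Subscription that we keep so we can end it later.
const subscription = source$.subscribe((value) => received.push(value));

source$.next(1);
source$.next(2);

// After unsubscribe() the handler is no longer called.
subscription.unsubscribe();
source$.next(3);

console.log(received); // [1, 2]
console.log(subscription.closed); // true
```

The approaches that follow remove the need to keep the Subscription around and call unsubscribe() yourself.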

takeWhile()

source$
  .pipe(
    map(data => /* get data */ data),
    takeWhile(data => !data)
  )
  .subscribe();

The subscription stays active as long as the data is falsy (that is, false, null, undefined, NaN, 0, or an empty string). The first truthy value completes the stream.
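A small runnable sketch of this behavior, assuming a synchronous source built with of() (the values are made up for illustration). Note that the truthy value that ends the stream is itself not delivered.

```typescript
import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

const received: unknown[] = [];
const events: string[] = [];

// Illustrative values: three falsy ones, then a truthy one, then another falsy one.
of(0, '', null, 'ready', 0)
  .pipe(takeWhile((data) => !data))
  .subscribe({
    next: (data) => received.push(data),
    complete: () => events.push('complete'),
  });

// 'ready' fails the predicate, so the stream completes before it is emitted,
// and the trailing 0 is never reached.
console.log(received); // [0, '', null]
console.log(events); // ['complete']
```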

source$
  .pipe(
    takeWhile(val => val <= 4)
  )
  .subscribe();

The subscription stays active as long as each value received from the stream is less than or equal to 4. The first value greater than 4 completes the stream and is not emitted.
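The sketch below runs this predicate against sample values and also shows the optional second argument of takeWhile(), inclusive, which additionally emits the value that failed the predicate. It assumes RxJS 6.4 or later, where that argument is available; the sample values are illustrative.

```typescript
import { of } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

const exclusive: number[] = [];
const inclusive: number[] = [];

// Default behavior: stop before the first value that fails the predicate.
of(1, 2, 3, 4, 5, 6, 1)
  .pipe(takeWhile((val) => val <= 4))
  .subscribe((val) => exclusive.push(val));

// inclusive = true: also emit the value that caused the stream to stop.
of(1, 2, 3, 4, 5, 6, 1)
  .pipe(takeWhile((val) => val <= 4, true))
  .subscribe((val) => inclusive.push(val));

console.log(exclusive); // [1, 2, 3, 4]
console.log(inclusive); // [1, 2, 3, 4, 5]
```

In both cases the trailing 1 is never delivered: once the predicate fails, the stream is complete, even if later values would pass again.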

first()

source$
  .pipe(
    first()
  )
  .subscribe();

The subscription ends as soon as the first value is received. If the source completes without emitting any value, first() signals an error instead.
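A minimal sketch of both outcomes, assuming synchronous sources created with of() and EMPTY. The array names are illustrative.

```typescript
import { EMPTY, EmptyError, of } from 'rxjs';
import { first } from 'rxjs/operators';

const values: number[] = [];
const events: string[] = [];
const errors: unknown[] = [];

// Only the first value arrives, then the stream completes.
of(10, 20, 30)
  .pipe(first())
  .subscribe({
    next: (value) => values.push(value),
    complete: () => events.push('complete'),
  });

// A source that completes without emitting makes first() raise an EmptyError.
EMPTY.pipe(first()).subscribe({
  error: (err) => errors.push(err),
});

console.log(values); // [10]
console.log(events); // ['complete']
console.log(errors[0] instanceof EmptyError); // true
```

If an empty source is a realistic case, it is worth supplying an error handler as shown, since an unhandled EmptyError would otherwise surface as an uncaught error.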

You can find more filtering operators in the official RxJS documentation.

Implement a decorator function

One way to avoid calling unsubscribe() manually for every subscription in a class is to implement a decorator.

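// Class decorator factory: it is called as @AutoUnsubscribe() and returns the actual decorator.
export function AutoUnsubscribe() {
  return (constructor: Function): void => {
    const original = constructor.prototype.ngOnDestroy;

    // The class must define ngOnDestroy so that there is a hook to wrap.
    if (typeof original !== 'function') {
      throw new Error(
        `${constructor.name} is using @AutoUnsubscribe but does not implement ngOnDestroy`
      );
    }

    constructor.prototype.ngOnDestroy = function (this: Record<string, any>, ...args: unknown[]) {
      // Unsubscribe from every instance property that exposes unsubscribe().
      for (const prop in this) {
        if (typeof this[prop]?.unsubscribe === 'function') {
          this[prop].unsubscribe();
        }
      }
      // Then run the ngOnDestroy originally defined by the class.
      original.apply(this, args);
    };
  };
}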

Usage:
Add @AutoUnsubscribe() above the class. The class must also implement ngOnDestroy(), even if its body is empty.

@AutoUnsubscribe()
@Component(...)
export class MyComponent implements OnDestroy, OnInit {
  protected subscription: Subscription;

  ngOnInit() {
    this.subscription = ...
  }  
  ngOnDestroy() {} // must be present
}
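To see what the decorator does at destroy time, the following sketch runs the same logic outside Angular. It makes several assumptions: it includes its own local copy of the decorator in factory form so that it is self-contained, it applies the decorator by a direct function call rather than the @ syntax (so it runs without the experimentalDecorators compiler option), and FakeSubscription and Widget are hypothetical stand-ins for a real Subscription and component.

```typescript
// Local copy of the decorator in factory form, for a self-contained example.
function AutoUnsubscribe() {
  return (constructor: Function): void => {
    const original = constructor.prototype.ngOnDestroy;
    if (typeof original !== 'function') {
      throw new Error(
        `${constructor.name} is using @AutoUnsubscribe but does not implement ngOnDestroy`
      );
    }
    constructor.prototype.ngOnDestroy = function (this: Record<string, any>, ...args: unknown[]) {
      for (const prop in this) {
        if (typeof this[prop]?.unsubscribe === 'function') {
          this[prop].unsubscribe();
        }
      }
      original.apply(this, args);
    };
  };
}

// Hypothetical stand-in for an RxJS Subscription.
class FakeSubscription {
  closed = false;
  unsubscribe(): void {
    this.closed = true;
  }
}

// Hypothetical stand-in for an Angular component.
class Widget {
  clicks = new FakeSubscription();
  timer = new FakeSubscription();
  label = 'not a subscription'; // skipped: it has no unsubscribe() method
  destroyed = false;

  ngOnDestroy(): void {
    this.destroyed = true;
  }
}

// Apply the decorator by hand instead of writing @AutoUnsubscribe() above the class.
AutoUnsubscribe()(Widget);

const widget = new Widget();
widget.ngOnDestroy(); // Angular would call this when the component is destroyed

console.log(widget.clicks.closed, widget.timer.closed, widget.destroyed); // true true true
```

The loop only reaches subscriptions stored directly as instance properties. A subscription kept in a local variable or inside an array is not found and still has to be ended another way.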

When does the subscription automatically end?

A subscription ends automatically when the Observable sends an error or complete notification to its observers.
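This can be checked through the closed flag of the Subscription. The sketch below assumes Subjects as sources so that completion and errors can be triggered by hand; the variable names are illustrative.

```typescript
import { Subject } from 'rxjs';

// 1. The source completes: the subscription closes on its own.
const completing$ = new Subject<number>();
const completedSub = completing$.subscribe();
completing$.complete();

// 2. The source errors: the subscription closes as well.
const failing$ = new Subject<number>();
const failedSub = failing$.subscribe({
  error: () => {
    /* handle the error here */
  },
});
failing$.error(new Error('boom'));

// 3. The source does neither: the subscription stays open until we end it.
const open$ = new Subject<number>();
const openSub = open$.subscribe();
const closedBeforeUnsubscribe = openSub.closed;
openSub.unsubscribe();

console.log(completedSub.closed); // true
console.log(failedSub.closed); // true
console.log(closedBeforeUnsubscribe); // false
console.log(openSub.closed); // true
```

Only the third case needs explicit cleanup, which is where the techniques described above come in.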

Knowing how to use RxJS is a must-have when developing Angular-based applications, so let this article be a motivation to further your knowledge of reactive programming.