[javascript] How to get current value of RxJS Subject or Observable?

I have an Angular 2 service:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Everything works great. But I have another component which doesn't need to subscribe, it just needs to get the current value of isLoggedIn at a certain point in time. How can I do this?

This question is related to javascript angular rxjs

The answer is


The best way to do this is using Behaviur Subject, here is an example:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5

Although it may sound overkill, this is just another "possible" solution to keep Observable type and reduce boilerplate...

You could always create an extension getter to get the current value of an Observable.

To do this you would need to extend the Observable<T> interface in a global.d.ts typings declaration file. Then implement the extension getter in a observable.extension.ts file and finally include both typings and extension file to your application.

You can refer to this StackOverflow Answer to know how to include the extensions into your Angular application.

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });

I had similar situation where late subscribers subscribe to the Subject after its value arrived.

I found ReplaySubject which is similar to BehaviorSubject works like a charm in this case. And here is a link to better explanation: http://reactivex.io/rxjs/manual/overview.html#replaysubject


A subscription can be created and after taking the first emitted item destroyed. Pipe is a function that uses an Observable as its input and returns another Observable as output, while not modifying the first observable. Angular 8.1.0. Packages: "rxjs": "6.5.3", "rxjs-observable": "0.0.7"

  ngOnInit() {

    ...

    // If loading with previously saved value
    if (this.controlValue) {

      // Take says once you have 1, then close the subscription
      this.selectList.pipe(take(1)).subscribe(x => {
        let opt = x.find(y => y.value === this.controlValue);
        this.updateValue(opt);
      });

    }
  }

A similar looking answer was downvoted. But I think I can justify what I'm suggesting here for limited cases.


While it's true that an observable doesn't have a current value, very often it will have an immediately available value. For example with redux / flux / akita stores you may request data from a central store, based on a number of observables and that value will generally be immediately available.

If this is the case then when you subscribe, the value will come back immediately.

So let's say you had a call to a service, and on completion you want to get the latest value of something from your store, that potentially might not emit:

You might try to do this (and you should as much as possible keep things 'inside pipes'):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

The problem with this is that it will block until the secondary observable emits a value, which potentially could be never.

I found myself recently needing to evaluate an observable only if a value was immediately available, and more importantly I needed to be able to detect if it wasn't. I ended up doing this:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

Note that for all of the above I'm using subscribe to get the value (as @Ben discusses). Not using a .value property, even if I had a BehaviorSubject.


I encountered the same problem in child components where initially it would have to have the current value of the Subject, then subscribe to the Subject to listen to changes. I just maintain the current value in the Service so it is available for components to access, e.g. :

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

A component that needs the current value could just then access it from the service, i.e,:

sessionStorage.isLoggedIn

Not sure if this is the right practice :)


You could store the last emitted value separately from the Observable. Then read it when needed.

let lastValue: number;

const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);

const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

You can check the full article on how to implement it from here. https://www.imkrish.com/blog/development/simple-way-get-value-from-observable


The only way you should be getting values "out of" an Observable/Subject is with subscribe!

If you're using getValue() you're doing something imperative in declarative paradigm. It's there as an escape hatch, but 99.9% of the time you should NOT use getValue(). There are a few interesting things that getValue() will do: It will throw an error if the subject has been unsubscribed, it will prevent you from getting a value if the subject is dead because it's errored, etc. But, again, it's there as an escape hatch for rare circumstances.

There are several ways of getting the latest value from a Subject or Observable in a "Rx-y" way:

  1. Using BehaviorSubject: But actually subscribing to it. When you first subscribe to BehaviorSubject it will synchronously send the previous value it received or was initialized with.
  2. Using a ReplaySubject(N): This will cache N values and replay them to new subscribers.
  3. A.withLatestFrom(B): Use this operator to get the most recent value from observable B when observable A emits. Will give you both values in an array [a, b].
  4. A.combineLatest(B): Use this operator to get the most recent values from A and B every time either A or B emits. Will give you both values in an array.
  5. shareReplay(): Makes an Observable multicast through a ReplaySubject, but allows you to retry the observable on error. (Basically it gives you that promise-y caching behavior).
  6. publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject), etc: Other operators that leverage BehaviorSubject and ReplaySubject. Different flavors of the same thing, they basically multicast the source observable by funneling all notifications through a subject. You need to call connect() to subscribe to the source with the subject.

Examples related to javascript

need to add a class to an element How to make a variable accessible outside a function? Hide Signs that Meteor.js was Used How to create a showdown.js markdown extension Please help me convert this script to a simple image slider Highlight Anchor Links when user manually scrolls? Summing radio input values How to execute an action before close metro app WinJS javascript, for loop defines a dynamic variable name Getting all files in directory with ajax

Examples related to angular

error NG6002: Appears in the NgModule.imports of AppModule, but could not be resolved to an NgModule class error TS1086: An accessor cannot be declared in an ambient context in Angular 9 TS1086: An accessor cannot be declared in ambient context @angular/material/index.d.ts' is not a module Why powershell does not run Angular commands? error: This is probably not a problem with npm. There is likely additional logging output above Angular @ViewChild() error: Expected 2 arguments, but got 1 Schema validation failed with the following errors: Data path ".builders['app-shell']" should have required property 'class' Access blocked by CORS policy: Response to preflight request doesn't pass access control check origin 'http://localhost:4200' has been blocked by CORS policy in Angular7

Examples related to rxjs

Angular - "has no exported member 'Observable'" What is pipe() function in Angular How to convert Observable<any> to array[] TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable What is the difference between Subject and BehaviorSubject? Best way to import Observable from rxjs take(1) vs first() Observable Finally on Subscribe Getting an object array from an Angular service BehaviorSubject vs Observable?