Creating Observables in RxJS

Observables are the foundation of RxJS. Everything to do with RxJS revolves around Observables. In this article, we will look at the many different methods of creating Observables provided to us by RxJS.

There are two main ways to create Observables in RxJS: Subjects and Operators. We will take a look at both of these!

What is an Observable?

But first, what is an Observable?

Observables are like functions with zero arguments that push multiple values to their Observers, either synchronously or asynchronously.

This can be kind of confusing, so let's take a very basic example of an Observable that pushes 4 values to any of its Observers.

const obs$ = Observable.create((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => observer.next(4), 1000);
});

console.log("before subscribe");
const observer = obs$.subscribe((v) => console.log("received: ", v));
console.log("after subscribe");

In the example above, we create the Observable and tell it to send 1, 2 and 3 to its Observer immediately when the Observer subscribes. These are the synchronous calls. However, 4 is not sent until 1 second later, after we've logged "after subscribe", which makes it an asynchronous operation.

You can see this in the output:

before subscribe
received:  1
received:  2
received:  3
after subscribe 
received:  4

The Observer will keep receiving values until the Observable notifies it that it has completed pushing values. If we modify the example above, we can see this in action.

const obs$ = Observable.create((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3);
  setTimeout(() => observer.next(4), 1000);
});

console.log("before subscribe");
obs$.subscribe((v) => console.log("received: ", v));
console.log("after subscribe");

We've added a call to observer.complete() after observer.next(2), which notifies the Observer that the Observable has finished pushing values.

Take a look at the new output:

before subscribe 
received:  1
received:  2
after subscribe 

We can see that even though we try to push the values 3 and 4 to the Observer, the Observer does not receive them.

The examples above create an Observable with the static create method. Newer RxJS versions deprecate Observable.create in favor of the equivalent new Observable(...) constructor. Now, we will take a look at creating Observables with Subjects and Operators.

Creating Observables with Subjects

A Subject can be thought of as a combination of EventEmitters and Observables. They act like both. An Observer can subscribe to a Subject to receive the values it pushes, while you can use the Subject directly to push new values to each Observer, or to tell each Observer that the Subject has completed pushing values.

There are 4 types of Subjects that RxJS exposes to us. We'll take a look at each in turn.

Subject

Subject is the most basic Subject that we can use to create Observables. It's very simple to use, and we can use it to push values to all Observers that are subscribed to it. Each Observer will only receive values that are pushed by the Subject after the Observer has subscribed.

Let's see this in action.

const subject$ = new Subject();

const observerA = subject$.subscribe((v) => console.log("Observer A: ", v));
const observerB = subject$.subscribe((v) => console.log("Observer B: ", v));

subject$.next(1);

const observerC = subject$.subscribe((v) => console.log("Observer C: ", v))

subject$.next(2);

We start by creating the subject, then create two Observers that will log each value they receive from the Subject (Observable).
We tell the Subject to push the value 1.
We then create ObserverC which also logs each value it receives from the Subject.
Finally, we tell the Subject to push the value 2.

Now, take a look at the output of this:

Observer A:  1
Observer B:  1
Observer A:  2
Observer B:  2
Observer C:  2

We can see that ObserverA and ObserverB both received 1 but ObserverC only received 2, highlighting that Observers of the basic Subject will only receive values that are pushed after they have subscribed!

BehaviorSubject

Another type of Subject we can use is BehaviorSubject. It works the same way as the basic Subject with one key difference: it has a notion of a current value. It must be created with an initial value, and whenever it pushes a new value it stores that value internally. When a new Observer subscribes to the BehaviorSubject, it is immediately sent the current value, which is either the initial value or the last value that was pushed.

If we take the example we used for Subject and change it to use a BehaviorSubject, we can see this functionality in action:

const behaviorSubject$ = new BehaviorSubject(0); // 0 - the initial value

const observerA = behaviorSubject$.subscribe((v) => console.log("Observer A: ", v));
const observerB = behaviorSubject$.subscribe((v) => console.log("Observer B: ", v));

behaviorSubject$.next(1);

const observerC = behaviorSubject$.subscribe((v) => console.log("Observer C: ", v));

behaviorSubject$.next(2);

Let's look at the output to see the difference:

Observer A:  0
Observer B:  0
Observer A:  1
Observer B:  1
Observer C:  1
Observer A:  2
Observer B:  2
Observer C:  2

We can see that ObserverA and ObserverB were sent the initial value 0 as soon as they subscribed, and that ObserverC was sent the value 1 even though it subscribed to the BehaviorSubject after the 1 was pushed.

ReplaySubject

The ReplaySubject is very similar to the BehaviorSubject in that it can remember the values it has pushed and immediately send them to new Observers that have subscribed. However, it allows you to specify how many values it should remember and will send all these values to each new Observer that subscribes.

If we modify the example above slightly, we can see this functionality in action:

const replaySubject$ = new ReplaySubject(2); // 2 - number of values to store

const observerA = replaySubject$.subscribe((v) => console.log("Observer A: ", v));

replaySubject$.next(1);
replaySubject$.next(2);
replaySubject$.next(3);

const observerB = replaySubject$.subscribe((v) => console.log("Observer B: ", v))

replaySubject$.next(4);

This time, we are going to have the ReplaySubject push 4 values to its Observers. We also tell it that it should always store the two latest values it emitted.

Let's take a look at the output:

Observer A:  1
Observer A:  2
Observer A:  3
Observer B:  2
Observer B:  3
Observer A:  4
Observer B:  4

We see that ObserverA receives the first 3 values perfectly fine. Then ObserverB subscribes to the ReplaySubject and it is immediately sent the values 2 and 3, which were the last two values the Subject had pushed. Then both Observers receive the next value of 4 correctly.

AsyncSubject

The AsyncSubject exposes the same methods as Subject, but it works differently. It only ever sends its Observers the last value it was told to push, and only once the Subject has completed (by calling complete()). Observers therefore receive nothing until the Subject completes, and any Observer that subscribes after completion immediately receives that final value.

We can see this in action:

const asyncSubject$ = new AsyncSubject(); // AsyncSubject takes no constructor arguments

const observerA = asyncSubject$.subscribe((v) =>
  console.log("Observer A: ", v)
);

asyncSubject$.next(1);
asyncSubject$.next(2);

const observerB = asyncSubject$.subscribe((v) =>
  console.log("Observer B: ", v)
);

asyncSubject$.next(3);
asyncSubject$.complete();

const observerC = asyncSubject$.subscribe((v) =>
  console.log("Observer C: ", v)
);

The output of this is:

Observer A:  3
Observer B:  3
Observer C:  3

We can see that although ObserverA had subscribed before any values were pushed, it only received 3, the last one. We can also see that ObserverC also immediately received the value 3 even though it subscribed after the AsyncSubject had completed.

Creating Observables with Operators

An alternative method of creating Observables comes from the operators that RxJS exposes. These operators can be categorized based on their intention. In this article, we are going to look at the Creation Operators, so named as they create Observables.

You can see a list of these operators here: http://reactivex.io/rxjs/manual/overview.html#creation-operators

ajax

ajax is an operator that creates an Observable to handle AJAX requests. It takes either a request object (URL, headers, etc.) or a URL string. The Observable pushes the response and then completes once the request finishes. This allows us to make AJAX requests and handle them reactively.

import { ajax } from "rxjs/ajax"; // ajax lives in its own entry point

const obs$ = ajax("https://api.github.com/users?per_page=2");
obs$.subscribe((v) => console.log("received: ", v.response));

The output of this will be:

received:  (2) [Object, Object]

bindCallback

bindCallback allows you to take any function that usually uses a callback approach and transform it into an Observable. This can be quite difficult to wrap your head around, so we'll break it down with an example:

// Let's say we have a function that takes two numbers, multiplies them
// and passes the result to a callback function we manually provide to it
function multiplyNumbersThenCallback(x, y, callback) {
  callback(x * y);
}

// We would normally use this function as shown below
multiplyNumbersThenCallback(3, 4, (value) =>
  console.log("Value given to callback: ", value)
);

// However, with bindCallback, we can turn this function into
// a new function that takes the same arguments as the original
// function, but without the callback function
const multiplyNumbers = bindCallback(multiplyNumbersThenCallback);

// We call this function with the numbers we want to multiply
// and it returns to us an Observable that will only push 
// the result of the multiplication when we subscribe to it
multiplyNumbers(3, 4).subscribe((value) =>
  console.log("Value pushed by Observable: ", value)
);

By using bindCallback, we can take functions that use a Callback API and transform them into reactive functions that create Observables that we can subscribe to.

defer

defer allows you to create an Observable only when the Observer subscribes to it. It will create a new Observable for each Observer, meaning they do not share the same Observable even if it appears that they do.

const deferredObs$ = defer(() => of([1, 2, 3]));

const observerA = deferredObs$.subscribe((v) => console.log("Observer A: ", v));
const observerB = deferredObs$.subscribe((v) => console.log("Observer B: ", v));

This outputs:

Observer A:  (3) [1, 2, 3]
Observer B:  (3) [1, 2, 3]

Both Observers received an Observable with the same values pushed from it. These are actually different Observables even though they pushed the same values. We can illustrate that defer creates different Observables for each Observer by modifying the example:

let numOfObservers = 0;
const deferredObs$ = defer(() => {
  // This factory runs once per subscription
  if (numOfObservers === 0) {
    numOfObservers++;
    return of([1, 2, 3]);
  }

  return of([4, 5, 6]);
});

const observerA = deferredObs$.subscribe((v) => console.log("Observer A: ", v));
const observerB = deferredObs$.subscribe((v) => console.log("Observer B: ", v));

We've changed the function passed to defer so that the first Observer gets an Observable of [1, 2, 3] and any other Observer gets [4, 5, 6], which we can then see in the output:

Observer A:  (3) [1, 2, 3]
Observer B:  (3) [4, 5, 6]

empty

The empty operator creates an Observable that pushes no values and immediately completes when subscribed to:

const obs$ = empty();
obs$.subscribe((v) => console.log("received: ", v));

This produces NO output as it never pushes a value.

from

from is a powerful operator. It can convert almost anything into an Observable, and pushes the values from these sources in an intelligent manner, based on the source itself.

We'll take two examples: an array and an iterable from a generator.

const obs$ = from([1,2,3]);
obs$.subscribe((v) => console.log("received: ", v));

With an array, from will take each element in the array and push them separately:

received:  1
received:  2
received:  3

Similarly, with the iterable from the generator, we will get each value separately:

function* countToTen() {
  let i = 0;
  while(i < 11) {
    yield i;
    i++;
  }
}

const obs$ = from(countToTen());
obs$.subscribe((v) => console.log("received: ", v));

Because the generator yields every number from 0 to 10, from pushes each of them in turn:

received:  0
received:  1
received:  2
received:  3
received:  4
received:  5
received:  6
received:  7
received:  8
received:  9
received:  10

fromEvent

The fromEvent operator creates an Observable that pushes every event of a specified type that occurs on a specified event target, such as every click on a webpage.

We can set this up very easily:

const obs$ = fromEvent(document, "click");
obs$.subscribe(() => console.log("received click!"));

Every time you click on the page it logs "received click!":

received click!
received click!

fromEventPattern

The fromEventPattern operator is similar to the fromEvent operator in that it works with events that have occurred. However, it takes two arguments: an addHandler function and a removeHandler function.

The addHandler function is called when the Observable is subscribed to, and the Observer that has subscribed will receive every event that is set up in the addHandler function.

The removeHandler function is called when the Observer unsubscribes from the Observable.

This sounds more confusing than it actually is. Let's use the example above where we want to get all clicks that occur on the page:

function addHandler(handler) {
  document.addEventListener('click', handler)
}

function removeHandler(handler) {
  document.removeEventListener('click', handler)
}

const obs$ = fromEventPattern(addHandler, removeHandler);
obs$.subscribe(() => console.log("received click!"));

Every time you click on the page it logs "received click!":

received click!
received click!

generate

This operator allows us to set up an Observable that will create values to push based on the arguments we pass to it, with a condition to tell it when to stop.

We can take our earlier example of counting to 10 and implement it with this operator:

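const obs$ = generate(
  0, // initial value
  (x) => x < 11, // keep going while this is true
  (x) => x + 1 // return the next value (x++ would return the old one)
);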

obs$.subscribe((v) => console.log("received: ", v));

This outputs:

received:  0
received:  1
received:  2
received:  3
received:  4
received:  5
received:  6
received:  7
received:  8
received:  9
received:  10

interval

The interval operator creates an Observable that pushes a new value at a set interval of time. The example below shows how we can create an Observable that pushes a new value every second:

const obs$ = interval(1000);
obs$.subscribe((v) => console.log("received: ", v));

Which will log a new value every second:

received:  0
received:  1
received:  2

never

The never operator creates an Observable that never pushes a value, never errors, and never completes. It can be useful for testing or for composing with other Observables. Newer RxJS versions deprecate never() in favor of the equivalent NEVER constant.

const obs$ = never();
// This never logs anything as it never receives a value
obs$.subscribe((v) => console.log("received: ", v));

of

The of operator creates an Observable that pushes values you supply as arguments in the same order you supply them, and then completes.

Unlike the from operator, it will NOT take every element from an array and push each. It will, instead, push the full array as one value:

const obs$ = of(1000, [1,2,4]);
obs$.subscribe((v) => console.log("received: ", v));

The output of which is:

received:  1000
received:  (3) [1, 2, 4]

range

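The range operator creates an Observable that pushes a sequence of consecutive integers. It takes a starting value and a count of how many values to push, not an end value. We'll take our count to 10 example again, and show how it can be created using the range operator:

const obs$ = range(0, 11); // start at 0 and push 11 values: 0 through 10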
obs$.subscribe((v) => console.log("received: ", v));

The output of this is:

received:  0
received:  1
received:  2
received:  3
received:  4
received:  5
received:  6
received:  7
received:  8
received:  9
received:  10

throwError

The throwError operator creates an Observable that pushes no values but immediately pushes an error notification. We can handle errors thrown by Observables gracefully when an Observer subscribes to the Observable:

const obs$ = throwError(new Error("I've fallen over"));
obs$.subscribe(
  (v) => console.log("received: ", v),
  (e) => console.error(e)
);

The output of this is:

Error: I've fallen over

timer

timer creates an Observable that does not push any value until a specified delay has passed. You can also give it an interval, in which case it keeps pushing increasing values at that interval after the initial delay.

const obs$ = timer(3000, 1000);
obs$.subscribe((v) => console.log("received: ", v));

The output starts after 3 seconds, and each log is 1 second apart:

received:  0
received:  1
received:  2
received:  3

Hopefully, you have been introduced to new methods of creating Observables that will help you when working with RxJS in the future! There are some Creation Operators that can come in super handy for nuanced use-cases, such as bindCallback and fromEvent.