Async actions in JavaScript are common and error-prone.
We query the server, set timers, and listen for events, and most of the results arrive asynchronously.
In JavaScript we used to deal with async actions using promises, but since async actions are so common, hard to debug, and easy to get wrong, it may help to handle them with a stronger tool.
While a Promise lets us deal with a single async event, RxJS lets us deal with an async data stream, meaning one or more (possibly infinitely many) async events.
Moreover, RxJS brings a large toolset that solves common async problems very easily.
In this article we are going to focus on RxJS operators: what they are, how to use them, and which common operators are available.
In RxJS an operator either creates a new data stream or, as most operators do, transforms one data stream into a different one. An operator usually returns an Observable, which means you can chain operators together to apply multiple transformations:
modifiedObservable = someObservable
    .operator1()
    .operator2()
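To make the idea of chaining concrete, here is a minimal sketch of what an operator is, written without RxJS. The names `Listener`, `Source`, `Operator`, `fromArraySketch`, `mapSketch` and `filterSketch` are hypothetical and only model the idea; the real library also handles errors, completion and unsubscription.

```typescript
// A simplified "observable": a function that pushes values to a listener.
type Listener<T> = (value: T) => void;
type Source<T> = (listener: Listener<T>) => void;
// An operator takes one source and returns a new one.
type Operator<A, B> = (source: Source<A>) => Source<B>;

// Creation helper: a source that pushes every array item in order.
const fromArraySketch = <T>(values: T[]): Source<T> =>
    (listener) => values.forEach((value) => listener(value));

// Transforms every value with `project`.
const mapSketch = <A, B>(project: (value: A) => B): Operator<A, B> =>
    (source) => (listener) => source((value) => listener(project(value)));

// Passes along only the values that satisfy `predicate`.
const filterSketch = <A>(predicate: (value: A) => boolean): Operator<A, A> =>
    (source) => (listener) => source((value) => {
        if (predicate(value)) {
            listener(value);
        }
    });

// Because every operator returns a new source, operators can be chained.
const evenSquares: Source<number> = mapSketch((n: number) => n * n)(
    filterSketch((n: number) => n % 2 === 0)(fromArraySketch([1, 2, 3, 4]))
);

const collected: number[] = [];
evenSquares((value) => collected.push(value));
console.log(collected); // [4, 16]
```

Each operator only knows about the source it wraps, which is why operators compose freely.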
The goal of this article is to go over the most commonly used operators. So let's start experimenting, beginning with combineLatest.
import {Subject} from "rxjs/Subject";
import {combineLatest} from "rxjs/observable/combineLatest";
import {Observable} from "rxjs/Observable";

const subj1: Subject<string> = new Subject<string>();
const subj2: Subject<number> = new Subject<number>();
const combined: Observable<any> = combineLatest(subj1, subj2);

combined.subscribe((arg) => {
    console.log('combined');
    console.log(arg);
}, () => {
    console.log('error');
});

setTimeout(() => {
    console.log('first timer');
    subj1.next('hello');
}, 1000);
setTimeout(() => {
    console.log('second timer');
    subj2.next(1);
}, 2000);
setTimeout(() => {
    console.log('3rd timer');
    subj1.next('world');
}, 3000);
// fires a second after the third timer, so the order is explicit
setTimeout(() => {
    console.log('4th timer');
    subj1.error(new Error('stam'));
}, 4000);
Here we create two Subjects (Subject inherits from Observable) and combine them with combineLatest.
We set four timers that fire one after another.
The first timer emits a value on the first subject. At this point the combined observable does not emit, since the other subject has not emitted a value yet.
The second timer emits a value on the second subject. Now the combined observable has at least one value from each observable in the list, so it emits the array of latest values: ['hello', 1]
The third timer emits another value on the first subject, so the combined observable emits ['world', 1]
The fourth timer sends an error through the first subject. If any of the source observables errors, the combined observable calls the error callback.
A common usage is sending multiple requests to the server when we want an observable that emits only after every request has received a response.
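The timeline above comes down to simple bookkeeping, which can be sketched without RxJS. The `combineLatestSketch` function below is a hypothetical, simplified model (no errors, completion or unsubscription), not the library's implementation:

```typescript
// Hypothetical, simplified model of combineLatest: remember the latest value
// of every source and emit only once each source has produced a value.
function combineLatestSketch(
    sourceCount: number,
    emit: (latest: unknown[]) => void
): (sourceIndex: number, value: unknown) => void {
    const latest: unknown[] = [];
    const hasValue: boolean[] = [];
    for (let i = 0; i < sourceCount; i++) {
        latest.push(undefined);
        hasValue.push(false);
    }
    return (sourceIndex, value) => {
        latest[sourceIndex] = value;
        hasValue[sourceIndex] = true;
        if (hasValue.every((seen) => seen)) {
            emit(latest.slice()); // copy, so later updates don't change it
        }
    };
}

const emitted: unknown[][] = [];
const pushValue = combineLatestSketch(2, (latest) => emitted.push(latest));

pushValue(0, 'hello'); // nothing is emitted: source 1 has no value yet
pushValue(1, 1);       // emits ['hello', 1]
pushValue(0, 'world'); // emits ['world', 1]
console.log(emitted);
```

The first call produces nothing, which mirrors the first timer in the example: a value is held back until every source has reported at least once.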
merge combines multiple Observables into one: whenever any of the source observables emits a value, the merged observable emits it as well.
A use case is querying the server with multiple requests when we want a single observable that emits as each of the requests is resolved.
import {merge} from "rxjs/observable/merge";
import {Subject} from "rxjs/Subject";
import {Observable} from "rxjs/Observable";

const subOdd: Subject<number> = new Subject<number>();
const subEven: Subject<number> = new Subject<number>();
const mergedObservable: Observable<number> = merge(subEven, subOdd);

mergedObservable.subscribe((value: number) => {
    console.log(value);
}, () => {
    console.log('will error out if one of the observables errors');
});

subOdd.next(1);
subOdd.next(3);
subOdd.next(5);
subEven.next(0);
subEven.next(2);
subEven.next(4);
subOdd.error(new Error('Yet another error'));
concat subscribes to its observables one at a time: only when one observable completes does it start with the next observable.
import {concat} from "rxjs/observable/concat";
import {Subject} from "rxjs/Subject";
import {Observable} from "rxjs/Observable";

const subOdd: Subject<number> = new Subject<number>();
const subEven: Subject<number> = new Subject<number>();
const concatenated: Observable<number> = concat(subEven, subOdd);

concatenated.subscribe((value: number) => {
    console.log(value);
}, () => {
    console.log('will error out if one of the observables errors');
});

subOdd.next(1); // will not print since subEven is not complete
subOdd.next(3); // will not print since subEven is not complete
subOdd.next(5); // will not print since subEven is not complete
subEven.next(0); // will be printed
subEven.next(2); // will be printed
subEven.next(4); // will be printed
subEven.complete();
subOdd.next(7); // now it will be printed
subOdd.next(9); // now it will be printed
subOdd.error(new Error('Yet another error'));
from turns an array, a promise, or an iterable into an observable.
import {from} from "rxjs/observable/from";
import fetch from "isomorphic-fetch";

// observable from promise
from(fetch('https://nztodo.herokuapp.com/api/task/?format=json'))
    .subscribe((res) => {
        console.log('got response from server');
    });

// observable from array
from([1, 2, 3, 4])
    .subscribe((value: number) => {
        console.log(value);
    });
In the first example we take an ajax request to the server, which usually returns a promise, and turn it into an Observable.
Transforming a promise into an observable is extremely easy.
of takes its arguments and emits them in order.
import {Observable} from "rxjs/Observable";
import "rxjs/add/observable/of"; // patches Observable.of onto Observable

Observable.of(('first Value') as any, 2, ['third', 'value'])
    .subscribe((value: string | number | string[]) => {
        console.log(value);
    });
We cast the first value because otherwise TypeScript's type inference will assume that every value we pass should be a string.
The output will be: 'first Value', 2, ['third', 'value']
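The cast is one way around the inference. Another, assuming the function exposes a single type parameter as the hypothetical `ofSketch` helper below does, is to spell out the union type explicitly. This is a sketch of the TypeScript behaviour only, not of RxJS itself:

```typescript
// Hypothetical stand-in for a variadic generic function such as `of`:
// every argument must share the single type parameter T.
function ofSketch<T>(...values: T[]): T[] {
    return values;
}

// ofSketch('first Value', 2) would not compile: T is inferred as string
// from the first argument, so the number 2 is rejected.

// Naming the union explicitly keeps type checking without an `any` cast.
type Emitted = string | number | string[];
const emittedValues = ofSketch<Emitted>('first Value', 2, ['third', 'value']);
console.log(emittedValues.length); // 3
```

Compared with `as any`, the explicit union still lets the compiler catch a value of an unexpected type.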
throw (imported here as _throw) creates an observable that terminates immediately with an error.
import {_throw} from "rxjs/observable/throw";

_throw(new Error())
    .subscribe(() => {
        console.log("this won't be called");
    }, (err) => {
        console.log("this will be called first");
    }, () => {
        console.log("this won't be called");
    });
catchError catches an error from the source observable and replaces it with a new observable. The function we pass receives the error and should return that new observable; in this example it returns an observable that calls next with the error message and then completes.
import {catchError} from "rxjs/operators";
import {_throw} from "rxjs/observable/throw";
import {Observable} from "rxjs/Observable";
import {of} from "rxjs/observable/of";

const errorObservable: Observable<any> = _throw(new Error('some error message'));
errorObservable.pipe(
    catchError((error: Error) => of(error.message))
).subscribe(
    (message: string) => {
        console.log(message);
    },
    () => console.log('error wont be called'),
    () => console.log('complete will be called')
);
filter only emits a value if it meets a certain condition.
The following example creates an observable that emits the numbers from 0 to 9 (range takes a start value and a count). We use the filter operator to emit only the even numbers.
import {Observable} from "rxjs/Observable";
import 'rxjs/add/observable/range';
import {filter} from "rxjs/operators";

const counterObservable: Observable<number> = Observable.range(0, 10);
counterObservable.pipe(
    filter((num: number) => num % 2 === 0)
).subscribe((evenNum: number) => {
    console.log(evenNum);
});
debounceTime discards an emitted value unless a certain amount of time passes without another value being emitted.
It is popular in scenarios where an event fires frequently, such as a user typing in a search input,
and you don't want to query the server on every keystroke, only once the user has stopped typing.
In this example we emit values, wait, and then emit again.
import {Subject} from "rxjs/Subject";
import 'rxjs/add/operator/debounceTime';

const sub: Subject<number> = new Subject<number>();
sub.debounceTime(1000).subscribe((value: number) => {
    console.log('subscribe');
    console.log(value);
});

sub.next(1);
sub.next(2);
sub.next(3);
setTimeout(() => {
    console.log('firsttimer');
    sub.next(4);
}, 1100);
sub.next(5);
setTimeout(() => {
    console.log('secondtimer');
    sub.next(7);
}, 2200);
Whenever a full second passes without a new value, our subscriber receives the last value the subject emitted.
The result will be: 5, 4, 7
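To double check that result, here is a small simulation, written without RxJS, that replays the emission times from the example. `debounceTimeline` is a hypothetical model that works on an already recorded list of timestamps rather than on real timers:

```typescript
interface TimedValue {
    time: number;  // milliseconds since the start of the example
    value: number;
}

// Hypothetical model of debounceTime: a value is emitted only if no newer
// value arrives within `dueTime` milliseconds of it.
// Assumes `events` is sorted by time.
function debounceTimeline(events: TimedValue[], dueTime: number): TimedValue[] {
    const output: TimedValue[] = [];
    events.forEach((current: TimedValue, index: number) => {
        const next: TimedValue | undefined = events[index + 1];
        const quietUntil = current.time + dueTime;
        if (next === undefined || next.time >= quietUntil) {
            output.push({time: quietUntil, value: current.value});
        }
    });
    return output;
}

// The emissions from the example: 1, 2, 3 and 5 are sent immediately,
// 4 after 1100ms and 7 after 2200ms.
const recorded: TimedValue[] = [
    {time: 0, value: 1}, {time: 0, value: 2}, {time: 0, value: 3}, {time: 0, value: 5},
    {time: 1100, value: 4},
    {time: 2200, value: 7}
];
const debounced = debounceTimeline(recorded, 1000);
console.log(debounced.map((entry) => entry.value)); // [5, 4, 7]
```

The values 1, 2 and 3 are dropped because 5 arrives in the same instant, and 5 survives because the next value only shows up 1100ms later.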
distinctUntilChanged only emits a value if it is different from the previous one.
import {Subject} from "rxjs/Subject";
import 'rxjs/add/operator/distinctUntilChanged';

const sub: Subject<number> = new Subject<number>();
sub.distinctUntilChanged().subscribe((val: number) => {
    console.log(val);
});

sub.next(0);
sub.next(0);
sub.next(1);
sub.next(2);
sub.next(2);
sub.next(2);
sub.next(3);
// will output 0,1,2,3
map transforms the data in the observable into different data.
For example, we can transform an observable of strings into an observable of numbers holding the length of each string.
A common usage is to query the server and map the response to class instances.
import {Subject} from "rxjs/Subject";
import {map} from 'rxjs/operators';

const sub: Subject<string> = new Subject<string>();
sub.pipe(
    map((str: string) => str.length)
).subscribe((val: number) => {
    console.log(val);
});

sub.next("hello world");
sub.next("foo bar");
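The other usage mentioned above, mapping a server response to class instances, might look roughly like this. The `TaskJson` shape and the `Task` class are hypothetical, a Subject stands in for the real request, and Subject is imported from the package root, which is an assumption about the installed RxJS version:

```typescript
import {Subject} from "rxjs";
import {map} from "rxjs/operators";

// Hypothetical shape of a task as returned by the server.
interface TaskJson {
    id: number;
    title: string;
}

// Hypothetical class we want to work with in the application.
class Task {
    constructor(public id: number, public title: string) {}

    describe(): string {
        return `#${this.id} ${this.title}`;
    }
}

const responses: Subject<TaskJson[]> = new Subject<TaskJson[]>();
const descriptions: string[] = [];

responses.pipe(
    // turn every plain object of the response into a Task instance
    map((items: TaskJson[]) => items.map((item) => new Task(item.id, item.title)))
).subscribe((tasks: Task[]) => {
    tasks.forEach((task) => descriptions.push(task.describe()));
});

// Stand-in for a parsed server response.
responses.next([{id: 1, title: 'write article'}, {id: 2, title: 'review code'}]);
console.log(descriptions);
```

Subscribers downstream of map only ever see Task instances, so methods such as `describe` are available without any further conversion.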
mergeMap accepts a function that receives each value of the source observable and returns a new observable or a promise; the values of that inner observable are then emitted by the resulting observable.
import {mergeMap} from "rxjs/operators";
import {Observable} from "rxjs/Observable";
import "rxjs/add/observable/of";

const helloObservable: Observable<string> = Observable.of('hello');
helloObservable.pipe(
    mergeMap((message: string) => Observable.of('world'))
).subscribe((message: string) => {
    console.log(message);
});
pluck selects a property of each emitted object and emits it.
import {of} from "rxjs/observable/of";
import {Observable} from "rxjs/Observable";
import {pluck} from "rxjs/operators";

const objObservable: Observable<any> = of({hello: 'world'});
objObservable.pipe(
    pluck('hello')
).subscribe((val: any) => {
    console.log(val);
});
tap transparently performs actions or side effects when a stream emits a value, errors, or completes.
import {Subject} from "rxjs/Subject";
import {tap} from "rxjs/operators";

const sub: Subject<number> = new Subject<number>();
sub.pipe(
    tap((val: number) => console.log(`saving value: ${val}`))
).subscribe((originalValue: number) => {
    console.log(`original action: ${originalValue}`);
});

sub.next(1);
sub.next(2);
sub.next(3);
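The example above only reacts to emitted values. As a sketch of the error and complete cases mentioned in the description, tap can also be given an observer object. The `source` and `sideEffects` names are made up for the illustration, and Subject is imported from the package root, which is an assumption about the installed RxJS version:

```typescript
import {Subject} from "rxjs";
import {tap} from "rxjs/operators";

const source: Subject<number> = new Subject<number>();
const sideEffects: string[] = [];

source.pipe(
    tap({
        // each callback records a side effect without changing the stream
        next: (value: number) => sideEffects.push('next ' + value),
        error: (err: Error) => sideEffects.push('error ' + err.message),
        complete: () => sideEffects.push('complete')
    })
).subscribe();

source.next(1);
source.next(2);
source.complete();
console.log(sideEffects); // ['next 1', 'next 2', 'complete']
```

The error callback is not called here because the subject completes normally; it would run instead of complete if the subject errored.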
delay emits each value after a delay.
A common usage is in tests, when you want to mimic a server response.
import {delay} from "rxjs/operators";
import {Subject} from "rxjs/Subject";

const sub: Subject<number> = new Subject<number>();
sub.pipe(
    delay(500)
).subscribe((val: number) => {
    console.log(val);
});

sub.next(0);
sub.next(1);
sub.next(2);
// after a 500ms delay all the values will be emitted
finalize registers a callback that is called when the observable terminates, on complete or on error.
import {finalize} from "rxjs/operators";
import {Subject} from "rxjs/Subject";

const sub: Subject<number> = new Subject<number>();
sub.pipe(
    finalize(() => {
        console.log('this will be called when the subject completes or errors');
    })
).subscribe();

sub.next(0);
sub.next(1);
sub.next(2);
sub.next(3);
sub.complete();
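The example above covers the complete case. For the error case, a sketch along the same lines could look as follows; the `failing` and `calls` names are hypothetical, and Subject is imported from the package root, which is an assumption about the installed RxJS version:

```typescript
import {Subject} from "rxjs";
import {finalize} from "rxjs/operators";

const failing: Subject<number> = new Subject<number>();
const calls: string[] = [];

failing.pipe(
    finalize(() => calls.push('finalize'))
).subscribe(
    (value: number) => calls.push('next ' + value),
    (err: Error) => calls.push('error ' + err.message)
);

failing.next(1);
failing.error(new Error('boom'));
console.log(calls); // ['next 1', 'error boom', 'finalize']
```

The finalize callback runs after the error callback, which makes it a convenient place for cleanup that has to happen however the stream ends.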
Operators give us a great arsenal for dealing with common async actions.
In this article we tried to include examples of the most commonly used operators.
Although Observables are still only an early-stage proposal for the language, they will probably enter ES at some point.
Moreover, since RxJS is far more powerful than good old promises (and turning a promise into an observable is super easy), there is no reason not to use RxJS to solve our async problems.