import { Observable } from '../Observable';
|
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
|
import { isScheduler } from '../util/isScheduler';
|
import { isArray } from '../util/isArray';
|
import { Subscriber } from '../Subscriber';
|
import { OuterSubscriber } from '../OuterSubscriber';
|
import { Operator } from '../Operator';
|
import { InnerSubscriber } from '../InnerSubscriber';
|
import { subscribeToResult } from '../util/subscribeToResult';
|
import { fromArray } from './fromArray';
|
|
// Sentinel placed in a CombineLatestSubscriber `values` slot until the input at
// that index delivers its first value. It is compared by identity (`=== NONE`),
// and because this object is private to the module no emitted value can equal it.
const NONE = {};
|
|
/* tslint:disable:max-line-length */
|
|
// If called with a single array, it "auto-spreads" the array, with result selector
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, R>(sources: [O1], resultSelector: (v1: ObservedValueOf<O1>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, R>(sources: [O1, O2], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, R>(sources: [O1, O2, O3], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4, O5], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4, O5, O6], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>, v6: ObservedValueOf<O6>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O extends ObservableInput<any>, R>(sources: O[], resultSelector: (...args: ObservedValueOf<O>[]) => R, scheduler?: SchedulerLike): Observable<R>;
|
|
// standard call, but with a result selector
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, R>(v1: O1, resultSelector: (v1: ObservedValueOf<O1>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, R>(v1: O1, v2: O2, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>) => R, scheduler?: SchedulerLike): Observable<R>;
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>, v6: ObservedValueOf<O6>) => R, scheduler?: SchedulerLike): Observable<R>;
|
|
// With a scheduler (deprecated)
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O1 extends ObservableInput<any>>(sources: [O1], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>]>;
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O extends ObservableInput<any>>(sources: O[], scheduler: SchedulerLike): Observable<ObservedValueOf<O>[]>;
|
|
// Best case
|
export function combineLatest<O1 extends ObservableInput<any>>(sources: [O1]): Observable<[ObservedValueOf<O1>]>;
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
|
export function combineLatest<O extends ObservableInput<any>>(sources: O[]): Observable<ObservedValueOf<O>[]>;
|
|
// Standard calls
|
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
|
export function combineLatest<O1 extends ObservableInput<any>>(v1: O1, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>]>;
|
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
|
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
|
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
|
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
|
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
|
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
|
|
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
|
export function combineLatest<O extends ObservableInput<any>>(...observables: O[]): Observable<any[]>;
|
|
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
|
export function combineLatest<O extends ObservableInput<any>, R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
|
|
/** @deprecated resultSelector no longer supported, pipe to map instead */
|
export function combineLatest<O extends ObservableInput<any>, R>(array: O[], resultSelector: (...values: ObservedValueOf<O>[]) => R, scheduler?: SchedulerLike): Observable<R>;
|
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O extends ObservableInput<any>>(...observables: Array<O | SchedulerLike>): Observable<any[]>;
|
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<O extends ObservableInput<any>, R>(...observables: Array<O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike>): Observable<R>;
|
|
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
|
export function combineLatest<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R) | SchedulerLike>): Observable<R>;
|
/* tslint:enable:max-line-length */
|
|
/**
|
* Combines multiple Observables to create an Observable whose values are
|
* calculated from the latest values of each of its input Observables.
|
*
|
* <span class="informal">Whenever any input Observable emits a value, it
|
* computes a formula using the latest values from all the inputs, then emits
|
* the output of that formula.</span>
|
*
|
* 
|
*
|
* `combineLatest` combines the values from all the Observables passed as
|
* arguments. This is done by subscribing to each Observable in order and,
|
* whenever any Observable emits, collecting an array of the most recent
|
* values from each Observable. So if you pass `n` Observables to operator,
|
* returned Observable will always emit an array of `n` values, in order
|
* corresponding to order of passed Observables (value from the first Observable
|
* on the first place and so on).
|
*
|
* Static version of `combineLatest` accepts either an array of Observables
|
* or each Observable can be put directly as an argument. Note that array of
|
* Observables is good choice, if you don't know beforehand how many Observables
|
* you will combine. Passing empty array will result in Observable that
|
* completes immediately.
|
*
|
* To ensure output array has always the same length, `combineLatest` will
|
* actually wait for all input Observables to emit at least once,
|
* before it starts emitting results. This means if some Observable emits
|
* values before other Observables started emitting, all these values but the last
|
* will be lost. On the other hand, if some Observable does not emit a value but
|
* completes, resulting Observable will complete at the same moment without
|
* emitting anything, since it will be now impossible to include value from
|
* completed Observable in resulting array. Also, if some input Observable does
|
* not emit any value and never completes, `combineLatest` will also never emit
|
* and never complete, since, again, it will wait for all streams to emit some
|
* value.
|
*
|
* If at least one Observable was passed to `combineLatest` and all passed Observables
|
* emitted something, resulting Observable will complete when all combined
|
* streams complete. So even if some Observable completes, result of
|
* `combineLatest` will still emit values when other Observables do. In case
|
* of completed Observable, its value from now on will always be the last
|
* emitted value. On the other hand, if any Observable errors, `combineLatest`
|
* will error immediately as well, and all other Observables will be unsubscribed.
|
*
|
* `combineLatest` accepts as optional parameter `project` function, which takes
|
* as arguments all values that would normally be emitted by resulting Observable.
|
* `project` can return any kind of value, which will be then emitted by Observable
|
* instead of default array. Note that `project` does not take as argument that array
|
* of values, but values themselves. That means default `project` can be imagined
|
* as function that takes all its arguments and puts them into an array.
|
*
|
* ## Examples
|
* ### Combine two timer Observables
|
* ```ts
|
* import { combineLatest, timer } from 'rxjs';
|
*
|
* const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
|
* const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0.5s from now
|
* const combinedTimers = combineLatest([firstTimer, secondTimer]);
|
* combinedTimers.subscribe(value => console.log(value));
|
* // Logs
|
* // [0, 0] after 0.5s
|
* // [1, 0] after 1s
|
* // [1, 1] after 1.5s
|
* // [2, 1] after 2s
|
* ```
|
*
|
* ### Combine an array of Observables
|
* ```ts
|
* import { combineLatest, of } from 'rxjs';
|
* import { delay, startWith } from 'rxjs/operators';
|
*
|
* const observables = [1, 5, 10].map(
|
* n => of(n).pipe(
|
* delay(n * 1000), // emit 0 and then emit n after n seconds
|
* startWith(0),
|
* )
|
* );
|
* const combined = combineLatest(observables);
|
* combined.subscribe(value => console.log(value));
|
* // Logs
|
* // [0, 0, 0] immediately
|
* // [1, 0, 0] after 1s
|
* // [1, 5, 0] after 5s
|
* // [1, 5, 10] after 10s
|
* ```
|
*
|
*
|
* ### Use map operator to dynamically calculate the Body-Mass Index
|
* ```ts
|
* import { combineLatest, of } from 'rxjs';
|
* import { map } from 'rxjs/operators';
|
*
|
* const weight = of(70, 72, 76, 79, 75);
|
* const height = of(1.76, 1.77, 1.78);
|
* const bmi = combineLatest([weight, height]).pipe(
|
* map(([w, h]) => w / (h * h)),
|
* );
|
* bmi.subscribe(x => console.log('BMI is ' + x));
|
*
|
* // With output to console:
|
* // BMI is 24.212293388429753
|
* // BMI is 23.93948099205209
|
* // BMI is 23.671253629592222
|
* ```
|
*
|
* @see {@link combineAll}
|
* @see {@link merge}
|
* @see {@link withLatestFrom}
|
*
|
* @param {ObservableInput} observable1 An input Observable to combine with other Observables.
|
* @param {ObservableInput} observable2 An input Observable to combine with other Observables.
|
* More than one input Observables may be given as arguments
|
* or an array of Observables may be given as the first argument.
|
* @param {function} [project] An optional function to project the values from
|
* the combined latest values into a new value on the output Observable.
|
* @param {SchedulerLike} [scheduler=null] The {@link SchedulerLike} to use for subscribing to
|
* each input Observable.
|
* @return {Observable} An Observable of projected values from the most recent
|
* values from each input Observable, or an array of the most recent values from
|
* each input Observable.
|
*/
|
export function combineLatest<O extends ObservableInput<any>, R>(
  ...observables: (O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike)[]
): Observable<R> {
  let scheduler: SchedulerLike | undefined;
  let resultSelector: ((...values: Array<any>) => R) | undefined;

  // Optional trailing arguments are peeled off from the right-hand side:
  // the scheduler (if any) is always last ...
  if (isScheduler(observables[observables.length - 1])) {
    scheduler = observables.pop() as SchedulerLike;
  }

  // ... and the result selector (if any) directly precedes it.
  if (typeof observables[observables.length - 1] === 'function') {
    resultSelector = observables.pop() as (...values: Array<any>) => R;
  }

  // When exactly one argument is left and it is an array, the call was of the
  // form `combineLatest([obs1, obs2, obs3], resultSelector)`: use the array's
  // elements as the inputs instead of the array itself.
  let inputs = observables;
  if (inputs.length === 1 && isArray(inputs[0])) {
    inputs = inputs[0] as any;
  }

  return fromArray(inputs, scheduler).lift(new CombineLatestOperator(resultSelector));
}
|
|
/**
 * Operator that, when applied, subscribes the source with a
 * {@link CombineLatestSubscriber} forwarding to the downstream subscriber.
 */
export class CombineLatestOperator<T, R> implements Operator<T, R> {
  /** Optional function applied to the latest values before they are emitted. */
  private resultSelector?: (...values: Array<any>) => R;

  constructor(resultSelector?: (...values: Array<any>) => R) {
    this.resultSelector = resultSelector;
  }

  /**
   * Subscribes `source` with a new CombineLatestSubscriber wrapping `subscriber`
   * and returns whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<R>, source: any): any {
    const combiner = new CombineLatestSubscriber(subscriber, this.resultSelector);
    return source.subscribe(combiner);
  }
}
|
|
/**
 * Collects every input delivered by the source and, once the source completes,
 * passes each collected input to `subscribeToResult`. From then on it forwards
 * the combination of the latest values to the destination.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
  /** Number of inputs that have not completed yet. */
  private active: number = 0;
  /** Latest value per input; holds `NONE` until that input emits. */
  private values: any[] = [];
  /** Inputs collected from the source, in arrival order. */
  private observables: any[] = [];
  /** Number of inputs still to deliver their first value. */
  private toRespond?: number;

  constructor(destination: Subscriber<R>, private resultSelector?: (...values: Array<any>) => R) {
    super(destination);
  }

  /** Records one input and reserves its (still empty) value slot. */
  protected _next(observable: any) {
    this.observables.push(observable);
    this.values.push(NONE);
  }

  /**
   * Runs when the source of inputs completes: with no inputs the destination
   * is completed right away, otherwise every input is handed to
   * `subscribeToResult` together with its index.
   */
  protected _complete() {
    const inputs = this.observables;
    const count = inputs.length;

    if (count === 0) {
      this.destination.complete!();
      return;
    }

    this.active = count;
    this.toRespond = count;
    for (let index = 0; index < count; index++) {
      this.add(subscribeToResult(this, inputs[index], undefined, index));
    }
  }

  /** Completes the destination once the last active input has completed. */
  notifyComplete(unused: Subscriber<R>): void {
    this.active -= 1;
    if (this.active === 0) {
      this.destination.complete!();
    }
  }

  /**
   * Stores `innerValue` as the latest value of input `outerIndex` and, when no
   * input is still waiting for its first value, emits the combination.
   */
  notifyNext(_outerValue: T, innerValue: R,
             outerIndex: number): void {
    const latest = this.values;
    const isFirstValue = latest[outerIndex] === NONE;

    // Count of inputs still missing a first value after this notification.
    let outstanding = 0;
    if (this.toRespond) {
      if (isFirstValue) {
        this.toRespond -= 1;
      }
      outstanding = this.toRespond;
    }

    latest[outerIndex] = innerValue;

    if (outstanding !== 0) {
      return;
    }

    if (this.resultSelector) {
      this._tryResultSelector(latest);
      return;
    }

    // Emit a copy so later updates do not alter an array already handed out.
    this.destination.next!(latest.slice());
  }

  /**
   * Invokes the result selector with the latest values spread as arguments.
   * A thrown error is routed to the destination's `error` and nothing is emitted.
   */
  private _tryResultSelector(values: any[]) {
    let projected: any;
    try {
      projected = this.resultSelector!.apply(this, values);
    } catch (err) {
      this.destination.error!(err);
      return;
    }
    this.destination.next!(projected);
  }
}
|