import { Subject } from '../Subject';
|
import { Operator } from '../Operator';
|
import { Subscriber } from '../Subscriber';
|
import { Observable } from '../Observable';
|
import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable';
|
import { MonoTypeOperatorFunction, OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types';
|
|
/* tslint:disable:max-line-length */
export function multicast<T>(subject: Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export function multicast<T, O extends ObservableInput<any>>(subject: Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
export function multicast<T>(subjectFactory: (this: Observable<T>) => Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export function multicast<T, O extends ObservableInput<any>>(subjectFactory: (this: Observable<T>) => Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
/* tslint:enable:max-line-length */

/**
 * Creates an operator function that routes the source Observable through an
 * intermediate Subject.
 *
 * The operator works in one of two modes, depending on whether a `selector`
 * is supplied:
 *
 * - Without a `selector`, the result is an object created from the source with
 *   the `connectableObservableDescriptor` properties, with its `source` and
 *   `subjectFactory` set, and typed as a `ConnectableObservable`.
 * - With a `selector`, the source is lifted with a `MulticastOperator`. The
 *   result is a plain Observable (it is NOT connectable) that emits whatever
 *   the Observable returned by the selector emits.
 *
 * @param {Function|Subject} subjectOrSubjectFactory - Either a Subject to push
 * the source's notifications into, or a factory function that creates such a
 * Subject. When a Subject instance is given it is wrapped in a factory that
 * returns that same instance on every call.
 * @param {Function} [selector] - Optional function that receives the
 * intermediate Subject as an Observable and returns the Observable to emit
 * from. It may use the multicasted stream as many times as needed without
 * causing additional subscriptions to the source for that subscriber.
 * @return {Function} A function that takes the source Observable and returns
 * either a `ConnectableObservable` (no selector) or an Observable of the
 * selector's results (selector given).
 * @method multicast
 * @owner Observable
 */
export function multicast<T, R>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
                                selector?: (source: Observable<T>) => Observable<R>): OperatorFunction<T, R> {
  return function multicastOperatorFunction(source: Observable<T>): Observable<R> {
    // Normalise the first argument so the rest of the code only deals with a factory.
    let subjectFactory: () => Subject<T>;
    if (typeof subjectOrSubjectFactory === 'function') {
      subjectFactory = <() => Subject<T>>subjectOrSubjectFactory;
    } else {
      // A Subject instance was passed: every "creation" hands back that same instance.
      subjectFactory = function subjectFactory() {
        return <Subject<T>>subjectOrSubjectFactory;
      };
    }

    // Selector mode: no connectable is produced, the source is simply lifted.
    if (typeof selector === 'function') {
      return source.lift(new MulticastOperator(subjectFactory, selector));
    }

    // Connectable mode: derive an object from the source (so it inherits the
    // source's members) and attach the connectable behaviour through the descriptor.
    const connectable: any = Object.create(source, connectableObservableDescriptor);
    connectable.source = source;
    connectable.subjectFactory = subjectFactory;

    return <ConnectableObservable<R>> connectable;
  };
}
|
|
/**
 * Operator applied by `multicast` when a selector is supplied.
 *
 * For every subscription it obtains a Subject from `subjectFactory`, hands that
 * Subject to `selector`, subscribes the downstream subscriber to the Observable
 * the selector returns, and only then subscribes the Subject to the source.
 */
export class MulticastOperator<T, R> implements Operator<T, R> {
  /**
   * @param subjectFactory Produces the intermediate Subject for a subscription.
   * @param selector Maps the intermediate Subject to the Observable that the
   * downstream subscriber is subscribed to.
   */
  constructor(private subjectFactory: () => Subject<T>,
              private selector: (source: Observable<T>) => Observable<R>) {
  }

  /**
   * Wires one downstream subscriber to the source through a Subject.
   *
   * @param subscriber The downstream subscriber.
   * @param source The source Observable being multicast.
   * @return The subscription to the selector's Observable, with the source
   * subscription attached to it through `add`.
   */
  call(subscriber: Subscriber<R>, source: any): any {
    const { selector } = this;
    // The public `multicast` overloads type the factory as
    // `(this: Observable<T>) => Subject<T>`, so invoke it with the source as
    // `this` rather than leaking this operator instance into the factory.
    const subject = this.subjectFactory.call(source);
    // Subscribe downstream first so it is already listening when the source
    // starts pushing values into the subject.
    const subscription = selector(subject).subscribe(subscriber);
    subscription.add(source.subscribe(subject));
    return subscription;
  }
}
|