| | |
| | | 'use strict'; |
| | | // https://github.com/tc39/proposal-observable |
| | | var $ = require('../internals/export'); |
| | | var DESCRIPTORS = require('../internals/descriptors'); |
| | | var setSpecies = require('../internals/set-species'); |
| | | var aCallable = require('../internals/a-callable'); |
| | | var isCallable = require('../internals/is-callable'); |
| | | var isConstructor = require('../internals/is-constructor'); |
| | | var anObject = require('../internals/an-object'); |
| | | var isObject = require('../internals/is-object'); |
| | | var anInstance = require('../internals/an-instance'); |
| | | var defineProperty = require('../internals/object-define-property').f; |
| | | var redefine = require('../internals/redefine'); |
| | | var redefineAll = require('../internals/redefine-all'); |
| | | var getIterator = require('../internals/get-iterator'); |
| | | var getMethod = require('../internals/get-method'); |
| | | var iterate = require('../internals/iterate'); |
| | | var hostReportErrors = require('../internals/host-report-errors'); |
| | | var wellKnownSymbol = require('../internals/well-known-symbol'); |
| | | var InternalStateModule = require('../internals/internal-state'); |
| | | |
// Key of the "observable" protocol method: looked up on the argument of
// `Observable.from` and defined on `Observable.prototype` further down
var OBSERVABLE = wellKnownSymbol('observable');
// Accessors for the hidden state objects attached to the instances created in this module
var setInternalState = InternalStateModule.set;
var getInternalState = InternalStateModule.get;
| | | |
// Runs the cleanup callback stored on the subscription state, at most once.
// An exception thrown by the callback is passed to `hostReportErrors` instead of propagating.
var cleanupSubscription = function (subscriptionState) {
  var teardown = subscriptionState.cleanup;
  if (!teardown) return;
  // Drop the reference before invoking, so a repeated or re-entrant call finds nothing to run
  subscriptionState.cleanup = undefined;
  try {
    teardown();
  } catch (error) {
    hostReportErrors(error);
  }
};
| | | |
// A subscription counts as closed once its state no longer references an observer
// (`close` below is what clears that reference).
var subscriptionClosed = function (subscriptionState) {
  var currentObserver = subscriptionState.observer;
  return currentObserver === undefined;
};
| | | |
// Marks a subscription as closed by dropping its observer reference.
// Without descriptor support `closed` is a plain data property (see the constructors),
// so it has to be flipped by hand on both the subscription and its observer wrapper.
var close = function (subscriptionState) {
  if (!DESCRIPTORS) {
    // NOTE(review): `facade` is not assigned in this file; presumably the internal-state
    // module stores the owning object there when the state is set. Confirm.
    var owner = subscriptionState.facade;
    owner.closed = true;
    var observerWrapper = subscriptionState.subscriptionObserver;
    if (observerWrapper) observerWrapper.closed = true;
  }
  subscriptionState.observer = undefined;
};
| | | |
// Subscription constructor: links `observer` to the `subscriber` function of an observable.
//   observer   - object receiving notifications; validated with `anObject`
//   subscriber - function invoked with a `SubscriptionObserver`; may return a cleanup
//                function or an object with an `unsubscribe` method
var Subscription = function (observer, subscriber) {
  var state = setInternalState(this, {
    cleanup: undefined,
    observer: anObject(observer),
    subscriptionObserver: undefined
  });
  if (!DESCRIPTORS) this.closed = false;
  // Give the observer a chance to see the subscription first; failures are only reported
  try {
    var start = getMethod(observer, 'start');
    if (start) start.call(observer, this);
  } catch (error) {
    hostReportErrors(error);
  }
  // `start` may have unsubscribed already, in which case the subscriber never runs
  if (subscriptionClosed(state)) return;
  var subscriptionObserver = new SubscriptionObserver(this);
  state.subscriptionObserver = subscriptionObserver;
  try {
    var result = subscriber(subscriptionObserver);
    if (result != null) {
      if (isCallable(result.unsubscribe)) {
        // Subscription-like return value: cleaning up means unsubscribing from it
        state.cleanup = function () { result.unsubscribe(); };
      } else {
        state.cleanup = aCallable(result);
      }
    }
  } catch (error) {
    // Errors from the subscriber, or from an invalid return value, go to the observer
    subscriptionObserver.error(error);
    return;
  }
  // Closed while the subscriber was running: run the cleanup that was just stored
  if (subscriptionClosed(state)) cleanupSubscription(state);
};
| | | |
Subscription.prototype = redefineAll({}, {
  // Closes the subscription and runs its cleanup; does nothing when already closed
  unsubscribe: function unsubscribe() {
    var state = getInternalState(this);
    if (subscriptionClosed(state)) return;
    close(state);
    cleanupSubscription(state);
  }
});
| | | |
// With descriptor support `closed` is a getter computed from the internal state;
// otherwise the constructor and `close` maintain it as a data property.
if (DESCRIPTORS) {
  defineProperty(Subscription.prototype, 'closed', {
    configurable: true,
    get: function () {
      var state = getInternalState(this);
      return subscriptionClosed(state);
    }
  });
}
| | | |
// Wrapper handed to the subscriber function; it only remembers the subscription it belongs to.
var SubscriptionObserver = function (subscription) {
  var state = { subscription: subscription };
  setInternalState(this, state);
  // No getters available: expose `closed` as a data property kept up to date by `close`
  if (!DESCRIPTORS) {
    this.closed = false;
  }
};
| | | |
// Each method resolves the state of the owning subscription and is a no-op once it is closed.
SubscriptionObserver.prototype = redefineAll({}, {
  // Forwards `value` to the observer's `next` method, if `getMethod` finds one.
  // Exceptions are handed to `hostReportErrors`; the subscription stays open.
  next: function next(value) {
    var state = getInternalState(getInternalState(this).subscription);
    if (subscriptionClosed(state)) return;
    var target = state.observer;
    try {
      var onNext = getMethod(target, 'next');
      if (onNext) onNext.call(target, value);
    } catch (error) {
      hostReportErrors(error);
    }
  },
  // Closes the subscription, then delivers `value` to the observer's `error` method.
  // When there is no such method the value itself is reported through `hostReportErrors`.
  error: function error(value) {
    var state = getInternalState(getInternalState(this).subscription);
    if (subscriptionClosed(state)) return;
    var target = state.observer;
    // Closed before notifying, so calls made from inside the handler are ignored
    close(state);
    try {
      var onError = getMethod(target, 'error');
      if (onError) onError.call(target, value);
      else hostReportErrors(value);
    } catch (err) {
      hostReportErrors(err);
    }
    cleanupSubscription(state);
  },
  // Closes the subscription, then calls the observer's `complete` method, if any.
  complete: function complete() {
    var state = getInternalState(getInternalState(this).subscription);
    if (subscriptionClosed(state)) return;
    var target = state.observer;
    close(state);
    try {
      var onComplete = getMethod(target, 'complete');
      if (onComplete) onComplete.call(target);
    } catch (error) {
      hostReportErrors(error);
    }
    cleanupSubscription(state);
  }
});
| | | |
// Getter variant of `closed`: reflects whether the owning subscription is closed.
if (DESCRIPTORS) {
  defineProperty(SubscriptionObserver.prototype, 'closed', {
    configurable: true,
    get: function () {
      var owner = getInternalState(this).subscription;
      return subscriptionClosed(getInternalState(owner));
    }
  });
}
| | | |
// `Observable` constructor: checks the receiver with `anInstance`, validates `subscriber`
// with `aCallable` and stores it in the instance's internal state for `subscribe`.
var $Observable = function Observable(subscriber) {
  anInstance(this, $Observable, 'Observable');
  var subscriberFn = aCallable(subscriber);
  setInternalState(this, { subscriber: subscriberFn });
};
| | | |
redefineAll($Observable.prototype, {
  // Accepts either an observer object or up to three callbacks (next, error, complete)
  // and returns a new `Subscription`. Any other argument is treated as an empty observer.
  subscribe: function subscribe(observer) {
    var argumentsLength = arguments.length;
    var target;
    if (isCallable(observer)) {
      // Callback form: build the observer from the positional arguments
      target = {
        next: observer,
        error: argumentsLength > 1 ? arguments[1] : undefined,
        complete: argumentsLength > 2 ? arguments[2] : undefined
      };
    } else if (isObject(observer)) {
      target = observer;
    } else {
      target = {};
    }
    return new Subscription(target, getInternalState(this).subscriber);
  }
});
| | | |
// Static factories. When `this` is a constructor it is used to build the result, so
// subclasses produce their own instances; otherwise the built-in `Observable` is used.
redefineAll($Observable, {
  // Converts `x` into an observable.
  //   - `x` has an `OBSERVABLE` method: its result is returned as-is when it was already
  //     built by `C`, otherwise it is wrapped in a new `C` that delegates `subscribe`.
  //   - otherwise `x` is iterated and every item is pushed to the observer.
  // Throws from `anObject` / `getIterator` when `x` is not an object or cannot be iterated.
  from: function from(x) {
    var C = isConstructor(this) ? this : $Observable;
    var observableMethod = getMethod(anObject(x), OBSERVABLE);
    if (observableMethod) {
      var observable = anObject(observableMethod.call(x));
      if (observable.constructor === C) return observable;
      return new C(function (observer) {
        return observable.subscribe(observer);
      });
    }
    // Requested eagerly so that an argument that cannot be iterated is rejected by `from`
    // itself rather than on the first `subscribe`
    var pendingIterator = getIterator(x);
    var hasPendingIterator = true;
    return new C(function (observer) {
      var iterator;
      if (hasPendingIterator) {
        // First subscription consumes the iterator created above
        iterator = pendingIterator;
        hasPendingIterator = false;
        pendingIterator = undefined;
      } else {
        // Later subscriptions need their own iterator: reusing the first one would hand
        // them an already exhausted sequence and they would only ever see `complete`
        iterator = getIterator(x);
      }
      iterate(iterator, function (it, stop) {
        observer.next(it);
        if (observer.closed) return stop();
      }, { IS_ITERATOR: true, INTERRUPTED: true });
      observer.complete();
    });
  },
  // Creates an observable that emits the given arguments in order and then completes.
  // Emission stops without `complete` as soon as the observer reports itself closed.
  of: function of() {
    var C = isConstructor(this) ? this : $Observable;
    var length = arguments.length;
    // Snapshot the arguments: the subscriber runs later and possibly many times
    var items = new Array(length);
    for (var index = 0; index < length; index++) items[index] = arguments[index];
    return new C(function (observer) {
      for (var i = 0; i < length; i++) {
        observer.next(items[i]);
        if (observer.closed) return;
      }
      observer.complete();
    });
  }
});
| | | |
// An observable is its own result for the `OBSERVABLE` protocol method
redefine($Observable.prototype, OBSERVABLE, function () {
  return this;
});

// Hand the constructor to the export helper with the `global` flag set
$({ global: true }, { Observable: $Observable });

// NOTE(review): presumably installs the species accessor on the global `Observable`;
// confirm in `set-species`
setSpecies('Observable');
// TODO: Remove this module in `core-js@4`: it has been split into the modules listed below
| | | require('../modules/esnext.observable.constructor'); |
| | | require('../modules/esnext.observable.from'); |
| | | require('../modules/esnext.observable.of'); |