esnext.observable.js 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. 'use strict';
  2. // https://github.com/tc39/proposal-observable
  3. var $ = require('../internals/export');
  4. var global = require('../internals/global');
  5. var call = require('../internals/function-call');
  6. var DESCRIPTORS = require('../internals/descriptors');
  7. var setSpecies = require('../internals/set-species');
  8. var aCallable = require('../internals/a-callable');
  9. var isCallable = require('../internals/is-callable');
  10. var isConstructor = require('../internals/is-constructor');
  11. var anObject = require('../internals/an-object');
  12. var isObject = require('../internals/is-object');
  13. var anInstance = require('../internals/an-instance');
  14. var defineProperty = require('../internals/object-define-property').f;
  15. var redefine = require('../internals/redefine');
  16. var redefineAll = require('../internals/redefine-all');
  17. var getIterator = require('../internals/get-iterator');
  18. var getMethod = require('../internals/get-method');
  19. var iterate = require('../internals/iterate');
  20. var hostReportErrors = require('../internals/host-report-errors');
  21. var wellKnownSymbol = require('../internals/well-known-symbol');
  22. var InternalStateModule = require('../internals/internal-state');
  23. var $$OBSERVABLE = wellKnownSymbol('observable');
  24. var OBSERVABLE = 'Observable';
  25. var SUBSCRIPTION = 'Subscription';
  26. var SUBSCRIPTION_OBSERVER = 'SubscriptionObserver';
  27. var getterFor = InternalStateModule.getterFor;
  28. var setInternalState = InternalStateModule.set;
  29. var getObservableInternalState = getterFor(OBSERVABLE);
  30. var getSubscriptionInternalState = getterFor(SUBSCRIPTION);
  31. var getSubscriptionObserverInternalState = getterFor(SUBSCRIPTION_OBSERVER);
  32. var Array = global.Array;
  33. var NativeObservable = global.Observable;
  34. var NativeObservablePrototype = NativeObservable && NativeObservable.prototype;
  35. var FORCED = !isCallable(NativeObservable)
  36. || !isCallable(NativeObservable.from)
  37. || !isCallable(NativeObservable.of)
  38. || !isCallable(NativeObservablePrototype.subscribe)
  39. || !isCallable(NativeObservablePrototype[$$OBSERVABLE]);
  40. var SubscriptionState = function (observer) {
  41. this.observer = anObject(observer);
  42. this.cleanup = undefined;
  43. this.subscriptionObserver = undefined;
  44. };
  45. SubscriptionState.prototype = {
  46. type: SUBSCRIPTION,
  47. clean: function () {
  48. var cleanup = this.cleanup;
  49. if (cleanup) {
  50. this.cleanup = undefined;
  51. try {
  52. cleanup();
  53. } catch (error) {
  54. hostReportErrors(error);
  55. }
  56. }
  57. },
  58. close: function () {
  59. if (!DESCRIPTORS) {
  60. var subscription = this.facade;
  61. var subscriptionObserver = this.subscriptionObserver;
  62. subscription.closed = true;
  63. if (subscriptionObserver) subscriptionObserver.closed = true;
  64. } this.observer = undefined;
  65. },
  66. isClosed: function () {
  67. return this.observer === undefined;
  68. }
  69. };
  70. var Subscription = function (observer, subscriber) {
  71. var subscriptionState = setInternalState(this, new SubscriptionState(observer));
  72. var start;
  73. if (!DESCRIPTORS) this.closed = false;
  74. try {
  75. if (start = getMethod(observer, 'start')) call(start, observer, this);
  76. } catch (error) {
  77. hostReportErrors(error);
  78. }
  79. if (subscriptionState.isClosed()) return;
  80. var subscriptionObserver = subscriptionState.subscriptionObserver = new SubscriptionObserver(subscriptionState);
  81. try {
  82. var cleanup = subscriber(subscriptionObserver);
  83. var subscription = cleanup;
  84. if (cleanup != null) subscriptionState.cleanup = isCallable(cleanup.unsubscribe)
  85. ? function () { subscription.unsubscribe(); }
  86. : aCallable(cleanup);
  87. } catch (error) {
  88. subscriptionObserver.error(error);
  89. return;
  90. } if (subscriptionState.isClosed()) subscriptionState.clean();
  91. };
  92. Subscription.prototype = redefineAll({}, {
  93. unsubscribe: function unsubscribe() {
  94. var subscriptionState = getSubscriptionInternalState(this);
  95. if (!subscriptionState.isClosed()) {
  96. subscriptionState.close();
  97. subscriptionState.clean();
  98. }
  99. }
  100. });
  101. if (DESCRIPTORS) defineProperty(Subscription.prototype, 'closed', {
  102. configurable: true,
  103. get: function () {
  104. return getSubscriptionInternalState(this).isClosed();
  105. }
  106. });
  107. var SubscriptionObserver = function (subscriptionState) {
  108. setInternalState(this, {
  109. type: SUBSCRIPTION_OBSERVER,
  110. subscriptionState: subscriptionState
  111. });
  112. if (!DESCRIPTORS) this.closed = false;
  113. };
  114. SubscriptionObserver.prototype = redefineAll({}, {
  115. next: function next(value) {
  116. var subscriptionState = getSubscriptionObserverInternalState(this).subscriptionState;
  117. if (!subscriptionState.isClosed()) {
  118. var observer = subscriptionState.observer;
  119. try {
  120. var nextMethod = getMethod(observer, 'next');
  121. if (nextMethod) call(nextMethod, observer, value);
  122. } catch (error) {
  123. hostReportErrors(error);
  124. }
  125. }
  126. },
  127. error: function error(value) {
  128. var subscriptionState = getSubscriptionObserverInternalState(this).subscriptionState;
  129. if (!subscriptionState.isClosed()) {
  130. var observer = subscriptionState.observer;
  131. subscriptionState.close();
  132. try {
  133. var errorMethod = getMethod(observer, 'error');
  134. if (errorMethod) call(errorMethod, observer, value);
  135. else hostReportErrors(value);
  136. } catch (err) {
  137. hostReportErrors(err);
  138. } subscriptionState.clean();
  139. }
  140. },
  141. complete: function complete() {
  142. var subscriptionState = getSubscriptionObserverInternalState(this).subscriptionState;
  143. if (!subscriptionState.isClosed()) {
  144. var observer = subscriptionState.observer;
  145. subscriptionState.close();
  146. try {
  147. var completeMethod = getMethod(observer, 'complete');
  148. if (completeMethod) call(completeMethod, observer);
  149. } catch (error) {
  150. hostReportErrors(error);
  151. } subscriptionState.clean();
  152. }
  153. }
  154. });
  155. if (DESCRIPTORS) defineProperty(SubscriptionObserver.prototype, 'closed', {
  156. configurable: true,
  157. get: function () {
  158. return getSubscriptionObserverInternalState(this).subscriptionState.isClosed();
  159. }
  160. });
  161. var $Observable = function Observable(subscriber) {
  162. anInstance(this, ObservablePrototype);
  163. setInternalState(this, {
  164. type: OBSERVABLE,
  165. subscriber: aCallable(subscriber)
  166. });
  167. };
  168. var ObservablePrototype = $Observable.prototype;
  169. redefineAll(ObservablePrototype, {
  170. subscribe: function subscribe(observer) {
  171. var length = arguments.length;
  172. return new Subscription(isCallable(observer) ? {
  173. next: observer,
  174. error: length > 1 ? arguments[1] : undefined,
  175. complete: length > 2 ? arguments[2] : undefined
  176. } : isObject(observer) ? observer : {}, getObservableInternalState(this).subscriber);
  177. }
  178. });
  179. redefineAll($Observable, {
  180. from: function from(x) {
  181. var C = isConstructor(this) ? this : $Observable;
  182. var observableMethod = getMethod(anObject(x), $$OBSERVABLE);
  183. if (observableMethod) {
  184. var observable = anObject(call(observableMethod, x));
  185. return observable.constructor === C ? observable : new C(function (observer) {
  186. return observable.subscribe(observer);
  187. });
  188. }
  189. var iterator = getIterator(x);
  190. return new C(function (observer) {
  191. iterate(iterator, function (it, stop) {
  192. observer.next(it);
  193. if (observer.closed) return stop();
  194. }, { IS_ITERATOR: true, INTERRUPTED: true });
  195. observer.complete();
  196. });
  197. },
  198. of: function of() {
  199. var C = isConstructor(this) ? this : $Observable;
  200. var length = arguments.length;
  201. var items = Array(length);
  202. var index = 0;
  203. while (index < length) items[index] = arguments[index++];
  204. return new C(function (observer) {
  205. for (var i = 0; i < length; i++) {
  206. observer.next(items[i]);
  207. if (observer.closed) return;
  208. } observer.complete();
  209. });
  210. }
  211. });
  212. redefine(ObservablePrototype, $$OBSERVABLE, function () { return this; });
  213. $({ global: true, forced: FORCED }, {
  214. Observable: $Observable
  215. });
  216. setSpecies(OBSERVABLE);