bufferTime.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. /** PURE_IMPORTS_START tslib,_scheduler_async,_Subscriber,_util_isScheduler PURE_IMPORTS_END */
  2. import * as tslib_1 from "tslib";
  3. import { async } from '../scheduler/async';
  4. import { Subscriber } from '../Subscriber';
  5. import { isScheduler } from '../util/isScheduler';
  /**
   * Builds an operator function that hands the source to a `BufferTimeOperator`.
   *
   * Accepted call shapes (a trailing argument recognised by `isScheduler` is
   * always taken as the scheduler and is not counted as a positional option):
   *   bufferTime(bufferTimeSpan)
   *   bufferTime(bufferTimeSpan, bufferCreationInterval)
   *   bufferTime(bufferTimeSpan, bufferCreationInterval, maxBufferSize)
   * each optionally followed by a scheduler.
   *
   * Defaults when an option is not supplied: `bufferCreationInterval` is
   * `null`, `maxBufferSize` is `Number.POSITIVE_INFINITY`, and the scheduler is
   * the imported `async` scheduler.
   *
   * @param bufferTimeSpan First positional argument, forwarded unchanged to the operator.
   * @returns A function taking `source` and returning `source.lift(operator)`.
   */
  export function bufferTime(bufferTimeSpan) {
    var args = arguments;
    var optionCount = args.length;
    var trailing = args[optionCount - 1];
    var scheduler = async;
    if (isScheduler(trailing)) {
      // The scheduler occupies the last slot, so it must not be mistaken for
      // bufferCreationInterval / maxBufferSize below.
      scheduler = trailing;
      optionCount -= 1;
    }
    var bufferCreationInterval = optionCount >= 2 ? args[1] : null;
    var maxBufferSize = optionCount >= 3 ? args[2] : Number.POSITIVE_INFINITY;
    return function bufferTimeOperatorFunction(source) {
      return source.lift(
        new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler)
      );
    };
  }
  /**
   * Operator object passed to `source.lift`. It only stores the bufferTime
   * configuration; the buffering itself is done by the `BufferTimeSubscriber`
   * that `call` creates.
   */
  var BufferTimeOperator = /*@__PURE__*/ (function () {
    /**
     * @param bufferTimeSpan Stored and later forwarded to the subscriber.
     * @param bufferCreationInterval Stored and later forwarded to the subscriber.
     * @param maxBufferSize Stored and later forwarded to the subscriber.
     * @param scheduler Stored and later forwarded to the subscriber.
     */
    function BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
      this.bufferTimeSpan = bufferTimeSpan;
      this.bufferCreationInterval = bufferCreationInterval;
      this.maxBufferSize = maxBufferSize;
      this.scheduler = scheduler;
    }

    /**
     * Subscribes to `source` with a fresh `BufferTimeSubscriber` that wraps
     * `subscriber` and carries this operator's configuration.
     *
     * @returns Whatever `source.subscribe` returns.
     */
    BufferTimeOperator.prototype.call = function (subscriber, source) {
      return source.subscribe(
        new BufferTimeSubscriber(
          subscriber,
          this.bufferTimeSpan,
          this.bufferCreationInterval,
          this.maxBufferSize,
          this.scheduler
        )
      );
    };

    return BufferTimeOperator;
  }());
  /**
   * Record for one open buffer. It starts with an empty `buffer` array; other
   * code in this file later attaches a `closeAction` property to it.
   */
  var Context = /*@__PURE__*/ (function () {
    return function Context() {
      // A new array per instance, so open buffers never share storage.
      this.buffer = [];
    };
  }());
  /**
   * Subscriber behind `bufferTime`.
   *
   * It keeps a list of open buffer contexts (`this.contexts`). Every incoming
   * value is appended to each open buffer. A buffer is emitted to
   * `this.destination` when its scheduled close action runs, when it reaches
   * `maxBufferSize`, or when the subscriber completes.
   *
   * Two modes, chosen in the constructor:
   *  - "timespan only" (`bufferCreationInterval` is null/undefined or negative):
   *    one buffer at a time; closing a buffer opens the next one.
   *  - creation-interval mode: a recurring action opens a new buffer every
   *    `bufferCreationInterval`, and each buffer gets its own close action
   *    scheduled `bufferTimeSpan` later, so buffers may overlap.
   */
  var BufferTimeSubscriber = /*@__PURE__*/ (function (_super) {
    tslib_1.__extends(BufferTimeSubscriber, _super);

    /**
     * @param destination Passed to the parent constructor; buffers are emitted to it.
     * @param bufferTimeSpan Delay used when scheduling a buffer's close action.
     * @param bufferCreationInterval Delay between buffer openings; null/undefined
     *        or a negative value selects timespan-only mode.
     * @param maxBufferSize Buffer length at which a buffer is emitted early.
     * @param scheduler Object whose `schedule(work, delay, state)` is used for all timers.
     */
    function BufferTimeSubscriber(destination, bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
      var _this = _super.call(this, destination) || this;
      _this.bufferTimeSpan = bufferTimeSpan;
      _this.bufferCreationInterval = bufferCreationInterval;
      _this.maxBufferSize = maxBufferSize;
      _this.scheduler = scheduler;
      _this.contexts = [];
      // The first buffer is opened right away, before any timer is scheduled.
      var context = _this.openContext();
      _this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
      if (_this.timespanOnly) {
        // A single action closes the current buffer and opens the next one.
        var timeSpanOnlyState = { subscriber: _this, context: context, bufferTimeSpan: bufferTimeSpan };
        _this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
      }
      else {
        // One action closes the initial buffer; a second action keeps opening
        // further buffers every bufferCreationInterval.
        var closeState = { subscriber: _this, context: context };
        var creationState = { bufferTimeSpan: bufferTimeSpan, bufferCreationInterval: bufferCreationInterval, subscriber: _this, scheduler: scheduler };
        _this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
        _this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState));
      }
      return _this;
    }

    /**
     * Appends `value` to every open buffer, then emits each buffer that has
     * just reached `maxBufferSize`.
     *
     * Several buffers can fill on the same value (for example buffers opened
     * back to back while the source was silent). All of them are flushed;
     * flushing only one would leave the others to grow past `maxBufferSize`,
     * because the size check is an equality test that they would never hit
     * again.
     */
    BufferTimeSubscriber.prototype._next = function (value) {
      var contexts = this.contexts;
      var maxBufferSize = this.maxBufferSize;
      var len = contexts.length;
      var filledContexts = null;
      for (var i = 0; i < len; i++) {
        var context_1 = contexts[i];
        var buffer = context_1.buffer;
        buffer.push(value);
        // Loose equality is kept on purpose so non-number limits behave as before.
        if (buffer.length == maxBufferSize) {
          (filledContexts || (filledContexts = [])).push(context_1);
        }
      }
      if (filledContexts) {
        // Flushing happens after the loop because onBufferFull mutates
        // this.contexts. Stop as soon as an emission closed this subscriber.
        for (var j = 0; j < filledContexts.length && !this.closed; j++) {
          this.onBufferFull(filledContexts[j]);
        }
      }
    };

    /** Drops all open buffers without emitting them, then delegates to the parent `_error`. */
    BufferTimeSubscriber.prototype._error = function (err) {
      this.contexts.length = 0;
      _super.prototype._error.call(this, err);
    };

    /** Emits every remaining open buffer in order, then delegates to the parent `_complete`. */
    BufferTimeSubscriber.prototype._complete = function () {
      var _a = this, contexts = _a.contexts, destination = _a.destination;
      while (contexts.length > 0) {
        var context_2 = contexts.shift();
        destination.next(context_2.buffer);
      }
      _super.prototype._complete.call(this);
    };

    /** Releases the context list; `closeContext` tolerates the resulting null. */
    BufferTimeSubscriber.prototype._unsubscribe = function () {
      this.contexts = null;
    };

    /**
     * Emits a buffer that reached `maxBufferSize`, cancels its pending close
     * action and, in timespan-only mode, opens and schedules a replacement.
     */
    BufferTimeSubscriber.prototype.onBufferFull = function (context) {
      this.closeContext(context);
      var closeAction = context.closeAction;
      closeAction.unsubscribe();
      this.remove(closeAction);
      // Emitting above may have closed this subscriber; only continue if not.
      if (!this.closed && this.timespanOnly) {
        context = this.openContext();
        var bufferTimeSpan = this.bufferTimeSpan;
        var timeSpanOnlyState = { subscriber: this, context: context, bufferTimeSpan: bufferTimeSpan };
        this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
      }
    };

    /** Creates a new empty context, registers it as open and returns it. */
    BufferTimeSubscriber.prototype.openContext = function () {
      var context = new Context();
      this.contexts.push(context);
      return context;
    };

    /**
     * Emits `context.buffer` to the destination and removes the context from
     * the open list if it is still there.
     */
    BufferTimeSubscriber.prototype.closeContext = function (context) {
      this.destination.next(context.buffer);
      // Re-read after emitting: the emission may have triggered _unsubscribe,
      // which sets this.contexts to null.
      var contexts = this.contexts;
      var spliceIndex = contexts ? contexts.indexOf(context) : -1;
      if (spliceIndex >= 0) {
        contexts.splice(spliceIndex, 1);
      }
    };

    return BufferTimeSubscriber;
  }(Subscriber));
  /**
   * Scheduler work function for timespan-only mode; `this` is the action
   * executing it.
   *
   * Closes the buffer recorded in `state.context` (if any). Unless the
   * subscriber is closed afterwards, it opens the next buffer, stores it on
   * `state`, and reschedules this same action after `state.bufferTimeSpan`.
   *
   * @param state Object with `subscriber`, `context` and `bufferTimeSpan`.
   */
  function dispatchBufferTimeSpanOnly(state) {
    var owner = state.subscriber;
    var expired = state.context;
    if (expired) {
      owner.closeContext(expired);
    }
    // Closing emits downstream, which may have closed the subscriber.
    if (owner.closed) {
      return;
    }
    var fresh = owner.openContext();
    state.context = fresh;
    fresh.closeAction = this.schedule(state, state.bufferTimeSpan);
  }
  /**
   * Scheduler work function for creation-interval mode; `this` is the action
   * executing it.
   *
   * Opens a new buffer on the subscriber, schedules that buffer's close after
   * `bufferTimeSpan`, and reschedules itself after `bufferCreationInterval`.
   *
   * The closed check runs before anything else. A closed subscriber has had
   * its context list set to null by `_unsubscribe`, so opening a context first
   * would throw before the guard could take effect.
   *
   * @param state Object with `bufferCreationInterval`, `bufferTimeSpan`,
   *        `subscriber` and `scheduler`.
   */
  function dispatchBufferCreation(state) {
    var subscriber = state.subscriber;
    if (subscriber.closed) {
      return;
    }
    var bufferCreationInterval = state.bufferCreationInterval, bufferTimeSpan = state.bufferTimeSpan, scheduler = state.scheduler;
    var action = this;
    var context = subscriber.openContext();
    subscriber.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber: subscriber, context: context }));
    action.schedule(state, bufferCreationInterval);
  }
  /**
   * Scheduler work function that closes one specific buffer.
   *
   * @param arg Object with `subscriber` and `context`; the context is handed
   *        to `subscriber.closeContext`.
   */
  function dispatchBufferClose(arg) {
    var owner = arg.subscriber;
    var expired = arg.context;
    owner.closeContext(expired);
  }
  148. //# sourceMappingURL=bufferTime.js.map