diff --git a/src/internal/operators/skipLast.ts b/src/internal/operators/skipLast.ts index b9ddd1c26c..ed9e5dc8a3 100644 --- a/src/internal/operators/skipLast.ts +++ b/src/internal/operators/skipLast.ts @@ -4,27 +4,37 @@ import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; /** - * Skip the last `count` values emitted by the source Observable. + * Skip a specified number of values before the completion of an observable. * * ![](skipLast.png) * - * `skipLast` returns an Observable that accumulates a queue with a length - * enough to store the first `count` values. As more values are received, - * values are taken from the front of the queue and produced on the result - * sequence. This causes values to be delayed. + * Returns an observable that will emit values as soon as it can, given a number of + * skipped values. For example, if you `skipLast(3)` on a source, when the source + * emits its fourth value, the first value the source emitted will finally be emitted + * from the returned observable, as it is no longer part of what needs to be skipped. + * + * All values emitted by the result of `skipLast(N)` will be delayed by `N` emissions, + * as each value is held in a buffer until enough values have been emitted that that + * the buffered value may finally be sent to the consumer. + * + * After subscribing, unsubscribing will not result in the emission of the buffered + * skipped values. * * ## Example - * Skip the last 2 values of an Observable with many values + * + * Skip the last 2 values of an observable with many values + * * ```ts - * import { range } from 'rxjs'; + * import { of } from 'rxjs'; * import { skipLast } from 'rxjs/operators'; * - * const many = range(1, 5); - * const skipLastTwo = many.pipe(skipLast(2)); + * const numbers = of(1, 2, 3, 4, 5); + * const skipLastTwo = numbers.pipe(skipLast(2)); * skipLastTwo.subscribe(x => console.log(x)); * * // Results in: * // 1 2 3 + * // (4 and 5 are skipped) * ``` * * @see {@link skip} @@ -32,47 +42,54 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * @see {@link skipWhile} * @see {@link take} * - * @throws {ArgumentOutOfRangeError} When using `skipLast(i)`, it throws - * ArgumentOutOfRangeError if `i < 0`. - * - * @param {number} skipCount Number of elements to skip from the end of the source Observable. - * @returns {Observable} An Observable that skips the last count values - * emitted by the source Observable. + * @param skipCount Number of elements to skip from the end of the source Observable. + * @returns An Observable that skips the last count values emitted by the source Observable. */ export function skipLast(skipCount: number): MonoTypeOperatorFunction { - // For skipCounts less than or equal to zero, we are just mirroring the source. return skipCount <= 0 - ? identity + ? // For skipCounts less than or equal to zero, we are just mirroring the source. + identity : operate((source, subscriber) => { // A ring buffer to hold the values while we wait to see // if we can emit it or it's part of the "skipped" last values. // Note that it is the _same size_ as the skip count. let ring: T[] = new Array(skipCount); - let count = 0; + // The number of values seen so far. This is used to get + // the index of the current value when it arrives. + let seen = 0; source.subscribe( - new OperatorSubscriber( - subscriber, - (value) => { - // Move us to the next slot in the ring buffer. - const currentCount = count++; - if (currentCount < skipCount) { - // Fill the ring first - ring[currentCount] = value; - } else { - const index = currentCount % skipCount; - // Pull the oldest value out and emit it, - // then stuff the new value in it's place. - const oldValue = ring[index]; - ring[index] = value; - subscriber.next(oldValue); - } - }, - undefined, - undefined, - () => - // Free up memory - (ring = null!) - ) + new OperatorSubscriber(subscriber, (value) => { + // Get the index of the value we have right now + // relative to all other values we've seen, then + // increment `seen`. This ensures we've moved to + // the next slot in our ring buffer. + const valueIndex = seen++; + if (valueIndex < skipCount) { + // If we haven't seen enough values to fill our buffer yet, + // Then we aren't to a number of seen values where we can + // emit anything, so let's just start by filling the ring buffer. + ring[valueIndex] = value; + } else { + // We are traversing over the ring array in such + // a way that when we get to the end, we loop back + // and go to the start. + const index = valueIndex % skipCount; + // Pull the oldest value out so we can emit it, + // and stuff the new value in it's place. + const oldValue = ring[index]; + ring[index] = value; + // Emit the old value. It is important that this happens + // after we swap the value in the buffer, if it happens + // before we swap the value in the buffer, then a synchronous + // source can get the buffer out of whack. + subscriber.next(oldValue); + } + }) ); + + return () => { + // Release our values in memory + ring = null!; + }; }); }