[Fixed] How to add the latest emission to the already emitted set, creating a new observable containing both?

Issue

I have an observable that emits values every now and then. The consumer receives only the latest value, but I’d like to receive it together with all the previously emitted ones. My understanding is that I need switchMap to cancel the current observable and return a new one that contains all the historical emissions with the latest one appended.

Starting with the identity mapping below, what operator should I use? I have tried with a bunch of different ones but didn’t really see a methodical way to narrow down the set of available choices.

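import { interval, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Identity mapping: each value is re-wrapped in a new observable and emitted unchanged.
const source = interval(3000);
const transform = source.pipe(switchMap(value => of(value)));
const subscribe = transform.subscribe(val =>
  console.log("unchanged: " + val)
);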

At the moment, the emitted values form the sequence 0, 1, 2, 3, 4, … but I’d like to keep the previously emitted values and build up an array, appending the latest emission to it. So the final result would become [], [0], [0,1], [0,1,2], [0,1,2,3], [0,1,2,3,4], ….

Which RxJS operator is the proper one to use in switchMap(...)? Is there a better choice than switchMap(...)? Is the approach with pipe(...) appropriate at all?

StackBlitz

I thought I had found a pre-existing answer, but it doesn’t really help me.

Edit

Based on the answers, I sense that scan or switchScan might be an appropriate choice. I didn’t realize that when I checked the docs, probably because of my own confusion and uncertainty.

I have the impression that creating a new observable for each new emitted value is to be preferred but I can’t tell how that would affect the performance etc. as I can’t see pros and cons of using scan versus switchScan.

Solution

Sounds like you could use switchScan, an operator added in RxJS 7 (partly implemented by me :)), but it really depends on what exactly you want to do.

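import { interval, of } from 'rxjs';
import { switchScan } from 'rxjs/operators'; // requires RxJS 7

// Each new source value cancels any still-pending inner observable.
const source = interval(500);
const transform = source.pipe(
  switchScan((acc, num) => of([...acc, num]), []),
);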

In most cases mergeScan does the job too, and plain scan is enough if the accumulator doesn’t need to return an observable. At the moment, only RxJS 6 is publicly documented, which makes scan and mergeScan the more recognizable choices.

import { interval, of } from 'rxjs';
import { mergeScan } from 'rxjs/operators';

const source = interval(500);
const transform = source.pipe(
  mergeScan((acc, num) => of([...acc, num]), []),
);
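
For the plain scan variant, here is a minimal sketch, assuming the accumulator never needs to return an observable. One thing to be aware of: scan does not emit its seed, so the leading [] from the desired output in the question has to be added separately. Using startWith for that is my assumption about how to get it, not something the operators above do on their own. The source here is a synchronous of(0, 1, 2) rather than interval(...) so that the result is easy to inspect, and the emitted array is a hypothetical helper used only to collect the output.

```typescript
import { of } from 'rxjs';
import { scan, startWith } from 'rxjs/operators';

// Hypothetical collector, used only to inspect what the pipeline emits.
const emitted: number[][] = [];

// Synchronous stand-in for interval(...), so everything runs immediately.
const source = of(0, 1, 2);

const transform = source.pipe(
  // Build a new array on every emission instead of mutating the old one.
  scan((acc: number[], num: number) => [...acc, num], [] as number[]),
  // scan does not emit its seed, so prepend the empty array explicitly.
  startWith([] as number[]),
);

transform.subscribe(val => emitted.push(val));

console.log(JSON.stringify(emitted)); // [[],[0],[0,1],[0,1,2]]
```

If the leading [] isn't needed, dropping startWith gives [0], [0,1], [0,1,2], which is also what the switchScan and mergeScan versions emit.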

Live demo for scan, mergeScan and switchScan. Original author’s demo (for switchScan only).
