Issue
I have an RxJS Observable which emits binary data of type Uint8Array
values. But not every emitted value contains exactly one complete data oject which can be processed on its own.
The data format of complete data objects consist of a start Byte (0xAA
), some variable length data in between and an end byte (0xFF
). The data in between is BCD encoded, this means principally it does not contain start or end bytes but only binary values from 0x00
to 0x99
.
Here is an example:
// This is a mock of the source observable which emits values:
const source$ = from([
// Case 1: One complete data object with start (0xAA) and end byte (0xFF)
new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),
// Case 2: Two complete data objects in a single value emit
new Uint8Array([0xAA, 0x12, 0x76, 0xFF, 0xAA, 0x83, 0x43, 0xFF,]),
// Case 3: Two uncomplete value emits which form a single data object
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
new Uint8Array([0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),
// Case 4: A combination of Cases 2 and 3
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
new Uint8Array([0x55, 0x81, 0xFF, 0xAA, 0x73, 0x96]),
new Uint8Array([0x72, 0x23, 0x11, 0x95, 0xFF]),
])
source$.subscribe((x) => {
console.log('Emitted value as Hexdump:')
console.log(hexdump(x.buffer))
})
The goal is to receive only complete data objects. Maybe as a tranformed new observable?
The example from above should be something like this:
const transformedSource$ = from([
// Case 1
new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),
// Case 2
new Uint8Array([0xAA, 0x12, 0x76, 0xFF,]),
new Uint8Array([0xAA, 0x83, 0x43, 0xFF,]),
// Case 3
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),
// Case 4
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x55, 0x81, 0xFF]),
new Uint8Array([0xAA, 0x73, 0x96, 0x72, 0x23, 0x11, 0x95, 0xFF]),
])
- Which RxJS methods or operators are suitable for this?
- I thought about first to do splits at
0xFF
and then to do merges again. How to do this? Ideas from people with RxJS experience is much appreciated.
Solution
What you could try is splitting the chunks in order to process the bytes one by one, adding them to a buffer until the 0xff
byte appears in the stream and returning all buffered elements and reset the buffer for the next chunk:
let buffer = new Uint8Array();
transformedSource$.pipe(
mergeAll(), // this splits your array and emits the single bytes into the stream
mergeMap((next) => {
buffer = new Uint8Array([...buffer, next]);
if (next === 0xff) { (
const result = buffer;
buffer = new Uint8Array(); // resets the buffer
return of(result); // will emit a completed chunk
}
return EMPTY; // won't emit anything
})
)
.subscribe(console.log);
and in case you are not a fan of global variables or you want to reuse that code also for other streams, here an alternative solution with a custom operator:
const mergeChunks = () => {
let buffer = new Uint8Array();
return (source$) =>
source$.pipe(
mergeMap((next) => {
buffer = new Uint8Array([...buffer, next]);
if (next === 0xff) {
const result = buffer;
buffer = new Uint8Array();
return of(result);
}
return EMPTY;
})
);
}
transformedSource$.pipe(
mergeAll(),
mergeChunks(),
)
.subscribe(console.log);