Stale emission

Suppose a store holds two properties (for example, filter and counter), each with its own query. If a subscriber to filter$ updates the counter property inside its callback, other subscribers can receive a stale emission. Consider the following example:

store.ts
import { createStore, withProps, select } from '@ngneat/elf';

interface Props {
  filter: string | null;
  counter: number;
}

export const store = createStore(
  { name: 'todo' },
  withProps<Props>({ filter: null, counter: 0 })
);

export const filter$ = store.pipe(select(({ filter }) => filter));
export const counter$ = store.pipe(select(({ counter }) => counter));
component.ts
import { filter$, counter$, store } from './store.ts';

// FIRST SUBSCRIBER
filter$.subscribe(() => {
  store.update((state) => ({
    ...state,
    counter: state.counter + 1,
  }));
});

// SECOND SUBSCRIBER
counter$.subscribe((counter) => {
  console.log(counter);
});

// Update the filter
store.update((state) => ({
  ...state,
  filter: 'test',
}));

Why do the logs show 1, 2, 1? When the FIRST SUBSCRIBER subscribes, it immediately receives the initial filter value and updates counter to 1. The SECOND SUBSCRIBER then subscribes, receives the current counter value, and logs 1. When we update the filter, the new state (filter set to 'test', counter still 1) is delivered to the FIRST SUBSCRIBER first, which updates counter to 2. That nested update is delivered synchronously, so the SECOND SUBSCRIBER logs 2. Only then does the original filter emission, which still carries the stale state with counter equal to 1, reach the SECOND SUBSCRIBER. Because 1 differs from the last value it saw, it logs 1.
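
The ordering described above can be reproduced without any library. The following is a minimal sketch, assuming a store that notifies its listeners synchronously in subscription order. TinyStore and selectFrom are hypothetical names invented for this illustration and are not part of Elf or RxJS. The sketch only models the re-entrant notification that produces the stale value.

```typescript
// Simplified stand-in for a synchronous store. This is NOT Elf's implementation;
// TinyStore and selectFrom are hypothetical names used only for illustration.
type State = { filter: string | null; counter: number };
type Listener = (state: State) => void;

class TinyStore {
  private listeners: Listener[] = [];
  private state: State;

  constructor(initial: State) {
    this.state = initial;
  }

  // Register the listener, then replay the current state to it immediately.
  subscribe(listener: Listener): void {
    this.listeners.push(listener);
    listener(this.state);
  }

  // Compute the next state and notify listeners in subscription order.
  // Every listener in this round receives the same snapshot, even if an
  // earlier listener has already triggered a nested update.
  update(reducer: (state: State) => State): void {
    this.state = reducer(this.state);
    const snapshot = this.state;
    for (const listener of [...this.listeners]) {
      listener(snapshot);
    }
  }
}

// Project a slice of the state and skip consecutive duplicates.
function selectFrom<T>(
  store: TinyStore,
  project: (state: State) => T,
  next: (value: T) => void
): void {
  let hasValue = false;
  let last: T | undefined;
  store.subscribe((state) => {
    const value = project(state);
    if (hasValue && value === last) return;
    hasValue = true;
    last = value; // record before calling next, which may re-enter
    next(value);
  });
}

const store = new TinyStore({ filter: null, counter: 0 });
const logs: number[] = [];

// FIRST SUBSCRIBER: bump the counter on every filter emission.
selectFrom(store, (s) => s.filter, () => {
  store.update((s) => ({ ...s, counter: s.counter + 1 }));
});

// SECOND SUBSCRIBER: record every counter emission.
selectFrom(store, (s) => s.counter, (counter) => logs.push(counter));

// Update the filter.
store.update((s) => ({ ...s, filter: 'test' }));

console.log(logs.join(' ')); // 1 2 1
```

In this model the last value logged is 1 even though the store's counter ends at 2, because the outer notification round finishes delivering its older snapshot after the nested round has already run.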

There are two ways to get around this issue:

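  1. Change the subscription order so that the SECOND SUBSCRIBER subscribes before the FIRST SUBSCRIBER. Each emission then reaches the counter subscriber before the filter subscriber can trigger a nested update: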
component.ts
import { filter$, counter$, store } from './store.ts';

counter$.subscribe((counter) => {
  console.log(counter);
});

filter$.subscribe(() => {
  store.update((state) => ({
    ...state,
    counter: state.counter + 1,
  }));
});
  2. Delay the FIRST SUBSCRIBER's update using an RxJS operator (e.g. auditTime(0)):
component.ts
import { filter$, counter$, store } from './store.ts';
import { auditTime } from 'rxjs';

filter$.pipe(auditTime(0)).subscribe(() => {
  store.update((state) => ({
    ...state,
    counter: state.counter + 1,
  }));
});

counter$.subscribe((counter) => {
  console.log(counter);
});