restartable.ts 856 B

12345678910111213141516171819202122232425
  /**
   * Wraps an async iterable so that it can be iterated any number of times,
   * sequentially or concurrently, while the underlying source is consumed at
   * most once.
   *
   * Every item pulled from the source is buffered. Each iterable produced by the
   * returned factory first replays the buffered items and then continues pulling
   * from the shared source, so all consumers observe the same sequence.
   *
   * Contract notes:
   * - `iter[Symbol.asyncIterator]()` is called at most once, lazily, on the
   *   first pull. Re-iterable sources are therefore not restarted.
   * - A consumer that stops early does NOT close the source; otherwise later
   *   consumers would only ever see a truncated prefix. As a consequence the
   *   source's `return()` is never invoked by this wrapper.
   * - If the source throws, the error is recorded and rethrown to every
   *   consumer once it has replayed the items buffered before the failure.
   * - The buffer grows with the number of items pulled and is never trimmed.
   *
   * @param iter The source to make restartable.
   * @returns A factory; each call yields a fresh iterable over the full sequence.
   */
  export function restartableAsync<T>(
    iter: AsyncIterable<T>
  ): () => AsyncIterable<T> {
    // Every item pulled from the source so far, in order.
    const buffer: T[] = [];
    // The single iterator obtained from `iter`, created on the first pull.
    let source: AsyncIterator<T> | undefined;
    // Set once the source has completed or thrown; no pulls happen afterwards.
    let finished = false;
    // Whether the source ended by throwing, and the value it threw.
    let failed = false;
    let failure: unknown;
    // The in-flight pull, shared by all consumers waiting for the next item.
    let pending: Promise<void> | undefined;

    // Pulls exactly one item from the source and records the outcome.
    // Never rejects: failures are stored so that every consumer can rethrow them.
    const pullNext = async (): Promise<void> => {
      try {
        if (source === undefined) {
          source = iter[Symbol.asyncIterator]();
        }
        const result = await source.next();
        if (result.done) {
          finished = true;
        } else {
          buffer.push(result.value);
        }
      } catch (err) {
        finished = true;
        failed = true;
        failure = err;
      }
    };

    // Returns the in-flight pull, starting one if none is running. Sharing the
    // pull guarantees that concurrent consumers never race on `source.next()`
    // and that none of them can observe completion before the last item has
    // been buffered.
    const pull = (): Promise<void> => {
      if (pending === undefined) {
        // The `then` callback runs asynchronously, i.e. after the assignment,
        // and before any consumer awaiting `pending` resumes.
        pending = pullNext().then(() => {
          pending = undefined;
        });
      }
      return pending;
    };

    return async function* () {
      // Index of the next buffered item this consumer has yet to yield.
      let next = 0;
      for (;;) {
        // Replay everything buffered so far, including items that other
        // consumers pulled while this one was suspended.
        while (next < buffer.length) {
          yield buffer[next];
          next++;
        }
        if (finished) {
          if (failed) {
            throw failure;
          }
          return;
        }
        await pull();
      }
    };
  }