Skip to main content

promises-in-streams

Streams vs Promises

  1. Streams: Think of a stream as a conveyor belt in a factory that keeps bringing items (chunks of data) one by one. It's a flow of data over time. Streams can handle backpressure, which means they can slow down or speed up based on the consumer's ability to handle the data.

  2. Promises: A promise is like a one-time delivery box. It either contains the item you requested (resolved) or a note saying why the item couldn't be delivered (rejected). Promises are one-time operations; once they are resolved or rejected, they can't be reused.

Combining Streams and Promises

Now, how do we combine these two? A common use-case is to have a stream of promises where each chunk of the stream is a promise that will eventually resolve to some value.

In Highland.js, you can create a stream from a promise by wrapping it with _(promise).

Conversely, you can turn a stream into a promise by using .toPromise(PromiseConstructor).

Functions Returning Streams of Promises

Let's say we have a pipline as such:

import {returnsStreams} from './returns-streams'
import _ from 'highland'

export const myPipeline = () => (stream: any) => stream.through(returnsStreams())

In the returnsStreams function, it return streams that internally deal with promises.

import {previousMonthYear} from '@chaine/core'
import axios, {AxiosResponse} from 'axios'
import _ from 'highland'
import {IncomingMessage} from 'http'

export const returnsStreams = () => {
const getData = () => {
const promise = new Promise((resolve, reject) => {
try {
const {month, year} = previousMonthYear()
console.log('11111here now', month, year)
const getURL = `https://get.test.com/get`
const response = axios.get(getURL, {
responseType: 'stream'
})

resolve(response)
} catch (err) {
reject(err)
}
})
.then((result) => {
return (result as AxiosResponse).data as IncomingMessage
})

.catch((e) => {
console.error('ERROR::::: ', JSON.stringify(e))
})
return _(promise)
}
return (s: Highland.Stream<void>) => s.flatMap(getData)
}

Here's a breakdown of what's happening:

  1. Initial State: At the beginning, you have a Highland stream stream that is fed into the myPipeline function.
  2. The through Method: .through() takes a function that itself takes a Highland stream and returns a new Highland stream. Essentially, it allows you to extend the stream pipeline. When you use .through(returnsStreams()), it means that the chunks flowing through stream will be processed according to whatever returnsStreams specifies, and then continue down the pipeline.
  • Why through and not flatMap directly?: Using .through() allows for better separation of concerns and modularity. You can define the transformation function (returnsStreams in this case) outside the main pipeline, making it easier to test, debug, and reason about.
  1. Inside returnsStreams: This function returns another function that takes a Highland stream s as an argument and applies .flatMap(getData) to it. The getData function is designed to return a promise that resolves to a stream (responseType: 'stream').

  2. Highland Stream from Promise: When getData is called, it returns a promise. Using _(promise) in Highland converts that promise into a Highland stream. This is not just a stream of data; it's a stream that will contain another stream once the promise resolves.

  3. Unwrapping with flatMap: Now, we get to the s.flatMap(getData). The .flatMap() method expects the function it receives (in this case, getData) to return a stream. Since getData returns a stream of a promise that resolves to a stream, .flatMap() will flatten this two-layer nesting into a single stream of data chunks.

  4. Result: The output is a Highland stream that effectively unwraps the nested streams and promises, yielding actual data chunks.

  5. Creating a Promise: Inside returnsStreams, a promise getData is created that fetches some data. This promise will resolve to a stream (IncomingMessage).

const promise = new Promise((resolve, reject) => {
// ...
resolve(response)
})
  1. Converting Promise to Stream: This promise is then converted into a Highland stream.
return _(promise)
  1. Flat Mapping the Stream: The function returns a Highland stream that will flatMap this promise-based stream, effectively unwrapping the promises into actual data chunks.
return (s: Highland.Stream<void>) => s.flatMap(getData)

Using Streams of Promises for Async Operations

  1. Sequencing: If you have a stream of promises, you can use .sequence() to wait for each promise to resolve before moving on to the next.

  2. FlatMap for Parallelism You can use .flatMap() if you don't care about the order and want to run operations in parallel.

  3. Error Handling: Highland streams can catch errors that would otherwise have to be caught in each individual promise's .catch() method.