What is the pipes and filters design pattern?

The pipes and filters design pattern is a way to process data or perform actions on data by breaking the process down into smaller, independent steps, or "filters." These filters are connected together using "pipes," which pass the data from one filter to the next in the sequence.

A simple analogy for the pipes and filters pattern is a factory assembly line. In a factory, raw materials are transformed into finished products by passing through a series of stations or "filters," each of which performs a specific task. For example, a car assembly line might have filters for welding the frame, installing the engine, attaching the wheels, and painting the body. These filters are connected together by a conveyor belt or "pipe," which moves the partially completed car from one filter to the next.

In a software system, the pipes and filters pattern can be used to perform a wide range of tasks, such as data validation, data transformation, and data analysis. Each filter is responsible for performing a specific action on the data, and the pipes connect the filters together to form a workflow or process. This allows for a high degree of modularity and flexibility, as the filters can be easily added, removed, or rearranged to suit the needs of the system.
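To make the pattern concrete, here is a minimal, generic sketch (not Chaine's actual library): each "filter" is just a function, and a "pipe" is function composition. All names here are illustrative.

```typescript
// A "filter" is any function that takes an input and produces an output.
type Filter<I, O> = (input: I) => O

// A "pipe" connects two filters: the output of one becomes the input of the next.
const pipe =
  <A, B, C>(first: Filter<A, B>, second: Filter<B, C>): Filter<A, C> =>
  (input) =>
    second(first(input))

// Three small filters over a line of text
const trim: Filter<string, string> = (s) => s.trim()
const toWords: Filter<string, string[]> = (s) => s.split(/\s+/)
const countWords: Filter<string[], number> = (words) => words.length

// Rearranging or swapping filters only requires changing how they are piped together
const wordCount = pipe(pipe(trim, toWords), countWords)

console.log(wordCount('  pipes and filters  ')) // 3
```

Because each filter is independent, you can reuse `toWords` in a different pipeline, or insert a new filter (say, lowercasing) without touching the others.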

At Chaine, we use this design pattern often via our internal Streams Pipeline library, which builds on Node.js streams through the Highland.js library. The general approach is to first define a series of handlers (or "filters"), which are then encapsulated in a controller and finally used in a Node.js stream. Note that this can also be done without Node.js streams, in which case you simply run the handlers in the controller.
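Running the handlers without Node.js streams amounts to awaiting them in sequence in the controller. A minimal sketch, with made-up `ParseHandler` and `SumHandler` classes (not part of our library), looks like this:

```typescript
// Each handler implements a common interface: take a DTO in, return a promise of the output.
interface Handler<I, O> {
  handle: (dto: I) => Promise<O>
}

// Filter 1: parse a CSV line into numbers
class ParseHandler implements Handler<string, number[]> {
  async handle(csvLine: string) {
    return csvLine.split(',').map(Number)
  }
}

// Filter 2: sum the parsed numbers
class SumHandler implements Handler<number[], number> {
  async handle(values: number[]) {
    return values.reduce((total, n) => total + n, 0)
  }
}

// Without streams, the "controller" awaits each handler in sequence,
// piping the output of one step into the input of the next.
export async function sumController(line: string): Promise<number> {
  const parsed = await new ParseHandler().handle(line)
  return new SumHandler().handle(parsed)
}
```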


Let's first define some types (for context only):

// src/utils/types.ts
/**
 * Implemented by a class to indicate it acts as a handler for a given message
 */
export interface Handler<I, O> {
  /**
   * A function that is called each time a message is received
   * @param dto The message read from the bus
   */
  handle: (dto: I) => Promise<O>
}

Let's define a "step", or unit of work, in a handler called DoWorkHandler:

At Chaine, we use inversify. If you are not using it in your project, ignore the @injectable decorator and instantiate the classes directly when using the handlers.

// src/pipelines/do-work/do-work-handler.ts

import {inject, injectable} from 'inversify'
import {Handler} from '../../utils/types'
import {PipelineRepo, IPipelineRepo} from '../_infra/pipeline-repo'
import {WorkDTO, DoWorkReturnType} from './do-work-types'

/**
 * Does some work
 */
@injectable()
export class DoWorkHandler implements Handler<WorkDTO, DoWorkReturnType> {
  constructor(
    @inject(PipelineRepo)
    private readonly pipelineRepo: IPipelineRepo
  ) {}

  async handle() {
    const result = await this.pipelineRepo.doWork()
    return {...result}
  }
}

Similarly, you can define another "step" (for brevity, we will exclude the imports). Let's call it DoMoreWorkHandler:

// src/pipelines/do-more-work/do-more-work-handler.ts
// ... other imports similar to do-work-handler.ts
import {MoreWorkDTO, DoMoreWorkReturnType} from './do-more-work-types'

/**
 * Does more work
 */
@injectable()
export class DoMoreWorkHandler implements Handler<MoreWorkDTO, DoMoreWorkReturnType> {
  constructor(
    @inject(PipelineRepo)
    private readonly pipelineRepo: IPipelineRepo
  ) {}

  async handle(dto: MoreWorkDTO) {
    const result = await this.pipelineRepo.doMoreWork(dto)
    return {...result}
  }
}

Then, simply define a controller that encapsulates your handlers:

Ignore the ApplicationContainer if you're not using inversify or working in a Chaine repo.

// src/pipelines/controller.ts
import highland from 'highland'

// Local imports
import {ApplicationContainer} from '../infra/application-container'
import {DoWorkHandler} from './do-work/do-work-handler'
import {DoWorkReturnType} from './do-work/do-work-types'
import {DoMoreWorkHandler} from './do-more-work/do-more-work-handler'

// Cache the container instance if we get the same lambda container.
// Only needed if you're using inversify (at Chaine).
// For other projects, you can instantiate your classes here.
let applicationContainer: ApplicationContainer

function initApplication(): void {
  if (!applicationContainer) {
    applicationContainer = ApplicationContainer.instance()
  }
}

/**
 * Do some work and get some data
 *
 * @returns a highland stream promise with the results of DoWork
 */
export const doWorkController = () => () => {
  initApplication()
  return highland(applicationContainer.get<DoWorkHandler>(DoWorkHandler).handle())
}

/**
 * Do some more work and get some data or events to publish
 *
 * @returns a highland stream promise with the results of DoMoreWork
 */
export const doMoreWorkController = () => (workDone: DoWorkReturnType) => {
  initApplication()
  return highland(applicationContainer.get<DoMoreWorkHandler>(DoMoreWorkHandler).handle(workDone))
}

Now, your pipeline will be:

// src/pipelines/the-pipelines.ts
import {faulty, publishToEventBridge as publish} from '@chaine/core'

import {doWorkController, doMoreWorkController} from './controller'

// You can also use a simple filter function that filters incoming events in the
// pipeline before your other two steps run. This is also considered a "step" in
// the pipes and filters pattern.
export const onEventType = faulty(({envelope}) => envelope.type === 'SomethingHappened')

/**
 * Pipeline that does a series of work to show the pipes and filters pattern
 */
export const registrationPipeline =
  () =>
  <T>(stream: Highland.Stream<T>) =>
    stream.filter(onEventType).flatMap(doWorkController()).flatMap(doMoreWorkController()).through(publish())

So the steps we are taking here are:

  • Filter all incoming events and only process 'SomethingHappened' events
  • Do some work that returns async data or an array of async promises. Under the hood, Highland then fans each element of the result out into the stream for more efficient processing
  • Do more work, similar to the previous step
  • Publish each final result to AWS EventBridge using the publish function.