ESG Gateway

We name the esg that controls the flow of events into and out of a subsystem ${subSystem}-esg-gateway (e.g. jedi-esg-gateway). This is the esg responsible for the flow of events between a subsystem and the other subsystems inside Chaine.

Each ${subSystem}-esg-gateway has two primary listeners:

  1. ingress-listener
  2. egress-listener

1. Ingress listener

  • takes domain events from external systems and converts them to an internal format.
  • domain events are filtered to this function using the following filterPatterns on kinesis:
    • "source": "external"
    • "source": [ { "anything-but": [ "${subSystem}-esg-gateway" ] } ]
  • in a serverless.yml, this would look like:
functions:
  ingress-listener:
    handler: src/ingress-listener/index.listener
    name: ${self:service}-ingress-listener-${sls:stage}
    events:
      - stream:
          type: kinesis
          arn: ${self:custom.kinesisStream}
          filterPatterns:
            - data:
                source: ['external']
              # 👆 filter on kinesis to only process events from external sources
              # prevents processing of events generated by itself
            - data:
                detail:
                  tags:
                    source:
                      - anything-but: ['${subSystem}-esg-gateway']
              # 👆 prevents processing of events generated by the egress-listener
              # which will have "source":"external" for internal events leaving the subsystem
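
The intent of the two ingress filters can be read as a predicate over each record. The sketch below is illustrative only: the `StreamRecord` type, the `shouldIngest` function and the `GATEWAY_NAME` constant are hypothetical names, and the field layout (top-level `source`, `detail.tags.source`) is assumed from the filterPatterns above. The real filtering happens in the Kinesis event source mapping, not in handler code.

```typescript
// Hypothetical record shape, assumed from the filterPatterns above.
interface StreamRecord {
  source?: string;
  detail?: { tags?: { source?: string } };
}

// Name of this gateway; 'jedi' is only an example subsystem.
const GATEWAY_NAME = 'jedi-esg-gateway';

// Returns true when the ingress listener is meant to process the record:
// it is marked external AND it was not published by this gateway's egress-listener.
// Assumption of this sketch: a record without a tag counts as "not from this gateway";
// the deployed filter may treat a missing field differently.
function shouldIngest(record: StreamRecord, gatewayName: string = GATEWAY_NAME): boolean {
  const isExternal = record.source === 'external';
  const fromSelf = record.detail?.tags?.source === gatewayName;
  return isExternal && !fromSelf;
}
```

For example, `shouldIngest({ source: 'external', detail: { tags: { source: 'tachyon-esg-gateway' } } })` is true for the jedi gateway, while the same record tagged `jedi-esg-gateway` is rejected. AWS Lambda event filtering treats separate filter entries as alternatives (a record that satisfies any one of them is delivered), so it is worth verifying that the deployed patterns behave like this predicate.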

2. Egress listener

  • takes internal domain events from within a subsystem, converts them to the format the subsystem wants to support (with strong backwards compatibility enforcement), and publishes them back to the event bus.
  • attaches a "source": "external" field to all events
  • The event bus can then use a rule to route the event, now in an "external format", to other subsystems that want to consume it.
  • takes domain events that are filtered by:
    • "source": [ { "anything-but": [ "external" ] } ] and,
    • "source": [ { "anything-but": [ "${subSystem}-esg-gateway" ] } ].
functions:
  egress-listener:
    handler: src/egress-listener/index.listener
    name: ${self:service}-egress-listener-${sls:stage}
    events:
      - stream:
          type: kinesis
          arn: ${self:custom.kinesisStream}
          filterPatterns:
            - data:
                source:
                  - anything-but: ['external']
              # 👆 prevents processing of events generated by this listener
            - data:
                source:
                  - anything-but: ['${subSystem}-esg-gateway']
              # 👆 prevents processing of events generated by the ingress-listener
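
The egress behaviour described above (pick up internal events, convert them, mark them as external) can be sketched as two small functions. This is a minimal illustration under assumptions, not the gateway's actual code: `InternalEvent`, `ExternalEvent`, `shouldEgress` and `toExternal` are hypothetical names, and the external payload is reduced to an `id` and a `version`.

```typescript
// Hypothetical shapes for illustration; field names mirror the filters above.
interface InternalEvent {
  id: string;
  type: string;
  source?: string; // e.g. the internal service that produced the event
}

interface ExternalEvent {
  source: 'external';
  type: string;
  tags: { source: string };
  event: { id: string; version: number };
}

// True when the egress listener is meant to pick the event up:
// it is not already external and was not produced by this gateway.
function shouldEgress(event: { source?: string }, gatewayName: string): boolean {
  return event.source !== 'external' && event.source !== gatewayName;
}

// Converts an internal event to the externally supported shape and marks it
// so that the listeners of this gateway do not process it again.
function toExternal(event: InternalEvent, gatewayName: string): ExternalEvent {
  return {
    source: 'external',
    type: event.type,
    tags: { source: gatewayName },
    event: { id: event.id, version: 0 },
  };
}
```

Because the output carries `source: 'external'`, passing it back through `shouldEgress` returns false, which is the loop protection the first filter is there to provide.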

Sharing events with other subsystems

  1. Decide what fields your subsystem wants to support to send to other subsystems.
  2. Refer to Mappers to see how to write a mapper. It uses the `toEventbridge` flavor and publishes events to the event bus, which then uses routing rules to send the event to another subsystem's event bus.
  3. Make sure to set source to "external" and, in the returned envelope, to set tags.source='${subSystem}-esg-gateway' in the rule any time you are sending events from the subsystem to other subsystems:
const MY_EVENT_RULE = {
  id: 'my-event',
  flavor: toEventbridge,
  eventType: 'thingChanged',
  // 👇 Used to filter on Event bus and Kinesis
  source: 'external',

  //...
  toEnvelope: faulty((uow) => {
    //...
    return {
      event: {
        //...
      },
      //...
      tags: {
        /** 👇 will prevent the esg from processing events created by itself */
        source: '${subSystem}-esg-gateway'
        //...
      }
      //...
    }
  })
}

The final event sent to EventBridge will have the following format:

{
  "account": "001122334455",
  "detail": { // this is the part that is returned in the toEnvelope
    "event": {
      // ..
    },
    "tags": {
      "source": "${subSystem}-esg-gateway"
    }
  },
  "detail-type": "thingChanged",
  //...
  "source": "external"
}

Filter in the event bus

Once the above event goes to your event bus, you can use rules to do further filtering.

/**
 * Create a reference to an existing event bus that can be used in the targets of the eventRule
 */
const otherSubsystemEventBus = events.EventBus.fromEventBusArn(
  this,
  `otherSubsystem-${env}-event-bus`,
  `arn:aws:events:us-east-1:001122334455:event-bus/otherSubsystem-${env}-event-bus`
)

this.eventRule = new events.Rule(this, `${subSystem}-external-${env}-event-bus-rule`, {
  ruleName: `${subSystem}-external-${env}-event-bus-rule`,
  eventBus: this.bus,
  eventPattern: {
    detailType: ['thingChanged'], // In function publishToEventBridge in core, detailType is from uow.event.type
    // Only for events that are meant to be for external subsystems
    source: ['external'],
    detail: {
      type: [{'anything-but': 'fault'}]
    }
  },
  targets: [new targets.EventBus(otherSubsystemEventBus)]
})
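
To reason about which events this rule forwards, the eventPattern can be restated as a plain predicate. The sketch below is a simplification for illustration: `BusEvent` and `matchesExternalRule` are hypothetical names, and it assumes that `anything-but` only matches when `detail.type` is present.

```typescript
// Hypothetical, simplified view of an EventBridge event for this sketch.
interface BusEvent {
  source: string;
  'detail-type': string;
  detail: { type?: string };
}

// Mirrors the eventPattern above: a thingChanged event marked external
// whose detail.type is anything but 'fault'.
// Assumption: an event without detail.type does not match.
function matchesExternalRule(e: BusEvent): boolean {
  return (
    e['detail-type'] === 'thingChanged' &&
    e.source === 'external' &&
    e.detail.type !== undefined &&
    e.detail.type !== 'fault'
  );
}
```

With this predicate, an external `thingChanged` event is forwarded to the other subsystem's bus, while fault events and events that never passed through the egress listener (whose source is not "external") stay on the local bus.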

Mappers

To Eventbridge

Example:

export const PARTICIPANT_ADDED_EVENT: ToEventBridgeRule = {
  /**
   * This converts a participantAdded external domain event from jedi to an internal format,
   * called shipmentParticipantAdded, for downstream consumers to consume.
   * @id id of projection pipe, this will not work if the name is not unique
   */
  id: 'shipment-participant-added',
  flavor: toEventbridge,
  eventType: 'participantAdded',
  source: 'tachyon-esg-gateway', // Can be used for filtering on event bus level
  busName: process.env.BUS_NAME || 'tachyon-dev-event-bus',
  parallel: 4, // this can also be controlled in process.env.PARALLEL, and this field removed.
  toEnvelope: faulty((uow: UnitOfWork<ParticipantAdded>): ParticipantAddedEnvelope | string => {
    if (!uow.event) return ''
    const {event: envelope} = uow
    const event: ParticipantAdded = envelope.event
    return {
      id: envelope.id,
      type: `shipmentParticipantAdded`,
      timestamp: envelope.timestamp,
      tags: {
        // 👇 add the name of this service. This is IMPORTANT to prevent events
        // generated by this Rule from being processed again by this Rule.
        source: `tachyon-esg-gateway-${process.env.ENV_STAGE || 'dev'}`
      },
      event: {
        id: event.id,
        $name: `chaine/tachyon/esg-gateway/user/shipmentParticipantAdded`,
        version: 0, // Internal versioning
        participant: {
          id: event.participant.id,
          emailOrPhone: event.participant.emailOrPhone,
          workspaceID: event.participant.workspaceID
        },
        metadata: {invitedByWorkspaceID: event.invitedByWorkspaceID, invitedByUserID: event.invitedByUserID}
      }
    }
  })
}

For all