ESG Gateway
The nomenclature we're using for the esg that is responsible for controlling the flow of events in and out of a subsystem is ${subSystem}-esg-gateway
(i.e. jedi-esg-gateway
). This means that this is the esg that is responsible for controlling flow of events "internally" in Chaine; with other subsystems.
Each ${subSystem}-esg-gateway
has two primary listeners:
- ingress-listener
- egress-listener
1. Ingress listener
- takes domain events from external systems, and converts it to an internal format.
- domain events are filtered to this function using the following filterPatterns on kinesis:
"source": "external"
"source": [ { "anything-but": [ "${subSystem}-esg-gateway" ] } ]
- in a serverless.yml, this would look like:
functions:
ingress-listener:
handler: src/ingress-listener/index.listener
name: ${self:service}-ingress-listener-${sls:stage}
events:
- stream:
type: kinesis
arn: ${self:custom.kinesisStream}
filterPatterns:
- data:
source: ['external']
# 👆 filter on kinesis to only process events from external sources
# prevents processing of events generated by itself
- data:
detail:
tags:
source:
- anything-but: ['${subSystem}-esg-gateway']
# 👆 prevents processing of events generated by the egress-listener
# which will have "source":"external" for internal events leaving the subsystem
2. Egress listener
- takes internal domain events from within a subsystem, converts it to a format that it wants to support, with strong backwards compatibility enforcement, and publishes the event back to the event bus.
- attaches a
"source": "external"
field to all events - The event bus then can route the event, now in an "external format" with a rule to other subsystems that want to consume it.
- takes domain events that are filtered by:
"source": [ { "anything-but": [ "external" ] } ]
and,"source": [ { "anything-but": [ "${subSystem}-esg-gateway" ] } ]
.
functions:
egress-listener:
handler: src/egress-listener/index.listener
name: ${self:service}-egress-listener-${sls:stage}
events:
- stream:
type: kinesis
arn: ${self:custom.kinesisStream}
filterPatterns:
- data:
source:
- anything-but: ['external']
# 👆 prevents processing of events generated by this listener
- data:
source:
- anything-but: ['${subSystem}-esg-gateway']
# # 👆 prevents processing of events generated by the ingress-listener
Sharing events with other subsystems
- Decide what fields your subsystem wants to support to send to other subsystems.
- Refer to mappers to see how to do mappers. This will used the `toEventbridge1 flavor and publish events to the event bus, which then will use routing rules to send the event from event bus to another subsystem's event bus.
- Make sure to set the source "external", and in the return, have a
tags.source='${subSystem}-esg-gateway'
in the rule any time you are sending events from the subsytem to
const MY_EVENT_RULE = {
id: 'my-event',
flavor: toEventbridge,
eventType: 'thingChanged',
// 👇 Used to filter on Event bus and Kinesis
source: 'external',
//...
toEnvelope: faulty(uow) =>{
//...
return {
event:{
//...
},
//...
tags:{
/** 👇 will prevent the esg from processing events created by itself */
source: '${subSystem}-esg-gateway'
//...
}
//...
}
}
}
The format of the final event to event bridge will be:
{
"account": "001122334455",
"detail":{ // this is the part that is returned in the toEnvelope
"event": {
// ..
},
"tags": {
"source": "${subSystem}-esg-gateway"
},
},
"detail-type": "thingChanged",
//...
"source": "external"
}
Filter in the event bus
Once the above event goes to your event bus, you can use rules to do further filtering.
/**
* Create a reference to an existing event bus that can be used in the targets of the eventRule
*/
const otherSubsystemEventBus = events.EventBus.fromEventBusArn(
this,
`otherSubsystem-${env}-event-bus`,
`arn:aws:events:us-east-1:001122334455:event-bus/otherSubsystem-${env}-event-bus`
)
this.eventRule = new events.Rule(this, `${subSystem}-external-${env}-event-bus-rule`, {
ruleName: `${subSystem}-external-${env}-event-bus-rule`,
eventBus: this.bus,
eventPattern: {
detailType: ['thingChanged'], // In function publishToEventBridge in core, detailType is from uow.event.type
// Only for events that are meant to be for external subsystems
source: ['external'],
detail: {
type: [{'anything-but': 'fault'}]
}
},
targets: [new targets.EventBus(otherSubsystemEventBus)]
})
Mappers
To Eventbridge
Example:
export const PARTICIPANT_ADDED_EVENT: ToEventBridgeRule = {
/** This converts a participantAdded external domain event from jedi to an internal format, called shipmentParticipantAdded for downstream consumers to consume.
* @id id of projection pipe, this will not work if the name is not unique
*/
id: 'shipment-participant-added',
flavor: toEventbridge,
eventType: 'participantAdded',
source: 'tachyon-esg-gateway', // Can be used for filtering on event bus level
busName: process.env.BUS_NAME || 'tachyon-dev-event-bus',
parallel: 4, // this can also be controlled in process.env.PARALLEL, and this field removed.
toEnvelope: faulty((uow: UnitOfWork<ParticipantAdded>): ParticipantAddedEnvelope | string => {
if (!uow.event) return ''
const {event: envelope} = uow
const event: ParticipantAdded = envelope.event
return {
id: envelope.id,
type: `shipmentParticipantAdded`,
timestamp: envelope.timestamp,
tags: {
source: `tachyon-esg-gateway-${process.env.ENV_STAGE || 'dev'}` // 👈 add the name of this service. This is IMPORTANT for the same event to be processed again by this Rule that is generated by this Rule.
},
event: {
id: event.id,
$name: `chaine/tachyon/esg-gateway/user/shipmentParticipantAdded`,
version: 0, // Internal versioning
participant: {
id: event.participant.id,
emailOrPhone: event.participant.emailOrPhone,
workspaceID: event.participant.workspaceID
},
metadata: {invitedByWorkspaceID: event.invitedByWorkspaceID, invitedByUserID: event.invitedByUserID}
}
}
})
}
For all