
Command and Query Responsibility Segregation (CQRS)

What is CQRS?

Command and Query Responsibility Segregation (CQRS) is a software architectural pattern that separates the read and write operations of a system into distinct components.

In a CQRS system, the "command" side of the system is responsible for modifying the state of the system, while the "query" side is responsible for reading the current state of the system. These two sides are typically implemented as separate components, which can be scaled independently to better handle the different demands placed on them.

One of the main benefits of CQRS is that it allows the read and write operations of a system to be optimized for their respective purposes. For example, the write side of the system can be optimized for handling high levels of write traffic, while the read side can be optimized for fast, efficient queries.

CQRS is often used in conjunction with event sourcing, which involves storing a log of events that describe changes to the state of the system. The events stored in the event store can be used to reconstruct the current state of the system at any point in time, which can be useful in situations where it is necessary to recover from a system failure or to track the history of changes made to the system.

CQRS can be particularly useful in systems that require high levels of scalability and performance, such as systems that handle large amounts of data or that are accessed by a large number of users simultaneously.

Commands and Queries

CQRS gives us two kinds of objects:

  1. Commands: responsible for state mutations
  2. Queries: responsible for returning data to the client

This allows us to make different infrastructure and coding decisions in different places: the choices we make for queries can differ from the choices we make for commands.
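As a minimal sketch of what this split can look like in code (the interface and DTO names here are illustrative, not from any specific framework), the two sides can simply be two separate contracts:

using System;
using System.Collections.Generic;

public class InventoryItemDto {
    public Guid Id;
    public string Name;
    public bool Active;
}

// Command side: void methods that mutate state
public interface IInventoryCommands {
    void DeactivateInventoryItem(Guid id, string comment);
}

// Query side: methods that only return data, as plain DTOs
public interface IInventoryQueries {
    InventoryItemDto GetInventoryItem(Guid id);
    IReadOnlyList<InventoryItemDto> ListActiveItems();
}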

Why do we need CQRS and Event Sourcing? The single most important reason is that they give us the ability to build any structural model from the stored events. This is business driven, NOT technology driven. We can offer things to our business users, now or in the future, that we typically couldn't. Storing events lets us give business users information they need but don't yet have, at any point in time, which helps provide a competitive advantage. By using event sourcing, you're modeling the actual behavior of the business instead of a developer's or architect's perceived view of the business, which is influenced by technical concerns.

A secondary business reason is that we don't have any model on the write side other than our events, so there is no cost associated with maintaining additional write-side models. On the write side, we just need to load up an aggregate to process a transaction, which we do by replaying its events. Over time this is very low-overhead, and as a company scales there is tremendous ROI compared to traditional systems.

Commands

  • Request to mutate state.
  • A method with void return type that mutates state

Queries

  • Ask for data
  • Request a DTO through a remote facade, backed by a thin read layer that maps directly to DTOs (see the sketch after this list)
  • Don't need to go through the domain model, which helps remove accidental complexity
  • Don't need to support reads on our domain model, and need no application services (use-cases)
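A thin read layer could be as simple as the following sketch (assuming plain ADO.NET and a hypothetical InventoryItemReadModel table; it reuses the InventoryItemDto from the earlier sketch). It reads straight from a read-optimized table into a DTO, with no domain objects or application services in between:

using System;
using System.Data.SqlClient;

public class InventoryQueryService {
    private readonly string connectionString;

    public InventoryQueryService(string connectionString) {
        this.connectionString = connectionString;
    }

    // Straight from the read-optimized table to the DTO; no domain model involved
    public InventoryItemDto GetInventoryItem(Guid id) {
        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand(
            "SELECT Id, Name, Active FROM InventoryItemReadModel WHERE Id = @Id", connection)) {
            command.Parameters.AddWithValue("@Id", id);
            connection.Open();
            using (var reader = command.ExecuteReader()) {
                if (!reader.Read()) return null;
                return new InventoryItemDto {
                    Id = reader.GetGuid(0),
                    Name = reader.GetString(1),
                    Active = reader.GetBoolean(2)
                };
            }
        }
    }
}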

Command handler

public interface Handles<T> where T : Message {
    void Handle(T message);
}
  • All a handler does is coordinate the domain to get the process done.
  • We don't want behavior here; that belongs in the domain object.
  • Just load the item, then invoke the command's behavior on it

Example of a command handler:

public class DeactivateInventoryItemHandler : Handles<DeactivateInventoryItemCommand> {
    private readonly InventoryItemRepository repository;

    public DeactivateInventoryItemHandler(InventoryItemRepository repository) {
        this.repository = repository;
    }

    public void Handle(DeactivateInventoryItemCommand cmd) {
        var item = repository.GetById(cmd.InventoryItemId);
        item.Deactivate(cmd.Comment);
    }
}

On the client:

Send(new DeactivateInventoryItemCommand(id, comment));

The handler does not know when to commit the transaction, and it shouldn't. You don't commit transactions in the command handler itself; the commit only happens conceptually around the handler (via a chain of responsibility).

You can build a "chain" of these, where different handlers address cross-cutting concerns (for example, one for auth, one for logging, and one for handling the command):

public class TransactionalHandler<T> : Handles<T> where T : Message {
    private readonly Handles<T> next;

    public TransactionalHandler(Handles<T> next) {
        this.next = next;
    }

    public void Handle(T cmd) {
        // Wrap the inner handler in a unit-of-work scope
        using (new UnitOfWork()) {
            next.Handle(cmd);
        }
    }
}

public class LoggingHandler<T> : Handles<T> where T : Message {
    private readonly Handles<T> next;

    public LoggingHandler(Handles<T> next) {
        this.next = next;
    }

    public void Handle(T cmd) {
        Logging.WriteLog(cmd);
        next.Handle(cmd);
    }
}

Then it is easy to build a chain of handlers:

var handler = new TransactionalHandler<DeactivateInventoryItemCommand>(
    new LoggingHandler<DeactivateInventoryItemCommand>(
        new DeactivateInventoryItemHandler(repository)));

The previous chain can be replaced by adding attributes to the handler, which is cleaner. The attributes wrap the handler with the decorators we want (instead of composing them by hand as in the previous code block). In TypeScript, the code block below can be achieved using Decorators.

[RequiresPermission("Admin")] // this will then just wrap this with a RequiresPermission Handler
[Transactional] // this will then just wrap this with a Transactional Handler
[LogsTo("C:\\foo.txt")] // this will then just wrap this with a Logging Handler
public class DeactivateInventoryItemHandler : Handles<DeactivateInventoryItemCommand> {
    private readonly InventoryItemRepository repository;

    public DeactivateInventoryItemHandler(InventoryItemRepository repository) {
        this.repository = repository;
    }

    public void Handle(DeactivateInventoryItemCommand cmd) {
        var item = repository.GetById(cmd.InventoryItemId);
        item.Deactivate(cmd.Comment);
    }
}
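One way this wrapping could work under the hood (a sketch only; these attribute classes and the factory are hypothetical, not a specific framework's API) is a factory that reflects over the handler's attributes and composes the decorator handlers from the previous section:

using System;

[AttributeUsage(AttributeTargets.Class)]
public class TransactionalAttribute : Attribute { }

[AttributeUsage(AttributeTargets.Class)]
public class LogsToAttribute : Attribute {
    public readonly string Path;
    public LogsToAttribute(string path) { Path = path; }
}

public static class HandlerFactory {
    // Wraps the inner handler based on the attributes declared on its class,
    // reproducing the hand-built chain from the earlier code block.
    // A RequiresPermission attribute would be handled the same way.
    public static Handles<T> Build<T>(Handles<T> inner) where T : Message {
        Handles<T> handler = inner;
        var type = inner.GetType();
        if (Attribute.IsDefined(type, typeof(LogsToAttribute)))
            handler = new LoggingHandler<T>(handler);
        if (Attribute.IsDefined(type, typeof(TransactionalAttribute)))
            handler = new TransactionalHandler<T>(handler);
        return handler;
    }
}

// Usage:
// var handler = HandlerFactory.Build(new DeactivateInventoryItemHandler(repository));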

Events

The big difference between commands and events is linguistic: events are verbs in the past tense. Code-wise they look similar, but the language differs.

// Command
public class DeactivateInventoryItemCommand : Message {
    public readonly Guid InventoryItemId;
    public readonly string Comment;

    public DeactivateInventoryItemCommand(Guid id, string comment) {
        InventoryItemId = id;
        Comment = comment;
    }
}

// Event
public class InventoryItemDeactivatedEvent : Message {
    public readonly Guid InventoryItemId;
    public readonly string Comment;

    public InventoryItemDeactivatedEvent(Guid id, string comment) {
        InventoryItemId = id;
        Comment = comment;
    }
}

Nouns are used in the domain model; a transaction object will have a noun name. A command tells you to do something. An event represents an action that completed in the past ("past" means you are no longer part of the transaction: it has already occurred). If you disagree with an event, you can't throw an exception; instead you need to determine a compensating action.

The language should mimic the way the system works.

Event Handler

[Transactional] // You can use the same attributes as on command handlers, re-use the tooling, and significantly reduce your code
public class InventoryItemDeactivatedEventHandler : Handles<InventoryItemDeactivatedEvent> {
    public void Handle(InventoryItemDeactivatedEvent e) {
        // e.g. INSERT INTO ReadModel (InventoryItemId, Comment) VALUES (e.InventoryItemId, e.Comment)
    }
}

You can also use event handlers with stored procedures.
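For example (a sketch only: the stored procedure name and read-model schema are made up), a variant of the handler above that delegates its read-model update to a stored procedure via ADO.NET might look like this:

using System;
using System.Data;
using System.Data.SqlClient;

public class InventoryItemDeactivatedEventHandler : Handles<InventoryItemDeactivatedEvent> {
    private readonly string connectionString;

    public InventoryItemDeactivatedEventHandler(string connectionString) {
        this.connectionString = connectionString;
    }

    public void Handle(InventoryItemDeactivatedEvent e) {
        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand("usp_DeactivateInventoryItemReadModel", connection)) {
            // Delegate the read-model update to a (hypothetical) stored procedure
            command.CommandType = CommandType.StoredProcedure;
            command.Parameters.AddWithValue("@InventoryItemId", e.InventoryItemId);
            command.Parameters.AddWithValue("@Comment", e.Comment);
            connection.Open();
            command.ExecuteNonQuery();
        }
    }
}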

Event Sourcing

  • Event sourcing doesn't store current-state data objects; instead, an object is created from its series of events
  • Another example is a shopping cart. At any time, you can replay the events of a cart to get its current state. By replaying those events in a domain object, we can rebuild the state. There are lots of benefits to doing this rather than saving a current version of the state.
  • The main reason we want to use event sourcing is that, if we have an event stream, we can create any structural model from those events. Even if a requirement demands something different in 10 years, you can build that structural model from the series of events.
    • The power of this is that you can use the data to answer any business question that can be asked. For example, if the business asks "we think users that remove an item 5 minutes before checkout will buy that item 5 months later", we can prove or disprove that by building a structural model over all the events
  • You cannot replay commands like this because commands have behavior (i.e. when you check out, part of checkout is billing a credit card). When you replay, you don't want to bill the card again.
  • Instead, you can "replay" events, because events don't have behaviors (see the sketch after this list)
  • Bugs become easier to reproduce because you can go back in time and look at all the events and commands at the time of the bug
  • We can run all the events through a new version of our software to see how the new version will behave
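To make the replay idea concrete, here is a minimal sketch of an aggregate rebuilding its state from an event stream (InventoryItemCreatedEvent is invented here for illustration; InventoryItemDeactivatedEvent is the event defined earlier):

using System;
using System.Collections.Generic;

public class InventoryItemCreatedEvent {
    public readonly Guid InventoryItemId;
    public InventoryItemCreatedEvent(Guid id) { InventoryItemId = id; }
}

public class InventoryItem {
    public Guid Id { get; private set; }
    public bool Active { get; private set; }

    // Events produced by behavior; a repository would persist these
    private readonly List<object> uncommittedEvents = new List<object>();

    // Rebuild state by replaying history: no behavior runs, only state transitions
    public static InventoryItem LoadFromHistory(IEnumerable<object> history) {
        var item = new InventoryItem();
        foreach (var e in history) item.Apply(e);
        return item;
    }

    // Behavior lives here: validate, then record the resulting event
    public void Deactivate(string comment) {
        if (!Active) throw new InvalidOperationException("Already deactivated");
        var e = new InventoryItemDeactivatedEvent(Id, comment);
        Apply(e);
        uncommittedEvents.Add(e);
    }

    // State transitions only; this is what replay invokes
    private void Apply(object e) {
        switch (e) {
            case InventoryItemCreatedEvent created:
                Id = created.InventoryItemId;
                Active = true;
                break;
            case InventoryItemDeactivatedEvent _:
                Active = false;
                break;
        }
    }
}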

What happens if we have a million events on an aggregate? Do we have to pull all million events to rebuild it?

  • To solve this, we introduce the concept of rolling snapshots
  • All events are keyed by aggregate id, and the aggregate id is what we query against

The most basic event storage we can build is an event log:

| Column Name | Column Type |
|---|---|
| AggregateId | Guid |
| Data | Blob |
| Version | Int |

  • AggregateId: the id of the aggregate the event belongs to
  • Data: the serialized event as a blob (binary, text, XML); the format depends on the serializer
  • Version: an incrementing version for each event, scoped per aggregate

Notice there is no primary key on the event table since we are not updating or deleting any of the events.

Aggregate table:

| Column Name | Column Type |
|---|---|
| AggregateId | Guid |
| Type | Varchar |
| Version | Int |

  • Type: not strictly necessary, but you can use it to see how your system is being used per aggregate type
  • Version: represents the MAX version in the events table; it's easy to query here to get the latest version

Reading Data

Without snapshotting, this is all that is needed to query data:

SELECT * FROM Events WHERE AggregateId = @AggregateId ORDER BY Version

This returns all events for an aggregate in the order they occurred, and it is the only query we ever need to issue against the event store (without snapshotting). It gets modified with snapshotting, but in its basic form this is it.

Writing Data

Write Operation in Event Storage

// Begin transaction
var version = getVersionFromAggregates(aggregateId); // SELECT Version FROM Aggregates WHERE AggregateId = @AggregateId

// If version is null, the aggregate doesn't exist yet
if (version == null) {
    // ...insert into Aggregates
    version = 0;
}

// Concurrency check: when events are saved, the caller tells us what version it expects the aggregate to be at
if (expectedVersion != version) {
    // ...raise a concurrency problem
}

// Iterate through the events we are being asked to save
foreach (var e in events) {
    // ...insert the event with an incremented version number
    // ...update the aggregate with the last version number we're saving
}
// End transaction
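A concrete version of that pseudocode might look like the following sketch (plain ADO.NET; table and column names follow the schemas above, and event serialization is assumed to happen elsewhere):

using System;
using System.Collections.Generic;
using System.Data.SqlClient;

public class EventStore {
    private readonly string connectionString;

    public EventStore(string connectionString) { this.connectionString = connectionString; }

    public void SaveEvents(Guid aggregateId, string aggregateType, int expectedVersion, IEnumerable<byte[]> serializedEvents) {
        using (var connection = new SqlConnection(connectionString)) {
            connection.Open();
            using (var transaction = connection.BeginTransaction()) {
                // Look up the aggregate's current version
                var versionCmd = new SqlCommand("SELECT Version FROM Aggregates WHERE AggregateId = @Id", connection, transaction);
                versionCmd.Parameters.AddWithValue("@Id", aggregateId);
                var result = versionCmd.ExecuteScalar();

                int version;
                if (result == null) {
                    // Aggregate doesn't exist yet: insert it at version 0
                    var insertAggregate = new SqlCommand("INSERT INTO Aggregates (AggregateId, Type, Version) VALUES (@Id, @Type, 0)", connection, transaction);
                    insertAggregate.Parameters.AddWithValue("@Id", aggregateId);
                    insertAggregate.Parameters.AddWithValue("@Type", aggregateType);
                    insertAggregate.ExecuteNonQuery();
                    version = 0;
                } else {
                    version = (int)result;
                }

                // Optimistic concurrency check
                if (expectedVersion != version)
                    throw new InvalidOperationException("Concurrency conflict: aggregate was modified by someone else");

                // Insert each event with an incremented version number
                foreach (var data in serializedEvents) {
                    version++;
                    var insertEvent = new SqlCommand("INSERT INTO Events (AggregateId, Data, Version) VALUES (@Id, @Data, @Version)", connection, transaction);
                    insertEvent.Parameters.AddWithValue("@Id", aggregateId);
                    insertEvent.Parameters.AddWithValue("@Data", data);
                    insertEvent.Parameters.AddWithValue("@Version", version);
                    insertEvent.ExecuteNonQuery();
                }

                // Update the aggregate with the last version we saved
                var updateAggregate = new SqlCommand("UPDATE Aggregates SET Version = @Version WHERE AggregateId = @Id", connection, transaction);
                updateAggregate.Parameters.AddWithValue("@Version", version);
                updateAggregate.Parameters.AddWithValue("@Id", aggregateId);
                updateAggregate.ExecuteNonQuery();

                transaction.Commit();
            }
        }
    }
}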


Logging

  • In a logging command handler, you can log commands that failed
  • You can log off commands or events, depending on the requirements of your system

Snapshots

Rolling snapshots are a heuristic to prevent the need to load all of the events when issuing a query to rebuild an aggregate. They are a denormalization of the aggregate at a given point in time. A change to the query logic and an additional table are all that is necessary to add the heuristic to the basic event storage.

| Column Name | Column Type |
|---|---|
| AggregateId | Guid |
| SerializedData | Blob |
| Version | Int |

Figure: Definition of Snapshots Table

The serialized data could be in any one of a host of possible schemas: binary, XML, raw text, etc. The decision on how to serialize the snapshots really depends on the system being built. A version number is included with the snapshot; it represents which version of the aggregate the snapshot captures.

How it works

  • The snapshotter issues a query, something like: SELECT * FROM Aggregates WHERE CurrentVersion - LastSnapshottedVersion > 50 (at most, your system should replay about 50 events to load an object, though you can tune this threshold). So 50 events triggers a snapshot.
  • It runs the query, loads the object from its last snapshot, takes a new snapshot, and writes the snapshot back to the table asynchronously (see the sketch after this list)
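A sketch of the snapshotter's candidate query (the SQL expresses the same heuristic against the Aggregates and Snapshots tables above; the 50-event threshold is the tunable value just mentioned):

using System;
using System.Collections.Generic;
using System.Data.SqlClient;

public class Snapshotter {
    private readonly string connectionString;

    public Snapshotter(string connectionString) { this.connectionString = connectionString; }

    // Finds aggregates that have accumulated more than `threshold` events
    // since their last snapshot (or since creation, if never snapshotted)
    public List<Guid> FindAggregatesToSnapshot(int threshold = 50) {
        var ids = new List<Guid>();
        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand(
            @"SELECT a.AggregateId FROM Aggregates a
              WHERE a.Version - COALESCE(
                  (SELECT MAX(s.Version) FROM Snapshots s
                   WHERE s.AggregateId = a.AggregateId), 0) > @Threshold", connection)) {
            command.Parameters.AddWithValue("@Threshold", threshold);
            connection.Open();
            using (var reader = command.ExecuteReader())
                while (reader.Read()) ids.Add(reader.GetGuid(0));
        }
        return ids;
    }

    // For each returned id: load the aggregate, serialize it, and insert a row
    // into Snapshots (AggregateId, SerializedData, Version), asynchronously
}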

What happens when building snapshots if you rename an internal field on the aggregate? The serializer won't be able to deserialize old snapshots anymore. Another common pattern, the memento pattern, can be used: you serialize a separate object instead. This lets us version the snapshot separately from the aggregate itself. Most small systems can work with a normally serialized snapshot, but for bigger, more complex systems you may want to use a memento.
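A minimal memento sketch (the type names are illustrative): the memento is a plain object whose shape is kept stable, so the aggregate's internal fields can be renamed without breaking previously stored snapshots.

using System;

// The memento: a plain, separately versionable serialization contract
[Serializable]
public class InventoryItemMemento {
    public Guid Id;
    public bool Active;
}

public class SnapshottableInventoryItem {
    private Guid id;        // internal fields can be renamed freely...
    private bool isActive;  // ...only the memento's shape must stay stable

    public InventoryItemMemento GetMemento() {
        return new InventoryItemMemento { Id = id, Active = isActive };
    }

    public static SnapshottableInventoryItem FromMemento(InventoryItemMemento m) {
        return new SnapshottableInventoryItem { id = m.Id, isActive = m.Active };
    }
}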

How we query when dealing with snapshots

  • First, query to see if there is a snapshot
  • If yes, load the snapshot and then fetch only the events with a version greater than the snapshot's version; we don't concern ourselves with anything earlier. This makes event storage really fast (see the sketch below)
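Putting both steps together, a load with snapshotting might look like this sketch (table and column names follow the schemas above; deserializing the snapshot and replaying the returned event blobs is assumed to happen in the caller):

using System;
using System.Collections.Generic;
using System.Data.SqlClient;

public class SnapshottingEventReader {
    private readonly string connectionString;

    public SnapshottingEventReader(string connectionString) { this.connectionString = connectionString; }

    // Returns the latest snapshot blob (null if none) and all events after it, in order
    public (byte[] Snapshot, int SnapshotVersion, List<byte[]> NewerEvents) Load(Guid aggregateId) {
        byte[] snapshot = null;
        var snapshotVersion = 0;
        var events = new List<byte[]>();

        using (var connection = new SqlConnection(connectionString)) {
            connection.Open();

            // First, query to see if there is a snapshot
            var snapshotCmd = new SqlCommand(
                @"SELECT TOP 1 SerializedData, Version FROM Snapshots
                  WHERE AggregateId = @Id ORDER BY Version DESC", connection);
            snapshotCmd.Parameters.AddWithValue("@Id", aggregateId);
            using (var reader = snapshotCmd.ExecuteReader()) {
                if (reader.Read()) {
                    snapshot = (byte[])reader[0];
                    snapshotVersion = reader.GetInt32(1);
                }
            }

            // Then fetch only the events newer than the snapshot
            var eventsCmd = new SqlCommand(
                @"SELECT Data FROM Events
                  WHERE AggregateId = @Id AND Version > @Version ORDER BY Version", connection);
            eventsCmd.Parameters.AddWithValue("@Id", aggregateId);
            eventsCmd.Parameters.AddWithValue("@Version", snapshotVersion);
            using (var reader = eventsCmd.ExecuteReader())
                while (reader.Read()) events.Add((byte[])reader[0]);
        }

        return (snapshot, snapshotVersion, events);
    }
}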

ℹ️ Resources

Sample repositories on GitHub