Skip to main content
Architecture

Event Sourcing + CQRS: Event Store Design, Projections, Snapshots, and When NOT to Use It

8 min read
LD
Lucio Durán
Engineering Manager & AI Solutions Architect
Also available in: Español, Italiano

Event Sourcing Fundamentals

Instead of storing the current state of an entity, you store the sequence of events that led to that state. The current state is derived by replaying events. This pattern is typically implemented with PostgreSQL or a dedicated event store.

# event_store.py - Core event sourcing abstractions
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass(frozen=True)
class Event:
 """Immutable event record, mirroring one row of the `events` table.

 `version` is the event's 1-based position within its stream;
 `schema_version` supports upcasting when payload shapes evolve.
 """
 event_id: str    # unique event id (UUID string)
 stream_id: str   # aggregate stream this event belongs to, e.g. "order-123"
 event_type: str  # e.g. "OrderPlaced"; used for handler dispatch
 data: dict       # event payload (persisted as JSONB)
 metadata: dict   # e.g. causation/correlation ids
 version: int     # 1-based position within the stream
 timestamp: datetime
 schema_version: int = 1  # payload schema version, for upcasting

@dataclass
class EventStream:
 """All events of a single stream plus the stream's current version."""
 stream_id: str
 # Events in ascending version order; defaults to an empty stream.
 events: list[Event] = field(default_factory=list)
 # Version of the last event in the stream (0 = empty stream).
 version: int = 0

class EventStore:
    """PostgreSQL-backed event store.

    Optimistic concurrency is enforced twice: by the version check in
    ``append`` and, as a hard backstop, by the UNIQUE (stream_id, version)
    constraint on the ``events`` table.
    """

    def __init__(self, connection):
        # DB-API connection (e.g. psycopg2); this class owns commit/rollback.
        self.conn = connection

    def append(self, stream_id: str, events: list[Event],
               expected_version: int) -> None:
        """
        Append events to a stream with optimistic concurrency.

        Raises:
            ConcurrencyError: if expected_version doesn't match the
                stream's current version.
        """
        try:
            with self.conn.cursor() as cur:
                # Check current version. NOTE: PostgreSQL rejects
                # "SELECT MAX(...) ... FOR UPDATE" (FOR UPDATE is not
                # allowed with aggregates), so lock the newest event row
                # instead. An empty stream has no row to lock; concurrent
                # first-appends are caught by the UNIQUE constraint.
                cur.execute(
                    """
                    SELECT version FROM events
                    WHERE stream_id = %s
                    ORDER BY version DESC
                    LIMIT 1
                    FOR UPDATE
                    """,
                    (stream_id,),
                )
                row = cur.fetchone()
                current_version = row[0] if row else 0

                if current_version != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, "
                        f"got {current_version} for stream {stream_id}"
                    )

                # Append events, numbering them after expected_version.
                for offset, event in enumerate(events, start=1):
                    cur.execute(
                        """
                        INSERT INTO events
                            (event_id, stream_id, event_type, data, metadata,
                             version, timestamp, schema_version)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                        """,
                        (
                            event.event_id, stream_id, event.event_type,
                            json.dumps(event.data), json.dumps(event.metadata),
                            expected_version + offset, event.timestamp,
                            event.schema_version,
                        ),
                    )

            self.conn.commit()
        except Exception:
            # Don't leave the connection in an aborted transaction after
            # a concurrency conflict or insert failure.
            self.conn.rollback()
            raise

    def read_stream(self, stream_id: str,
                    from_version: int = 0) -> list[Event]:
        """Return events with version > from_version, in version order."""
        with self.conn.cursor() as cur:
            cur.execute(
                """
                SELECT event_id, stream_id, event_type, data, metadata,
                       version, timestamp, schema_version
                FROM events
                WHERE stream_id = %s AND version > %s
                ORDER BY version ASC
                """,
                (stream_id, from_version),
            )
            rows = cur.fetchall()

        def _as_dict(value):
            # psycopg2 decodes JSONB columns to dict already; other
            # drivers may hand back the raw JSON string.
            return value if isinstance(value, dict) else json.loads(value)

        return [
            Event(
                event_id=row[0], stream_id=row[1],
                event_type=row[2], data=_as_dict(row[3]),
                metadata=_as_dict(row[4]), version=row[5],
                timestamp=row[6], schema_version=row[7],
            )
            for row in rows
        ]

The Event Store Schema

-- event_store.sql
-- Append-only event log: one row per event. Immutability is by
-- convention (no UPDATE/DELETE in application code).
CREATE TABLE events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    version INTEGER NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
    schema_version INTEGER NOT NULL DEFAULT 1,

    -- Optimistic concurrency: unique stream + version. The index
    -- backing this constraint also serves "read stream ordered by
    -- version" queries, so no separate (stream_id, version) index
    -- is needed (the original idx_events_stream duplicated it).
    UNIQUE (stream_id, version)
);

CREATE INDEX idx_events_type ON events (event_type, timestamp);
CREATE INDEX idx_events_timestamp ON events (timestamp);

-- Point-in-time snapshots of aggregate state, to bound replay cost.
CREATE TABLE snapshots (
    stream_id TEXT NOT NULL,
    version INTEGER NOT NULL,   -- version of the last event folded into state
    state JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (stream_id, version)
);

Aggregates: Where Business Logic Lives

# order_aggregate.py
class OrderAggregate:
    """Order aggregate: validates commands and rebuilds state from events.

    Command handlers (place_order, confirm_payment) only validate
    invariants and return new events — they never mutate state directly.
    State changes happen exclusively in apply()/_on_* so that replaying
    the event stream is the single source of truth.
    """

    def __init__(self):
        self.order_id = None  # set by OrderPlaced
        self.status = None    # None -> 'placed' -> 'paid' -> 'shipped' | 'cancelled'
        self.items = []
        self.total = 0        # NOTE(review): float money; Decimal would be safer — TODO confirm
        self.version = 0      # version of the last applied event

    # ---- Command handlers: validate and produce events ----

    def place_order(self, order_id: str, items: list, customer_id: str):
        """Create the order; fails if it already exists or has no items."""
        if self.status is not None:
            raise InvalidOperationError("Order already exists")
        if not items:
            raise ValidationError("Order must have at least one item")

        total = sum(item['price'] * item['quantity'] for item in items)

        return [Event(
            event_id=str(uuid.uuid4()),
            stream_id=f"order-{order_id}",
            event_type="OrderPlaced",
            data={
                'order_id': order_id,
                'customer_id': customer_id,
                'items': items,
                'total': total,
            },
            metadata={'causation_id': str(uuid.uuid4())},
            version=self.version + 1,
            # Timezone-aware UTC: utcnow() is deprecated and returns a
            # naive datetime that a TIMESTAMPTZ column would misinterpret.
            timestamp=datetime.now(timezone.utc)
        )]

    def confirm_payment(self, payment_id: str, amount: float):
        """Record a payment covering at least the total of a placed order."""
        if self.status != 'placed':
            raise InvalidOperationError(f"Cannot pay for order in status {self.status}")
        if amount < self.total:
            raise ValidationError(f"Payment {amount} less than total {self.total}")

        return [Event(
            event_id=str(uuid.uuid4()),
            stream_id=f"order-{self.order_id}",
            event_type="PaymentConfirmed",
            # order_id included so projections don't have to parse it
            # back out of the stream id.
            data={'order_id': self.order_id, 'payment_id': payment_id,
                  'amount': amount},
            metadata={},
            version=self.version + 1,
            timestamp=datetime.now(timezone.utc)
        )]

    # ---- Event handlers: apply events to rebuild state ----

    def apply(self, event: Event) -> None:
        """Apply one event; unknown event types still advance the version."""
        handler = getattr(self, f'_on_{event.event_type}', None)
        if handler:
            handler(event.data)
        self.version = event.version

    def _on_OrderPlaced(self, data):
        self.order_id = data['order_id']
        self.status = 'placed'
        self.items = data['items']
        self.total = data['total']

    def _on_PaymentConfirmed(self, data):
        self.status = 'paid'

    def _on_OrderShipped(self, data):
        self.status = 'shipped'

    def _on_OrderCancelled(self, data):
        self.status = 'cancelled'

    @classmethod
    def from_events(cls, events: list[Event]) -> 'OrderAggregate':
        """Rehydrate an aggregate by replaying its events in stream order."""
        aggregate = cls()
        for event in events:
            aggregate.apply(event)
        return aggregate

Projections: Building Read Models

Projections transform the event log into query-optimized read models. This is the "Q" in CQRS.

# projections.py
class OrderDashboardProjection:
    """Projects events into a denormalized read model for the order dashboard.

    Handlers use upserts/updates keyed on order_id so re-delivered events
    (at-least-once delivery, rebuilds) stay harmless.
    """

    def __init__(self, read_db):
        self.db = read_db
        # Dispatch table: event type -> handler. The original version
        # referenced _handle_order_shipped / _handle_order_cancelled and
        # _update_checkpoint without defining them (AttributeError on
        # first use); all three are defined below.
        self.handlers = {
            'OrderPlaced': self._handle_order_placed,
            'PaymentConfirmed': self._handle_payment_confirmed,
            'OrderShipped': self._handle_order_shipped,
            'OrderCancelled': self._handle_order_cancelled,
        }

    def handle(self, event: Event) -> None:
        """Apply one event to the read model and record progress."""
        handler = self.handlers.get(event.event_type)
        if handler:
            handler(event)
        # Track projection position for restart
        self._update_checkpoint(event.version, event.timestamp)

    @staticmethod
    def _order_id(event) -> str:
        # Prefer an explicit order_id in the payload; fall back to the
        # "order-<id>" stream naming convention.
        return event.data.get('order_id', event.stream_id.replace('order-', ''))

    def _handle_order_placed(self, event):
        data = event.data
        self.db.execute("""
            INSERT INTO order_dashboard
                (order_id, customer_id, status, total, item_count,
                 placed_at, updated_at)
            VALUES (%s, %s, 'placed', %s, %s, %s, %s)
            ON CONFLICT (order_id) DO UPDATE SET
                status = 'placed',
                total = EXCLUDED.total,
                updated_at = EXCLUDED.updated_at
        """, (
            data['order_id'], data['customer_id'],
            data['total'], len(data['items']),
            event.timestamp, event.timestamp
        ))

    def _handle_payment_confirmed(self, event):
        self.db.execute("""
            UPDATE order_dashboard
            SET status = 'paid', paid_at = %s, updated_at = %s
            WHERE order_id = %s
        """, (event.timestamp, event.timestamp, self._order_id(event)))

    def _handle_order_shipped(self, event):
        self.db.execute("""
            UPDATE order_dashboard
            SET status = 'shipped', updated_at = %s
            WHERE order_id = %s
        """, (event.timestamp, self._order_id(event)))

    def _handle_order_cancelled(self, event):
        self.db.execute("""
            UPDATE order_dashboard
            SET status = 'cancelled', updated_at = %s
            WHERE order_id = %s
        """, (event.timestamp, self._order_id(event)))

    def _update_checkpoint(self, position: int, timestamp) -> None:
        # NOTE(review): event.version is per-stream, not a global log
        # position — resuming from it is only safe if the feed is a
        # single stream. A global sequence would be needed otherwise;
        # TODO confirm against the event feed implementation.
        self.db.execute("""
            INSERT INTO projection_checkpoints (projection, position, updated_at)
            VALUES ('order_dashboard', %s, %s)
            ON CONFLICT (projection) DO UPDATE SET
                position = EXCLUDED.position,
                updated_at = EXCLUDED.updated_at
        """, (position, timestamp))

    def rebuild(self, event_store: EventStore) -> None:
        """Rebuild the entire projection from scratch."""
        self.db.execute("TRUNCATE order_dashboard")
        self.db.execute("DELETE FROM projection_checkpoints WHERE projection = 'order_dashboard'")

        # assumes EventStore.read_all yields batches of events in global
        # order — that method is not shown in this file; TODO confirm.
        for batch in event_store.read_all(batch_size=1000):
            for event in batch:
                self.handle(event)
            # Commit per batch so a crash mid-rebuild loses at most one batch.
            self.db.commit()

Snapshots: Taming Rehydration Cost

When an aggregate has thousands of events, replaying them all for every command is expensive. Snapshots capture the aggregate state at a point in time:

# snapshot_strategy.py
class SnapshotRepository:
    """Loads/saves aggregates, using snapshots to bound event replay.

    A snapshot is taken whenever an append crosses an `interval` boundary,
    so rehydration replays at most `interval` events past the snapshot.
    """

    def __init__(self, event_store: EventStore, snapshot_store, interval: int = 100):
        self.event_store = event_store
        self.snapshot_store = snapshot_store
        self.interval = interval  # max events between snapshots

    def load(self, stream_id: str) -> OrderAggregate:
        """Rehydrate: restore latest snapshot (if any), then replay the tail."""
        snapshot = self.snapshot_store.get_latest(stream_id)

        if snapshot:
            aggregate = OrderAggregate()
            # assumes snapshot['state'] holds exactly the aggregate's
            # attribute dict (see save()) — TODO confirm for older snapshots
            aggregate.__dict__.update(snapshot['state'])
            aggregate.version = snapshot['version']
            # Replay only events after the snapshot version.
            events = self.event_store.read_stream(stream_id, from_version=snapshot['version'])
        else:
            aggregate = OrderAggregate()
            events = self.event_store.read_stream(stream_id)

        for event in events:
            aggregate.apply(event)

        return aggregate

    def save(self, stream_id: str, aggregate: OrderAggregate, new_events: list[Event]):
        """Append new events, then snapshot if an interval boundary was crossed."""
        base_version = aggregate.version - len(new_events)
        self.event_store.append(stream_id, new_events, base_version)

        # Snapshot when this append crosses an interval boundary. The
        # original `version % interval == 0` test silently skipped every
        # multi-event append that jumped over the boundary (e.g. 98 -> 103
        # with interval 100), so some streams never got a snapshot.
        if aggregate.version // self.interval > base_version // self.interval:
            self.snapshot_store.save(
                stream_id=stream_id,
                version=aggregate.version,
                # Copy: never hand the store a live reference to mutable state.
                state=dict(aggregate.__dict__)
            )

When NOT to Use Event Sourcing

This section addresses common scenarios where event sourcing makes things worse:

1. Simple CRUD applications. If your domain is "user fills form, data goes in database, someone reads it later," event sourcing adds complexity with no benefit. A PostgreSQL table with created_at and updated_at columns covers 90% of audit needs.

2. When your team doesn't understand DDD. Event sourcing without proper aggregate boundaries becomes a nightmare. If your team can't answer "what is the consistency boundary here?", you'll end up with a distributed monolith that's harder to reason about than the database it replaced.

3. When you need ad-hoc queries. Event stores are optimized for reading a single stream. Cross-aggregate queries require projections. If your primary use case is "show me all orders from customer X that were placed in March and contain product Y," you need a projection for every new query pattern. With a relational database, it's a WHERE clause.

4. Prototypes and MVPs. The time you spend building event store infrastructure, projection rebuilding, snapshot strategies, and upcasting is time you're not spending on validating your business hypothesis. Ship the MVP with PostgreSQL. Migrate to event sourcing later if the domain complexity justifies it.

5. When eventual consistency is unacceptable. Projections are eventually consistent with the event store. There's always a delay between an event being stored and the read model being updated. If your business requires that a write is immediately visible in queries (e.g., a financial transaction), you need to work around this, and the workarounds add complexity.

A cautionary example: a user management service where users sign up, update their profile, and change their email. With perhaps five event types, the "audit trail" requirement that motivated event sourcing could have been solved with a simple user_audit_log table. Instead, the system had an event store, three projections, a snapshot strategy, and an upcasting pipeline for when a display_name field was added. Architecturally interesting and practically terrible.

When Event Sourcing Shines

Systems where event sourcing succeeds tend to share common traits:

  • Complex domain logic with many state transitions and business rules
  • Audit requirements that go beyond "who changed what" to "what was the state at time T and why"
  • Event-driven integrations where downstream systems need to react to domain events
  • Temporal queries like "show me the order as it was at 3pm yesterday"

Consider a trading platform where regulatory requirements demand a complete, immutable audit trail of every state change. Or a logistics system where dozens of downstream services need to react to shipment events. In both cases, event sourcing is the natural fit, and the alternative (change data capture on a relational database) would be more complex, not less.

The reality is that most systems do not need event sourcing. But when the use case demands it, nothing else comes close.

event-sourcingcqrsevent-storeprojectionssagasarchitectureddd

Tools mentioned in this article

AWSTry AWS
SupabaseTry Supabase
Disclosure: Some links in this article are affiliate links. If you sign up through them, I may earn a commission at no extra cost to you. I only recommend tools I personally use and trust.
Compartir
Seguime