Saltar al contenido principal
Architecture

Event Sourcing + CQRS: Diseño de Event Store, Proyecciones, Snapshots y Cuándo NO Usarlo

6 min lectura
LD
Lucio Durán
Engineering Manager & AI Solutions Architect
También disponible en: English, Italiano

Fundamentos de Event Sourcing

En lugar de almacenar el estado actual de una entidad, se almacena la secuencia de eventos que condujeron a ese estado. El estado actual se deriva reproduciendo los eventos. Este patrón se implementa típicamente con PostgreSQL o un event store dedicado.

# event_store.py
from dataclasses import dataclass, field
from datetime import datetime
import json, uuid

@dataclass(frozen=True)
class Event:
 event_id: str
 stream_id: str
 event_type: str
 data: dict
 metadata: dict
 version: int
 timestamp: datetime
 schema_version: int = 1

class EventStore:
 def __init__(self, connection):
 self.conn = connection

 def append(self, stream_id: str, events: list[Event],
 expected_version: int) -> None:
 with self.conn.cursor() as cur:
 cur.execute(
 "SELECT MAX(version) FROM events WHERE stream_id = %s FOR UPDATE",
 (stream_id,)
 )
 current_version = cur.fetchone()[0] or 0
 if current_version != expected_version:
 raise ConcurrencyError(
 f"Versión esperada {expected_version}, "
 f"obtuve {current_version} para stream {stream_id}"
 )
 for i, event in enumerate(events):
 new_version = expected_version + i + 1
 cur.execute("""
 INSERT INTO events
 (event_id, stream_id, event_type, data, metadata,
 version, timestamp, schema_version)
 VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
 """, (
 event.event_id, stream_id, event.event_type,
 json.dumps(event.data), json.dumps(event.metadata),
 new_version, event.timestamp, event.schema_version
 ))
 self.conn.commit()

El Schema del Event Store

CREATE TABLE events (
 event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
 stream_id TEXT NOT NULL,
 event_type TEXT NOT NULL,
 data JSONB NOT NULL,
 metadata JSONB NOT NULL DEFAULT '{}',
 version INTEGER NOT NULL,
 timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
 schema_version INTEGER NOT NULL DEFAULT 1,
 UNIQUE (stream_id, version)
);

CREATE INDEX idx_events_stream ON events (stream_id, version);
CREATE INDEX idx_events_type ON events (event_type, timestamp);

CREATE TABLE snapshots (
 stream_id TEXT NOT NULL,
 version INTEGER NOT NULL,
 state JSONB NOT NULL,
 created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
 PRIMARY KEY (stream_id, version)
);

Agregados: Donde Vive la Lógica de Negocio

# order_aggregate.py
class OrderAggregate:
 def __init__(self):
 self.order_id = None
 self.status = None
 self.items = []
 self.total = 0
 self.version = 0

 def place_order(self, order_id, items, customer_id):
 if self.status is not None:
 raise InvalidOperationError("La orden ya existe")
 if not items:
 raise ValidationError("La orden debe tener al menos un item")
 total = sum(item['price'] * item['quantity'] for item in items)
 return [Event(
 event_id=str(uuid.uuid4()),
 stream_id=f"order-{order_id}",
 event_type="OrderPlaced",
 data={'order_id': order_id, 'customer_id': customer_id,
 'items': items, 'total': total},
 metadata={'causation_id': str(uuid.uuid4())},
 version=self.version + 1,
 timestamp=datetime.utcnow()
 )]

 def apply(self, event: Event):
 handler = getattr(self, f'_on_{event.event_type}', None)
 if handler:
 handler(event.data)
 self.version = event.version

 def _on_OrderPlaced(self, data):
 self.order_id = data['order_id']
 self.status = 'placed'
 self.items = data['items']
 self.total = data['total']

 def _on_PaymentConfirmed(self, data):
 self.status = 'paid'

 def _on_OrderCancelled(self, data):
 self.status = 'cancelled'

 @classmethod
 def from_events(cls, events):
 aggregate = cls()
 for event in events:
 aggregate.apply(event)
 return aggregate

Proyecciones: Construyendo Read Models

Las proyecciones transforman el event log en read models optimizados para queries. Esta es la "Q" en CQRS.

class OrderDashboardProjection:
 def __init__(self, read_db):
 self.db = read_db

 def handle(self, event: Event):
 handler = getattr(self, f'_handle_{event.event_type}', None)
 if handler:
 handler(event)

 def _handle_OrderPlaced(self, event):
 data = event.data
 self.db.execute("""
 INSERT INTO order_dashboard
 (order_id, customer_id, status, total, item_count, placed_at)
 VALUES (%s, %s, 'placed', %s, %s, %s)
 ON CONFLICT (order_id) DO UPDATE SET
 status = 'placed', total = EXCLUDED.total
 """, (data['order_id'], data['customer_id'],
 data['total'], len(data['items']), event.timestamp))

 def rebuild(self, event_store):
 """Reconstruir toda la proyección desde cero."""
 self.db.execute("TRUNCATE order_dashboard")
 for batch in event_store.read_all(batch_size=1000):
 for event in batch:
 self.handle(event)
 self.db.commit()

Snapshots: Domando el Costo de Rehidratación

class SnapshotRepository:
 def __init__(self, event_store, snapshot_store, interval=100):
 self.event_store = event_store
 self.snapshot_store = snapshot_store
 self.interval = interval

 def load(self, stream_id):
 snapshot = self.snapshot_store.get_latest(stream_id)
 if snapshot:
 aggregate = OrderAggregate()
 aggregate.__dict__.update(snapshot['state'])
 aggregate.version = snapshot['version']
 events = self.event_store.read_stream(stream_id, from_version=snapshot['version'])
 else:
 aggregate = OrderAggregate()
 events = self.event_store.read_stream(stream_id)

 for event in events:
 aggregate.apply(event)
 return aggregate

 def save(self, stream_id, aggregate, new_events):
 self.event_store.append(stream_id, new_events, aggregate.version - len(new_events))
 if aggregate.version % self.interval == 0:
 self.snapshot_store.save(
 stream_id=stream_id, version=aggregate.version,
 state=aggregate.__dict__
 )

Cuándo NO Usar Event Sourcing

Esta sección aborda los escenarios comunes donde event sourcing empeora las cosas.

1. Aplicaciones CRUD simples. Si tu dominio es "usuario llena formulario, datos van a la base, alguien los lee después," event sourcing agrega complejidad sin beneficio. Una tabla PostgreSQL con columnas created_at y updated_at cubre el 90% de las necesidades de auditoría.

2. Cuando tu equipo no entiende DDD. Event sourcing sin boundaries de agregados correctos se vuelve una pesadilla. Si tu equipo no puede responder "cuál es el boundary de consistencia aquí?", se va a terminar con un monolito distribuido que es más difícil de razonar que la base de datos que reemplazó.

3. Cuando se necesita queries ad-hoc. Los event stores están optimizados para leer un solo stream. Queries cross-aggregate requieren proyecciones. Si tu caso de uso principal es "mostrame todas las órdenes del cliente X de marzo que contengan producto Y," se necesita una proyección para cada nuevo patrón de query. Con una base relacional, es una cláusula WHERE.

4. Prototipos y MVPs. El tiempo que se invierte construyendo infraestructura de event store, rebuild de proyecciones, estrategias de snapshots y upcasting es tiempo que no se está gastando validando tu hipótesis de negocio.

5. Cuando la consistencia eventual es inaceptable. Las proyecciones son eventualmente consistentes con el event store. Siempre hay un delay entre que un evento se almacena y el read model se actualiza.

Un ejemplo ilustrativo: un servicio de gestión de usuarios donde los usuarios se registran, actualizan su perfil y cambian su email. Con quizás cinco tipos de eventos, el requerimiento de "audit trail" que motivó event sourcing se podría haber resuelto con una simple tabla user_audit_log. En cambio, el sistema tenía un event store, tres proyecciones, una estrategia de snapshots, y un pipeline de upcasting para cuando se agregó un campo display_name. Arquitecturalmente interesante y prácticamente terrible.

Cuándo Event Sourcing Brilla

Los sistemas donde event sourcing tiene éxito tienden a compartir rasgos comunes:

  • Lógica de dominio compleja con muchas transiciones de estado y reglas de negocio
  • Requerimientos de auditoría que iban más allá de "quién cambió qué" a "cuál era el estado en el momento T y por qué"
  • Integraciones event-driven donde sistemas downstream necesitaban reaccionar a eventos de dominio
  • Queries temporales como "mostrame la orden como estaba ayer a las 3pm"

Considérese una plataforma de trading donde requisitos regulatorios demandan un audit trail completo e inmutable de cada cambio de estado. O un sistema de logística donde docenas de servicios downstream necesitan reaccionar a eventos de envío. En ambos casos, event sourcing es el fit natural, y la alternativa (change data capture sobre una base relacional) sería más compleja, no menos.

La realidad es que la mayoría de los sistemas no necesitan event sourcing. Pero cuando el caso de uso lo demanda, nada más se acerca.

event-sourcingcqrsevent-storeprojectionssagasarchitectureddd

Herramientas mencionadas en este artículo

AWSProbá AWS
SupabaseProbá Supabase
Divulgación: Algunos enlaces en este artículo son enlaces de afiliado. Si te registrás a través de ellos, puedo recibir una comisión sin costo adicional para vos. Solo recomiendo herramientas que uso y en las que confío personalmente.
Compartir
Seguime