Vai al contenuto principale
Architecture

Event Sourcing + CQRS: Design dell'Event Store, Proiezioni, Snapshot e Quando NON Usarlo

6 min lettura
LD
Lucio Durán
Engineering Manager & AI Solutions Architect
Disponibile anche in: English, Español

Fondamenti dell'Event Sourcing

Invece di memorizzare lo stato attuale di un'entità, si memorizza la sequenza di eventi che ha portato a quello stato. Lo stato attuale si deriva riproducendo gli eventi. Questo pattern viene tipicamente implementato con PostgreSQL o un event store dedicato.

# event_store.py
from dataclasses import dataclass, field
from datetime import datetime
import json, uuid

@dataclass(frozen=True)
class Event:
 event_id: str
 stream_id: str
 event_type: str
 data: dict
 metadata: dict
 version: int
 timestamp: datetime
 schema_version: int = 1

class EventStore:
 def __init__(self, connection):
 self.conn = connection

 def append(self, stream_id: str, events: list[Event],
 expected_version: int) -> None:
 with self.conn.cursor() as cur:
 cur.execute(
 "SELECT MAX(version) FROM events WHERE stream_id = %s FOR UPDATE",
 (stream_id,)
 )
 current_version = cur.fetchone()[0] or 0
 if current_version != expected_version:
 raise ConcurrencyError(
 f"Versione attesa {expected_version}, "
 f"ottenuta {current_version} per stream {stream_id}"
 )
 for i, event in enumerate(events):
 new_version = expected_version + i + 1
 cur.execute("""
 INSERT INTO events
 (event_id, stream_id, event_type, data, metadata,
 version, timestamp, schema_version)
 VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
 """, (
 event.event_id, stream_id, event.event_type,
 json.dumps(event.data), json.dumps(event.metadata),
 new_version, event.timestamp, event.schema_version
 ))
 self.conn.commit()

Lo Schema dell'Event Store

CREATE TABLE events (
 event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
 stream_id TEXT NOT NULL,
 event_type TEXT NOT NULL,
 data JSONB NOT NULL,
 metadata JSONB NOT NULL DEFAULT '{}',
 version INTEGER NOT NULL,
 timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
 schema_version INTEGER NOT NULL DEFAULT 1,
 UNIQUE (stream_id, version)
);

CREATE INDEX idx_events_stream ON events (stream_id, version);
CREATE INDEX idx_events_type ON events (event_type, timestamp);

CREATE TABLE snapshots (
 stream_id TEXT NOT NULL,
 version INTEGER NOT NULL,
 state JSONB NOT NULL,
 created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
 PRIMARY KEY (stream_id, version)
);

Aggregati: Dove Vive la Logica di Business

class OrderAggregate:
 def __init__(self):
 self.order_id = None
 self.status = None
 self.items = []
 self.total = 0
 self.version = 0

 def place_order(self, order_id, items, customer_id):
 if self.status is not None:
 raise InvalidOperationError("L'ordine esiste già")
 if not items:
 raise ValidationError("L'ordine deve avere almeno un articolo")
 total = sum(item['price'] * item['quantity'] for item in items)
 return [Event(
 event_id=str(uuid.uuid4()),
 stream_id=f"order-{order_id}",
 event_type="OrderPlaced",
 data={'order_id': order_id, 'customer_id': customer_id,
 'items': items, 'total': total},
 metadata={'causation_id': str(uuid.uuid4())},
 version=self.version + 1,
 timestamp=datetime.utcnow()
 )]

 def apply(self, event: Event):
 handler = getattr(self, f'_on_{event.event_type}', None)
 if handler:
 handler(event.data)
 self.version = event.version

 def _on_OrderPlaced(self, data):
 self.order_id = data['order_id']
 self.status = 'placed'
 self.items = data['items']
 self.total = data['total']

 def _on_PaymentConfirmed(self, data):
 self.status = 'paid'

 def _on_OrderCancelled(self, data):
 self.status = 'cancelled'

 @classmethod
 def from_events(cls, events):
 aggregate = cls()
 for event in events:
 aggregate.apply(event)
 return aggregate

Proiezioni: Costruire Read Model

class OrderDashboardProjection:
 def __init__(self, read_db):
 self.db = read_db

 def handle(self, event: Event):
 handler = getattr(self, f'_handle_{event.event_type}', None)
 if handler:
 handler(event)

 def _handle_OrderPlaced(self, event):
 data = event.data
 self.db.execute("""
 INSERT INTO order_dashboard
 (order_id, customer_id, status, total, item_count, placed_at)
 VALUES (%s, %s, 'placed', %s, %s, %s)
 ON CONFLICT (order_id) DO UPDATE SET
 status = 'placed', total = EXCLUDED.total
 """, (data['order_id'], data['customer_id'],
 data['total'], len(data['items']), event.timestamp))

 def rebuild(self, event_store):
 """Ricostruisci l'intera proiezione da zero."""
 self.db.execute("TRUNCATE order_dashboard")
 for batch in event_store.read_all(batch_size=1000):
 for event in batch:
 self.handle(event)
 self.db.commit()

Snapshot: Domare il Costo di Reidratazione

class SnapshotRepository:
 def __init__(self, event_store, snapshot_store, interval=100):
 self.event_store = event_store
 self.snapshot_store = snapshot_store
 self.interval = interval

 def load(self, stream_id):
 snapshot = self.snapshot_store.get_latest(stream_id)
 if snapshot:
 aggregate = OrderAggregate()
 aggregate.__dict__.update(snapshot['state'])
 aggregate.version = snapshot['version']
 events = self.event_store.read_stream(stream_id, from_version=snapshot['version'])
 else:
 aggregate = OrderAggregate()
 events = self.event_store.read_stream(stream_id)
 for event in events:
 aggregate.apply(event)
 return aggregate

 def save(self, stream_id, aggregate, new_events):
 self.event_store.append(stream_id, new_events, aggregate.version - len(new_events))
 if aggregate.version % self.interval == 0:
 self.snapshot_store.save(
 stream_id=stream_id, version=aggregate.version,
 state=aggregate.__dict__
 )

Quando NON Usare Event Sourcing

Questa sezione affronta gli scenari comuni in cui l'event sourcing peggiora le cose.

1. Applicazioni CRUD semplici. Se il tuo dominio è "l'utente compila un form, i dati vanno nel database, qualcuno li legge dopo," l'event sourcing aggiunge complessità senza benefici. Una tabella PostgreSQL con colonne created_at e updated_at copre il 90% delle necessità di audit.

2. Quando il tuo team non capisce il DDD. Event sourcing senza confini di aggregato corretti diventa un incubo. Se il tuo team non sa rispondere "qual è il confine di consistenza qui?", finirai con un monolite distribuito più difficile da ragionare del database che ha sostituito.

3. Quando servono query ad-hoc. Gli event store sono ottimizzati per leggere un singolo stream. Query cross-aggregato richiedono proiezioni. Se il tuo caso d'uso principale è "mostrami tutti gli ordini del cliente X di marzo con il prodotto Y," serve una proiezione per ogni nuovo pattern di query. Con un database relazionale, è una clausola WHERE.

4. Prototipi e MVP. Il tempo speso a costruire infrastruttura per l'event store è tempo non speso a validare l'ipotesi di business.

5. Quando la consistenza eventuale è inaccettabile. Le proiezioni sono eventualmente consistenti con l'event store. C'è sempre un ritardo tra la memorizzazione di un evento e l'aggiornamento del read model.

Un esempio illustrativo: un servizio di gestione utenti dove gli utenti si registrano, aggiornano il profilo e cambiano email. Con forse cinque tipi di eventi, il requisito "audit trail" che ha motivato l'event sourcing si sarebbe potuto risolvere con una semplice tabella user_audit_log. Invece il sistema aveva un event store, tre proiezioni, una strategia di snapshot e un pipeline di upcasting per quando è stato aggiunto un campo display_name. Architetturalmente interessante e praticamente terribile.

Quando l'Event Sourcing Brilla

I sistemi dove l'event sourcing ha successo tendono a condividere tratti comuni:

  • Logica di dominio complessa con molte transizioni di stato e regole di business
  • Requisiti di audit che andavano oltre "chi ha cambiato cosa" fino a "qual era lo stato al tempo T e perché"
  • Integrazioni event-driven dove sistemi downstream dovevano reagire a eventi di dominio
  • Query temporali come "mostrami l'ordine come era ieri alle 15"

Si consideri una piattaforma di trading dove requisiti normativi richiedono un audit trail completo e immutabile di ogni cambio di stato. O un sistema logistico dove decine di servizi downstream devono reagire a eventi di spedizione. In entrambi i casi, l'event sourcing è la scelta naturale, e l'alternativa (change data capture su database relazionale) sarebbe più complessa, non meno.

La realtà è che la maggior parte dei sistemi non ha bisogno dell'event sourcing. Ma quando il caso d'uso lo richiede, nient'altro si avvicina.

event-sourcingcqrsevent-storeprojectionssagasarchitectureddd

Strumenti menzionati in questo articolo

AWSProva AWS
SupabaseProva Supabase
Divulgazione: Alcuni link in questo articolo sono link di affiliazione. Se ti registri tramite questi, potrei guadagnare una commissione senza costi aggiuntivi per te. Raccomando solo strumenti che uso e di cui mi fido personalmente.
Compartir
Seguime