CQRS in Go series:
- Part 1: the aggregate, Transition() and Clone()
- Part 2: command handlers without side effects
- Part 3: sagas and event choreography
- Part 4: PostgreSQL as an event store
The first three parts laid the groundwork: immutable aggregates, pure command handlers, sagas through choreography. One central question remains — where do we persist the events? This part answers that question using PostgreSQL as an append-only event store, with no additional dependencies.
Why PostgreSQL and not EventStoreDB or Kafka
EventStoreDB is an excellent tool, designed specifically for event sourcing. But it's an additional component to deploy, monitor, back up, and maintain. For a team already running PostgreSQL in production, adding EventStoreDB means doubling the operational footprint for a database.
Kafka is often mentioned in this context. It's a message bus, not an event store. Reading by aggregate ID is not its strong suit — Kafka is optimized for sequential partition consumption, not for "give me all events for order CMD-4521 in order". Default retention is time-limited. Reconstituting an aggregate from Kafka requires non-trivial work.
PostgreSQL, on the other hand, is already there. Native ACID, JSONB with indexing, backups integrated into your existing infrastructure, monitoring your team knows, pg_dump, replication, point-in-time recovery. For 90% of projects, an append-only table in PostgreSQL is more than enough. Moving to a dedicated tool makes sense when you're processing millions of events per second — a threshold most projects never reach.
The es_events table — the core
Everything rests on a single table:
CREATE TABLE es_events (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version INT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_id, version)
);
CREATE INDEX idx_es_events_aggregate ON es_events (aggregate_id, version);
Each column has a specific role:
- id (BIGSERIAL): global position in the stream. This is the cursor used by projectors to track where they are. Monotonically increasing, never reused.
- aggregate_id: the identifier of the entity involved — the UUID of the order, payment, or shipment. This is the primary read key for reconstituting an aggregate.
- aggregate_type: the type of the aggregate — "order", "payment", "shipping". Allows filtering events by domain and avoids UUID collisions between different types.
- event_type: the name of the event — "OrderPlaced", "PaymentConfirmed", "ShipmentDispatched". Used to deserialize event_data into the correct Go type.
- event_data (JSONB): the serialized event payload. JSONB enables indexing and field-level queries when needed, without sacrificing flexibility.
- metadata (JSONB): traceability information — who triggered the action (user ID), correlation ID, causation ID, client timestamp. Not domain data, but indispensable in production.
- version: sequence number per aggregate. The first event of an aggregate is at version 1, the second at version 2, and so on.
The UNIQUE (aggregate_id, version) constraint is the centerpiece of concurrent safety.
It makes it impossible to insert two events with the same version on the same aggregate. This is
the optimistic locking mechanism — no read lock, a constraint violation on write in case of conflict.
Append — writing an event
type EventStore struct {
db *sqlx.DB
}
func (s *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []StorableEvent) error {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
for i, event := range events {
version := expectedVersion + i + 1
data, err := json.Marshal(event.Data)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
_, err = tx.ExecContext(ctx, `
INSERT INTO es_events (aggregate_id, aggregate_type, event_type, event_data, metadata, version)
VALUES ($1, $2, $3, $4, $5, $6)
`, aggregateID, aggregateType, event.Type, data, event.Metadata, version)
if err != nil {
// UNIQUE violation = version conflict = optimistic locking
if isUniqueViolation(err) {
return ErrConcurrencyConflict
}
return fmt.Errorf("insert event: %w", err)
}
}
return tx.Commit()
}
The expectedVersion parameter is the number of the last known event at the time the aggregate
was loaded. If the handler produces two events, they will be inserted at versions
expectedVersion + 1 and expectedVersion + 2.
If another goroutine wrote an event between loading and writing, the UNIQUE constraint kicks in —
PostgreSQL rejects the insert with a constraint violation, translated into
ErrConcurrencyConflict. The client can retry: it will reload the aggregate with the new
event, recalculate the state, and re-execute the handler. This is optimistic locking — no read lock,
just a check at write time.
Load — reloading an aggregate
func (s *EventStore) Load(ctx context.Context, aggregateID uuid.UUID) ([]StoredEvent, error) {
var events []StoredEvent
err := s.db.SelectContext(ctx, &events, `
SELECT id, aggregate_id, aggregate_type, event_type, event_data, metadata, version, created_at
FROM es_events
WHERE aggregate_id = $1
ORDER BY version ASC
`, aggregateID)
if err != nil {
return nil, fmt.Errorf("loading events: %w", err)
}
return events, nil
}
The aggregate's state is never stored directly. It's always recalculated by replaying events
in order, calling Transition() on each one (see Part 1). This is the replay.
The result is deterministic — the same events always produce the same state. This is what makes the
system testable and auditable.
Subscriptions — feeding projections
Projectors — the components that maintain read views — must be notified of new events. The simplest solution: a positions table.
CREATE TABLE es_subscriptions (
subscriber_id VARCHAR(128) PRIMARY KEY,
last_event_id BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Each projector records the identifier of the last event it processed. On startup or after a crash, it resumes from this position. The polling pattern:
type Subscriber struct {
db *sqlx.DB
subscriberID string
handler EventHandler
}
func (s *Subscriber) Poll(ctx context.Context) error {
	// 1. Read the last processed position
	var lastID int64
	if err := s.db.GetContext(ctx, &lastID,
		"SELECT last_event_id FROM es_subscriptions WHERE subscriber_id = $1",
		s.subscriberID,
	); err != nil {
		return fmt.Errorf("reading position: %w", err)
	}
	// 2. Load new events
	var events []StoredEvent
	if err := s.db.SelectContext(ctx, &events,
		"SELECT * FROM es_events WHERE id > $1 ORDER BY id ASC LIMIT 100",
		lastID,
	); err != nil {
		return fmt.Errorf("loading events: %w", err)
	}
	// 3. Process within a transaction
	for _, event := range events {
		tx, err := s.db.BeginTxx(ctx, nil)
		if err != nil {
			return fmt.Errorf("begin tx: %w", err)
		}
		if err := s.handler.Handle(ctx, tx, event); err != nil {
			tx.Rollback()
			return fmt.Errorf("handling event %d: %w", event.ID, err)
		}
		// 4. Update the position in the same transaction
		if _, err := tx.ExecContext(ctx,
			"UPDATE es_subscriptions SET last_event_id = $1, updated_at = NOW() WHERE subscriber_id = $2",
			event.ID, s.subscriberID,
		); err != nil {
			tx.Rollback()
			return fmt.Errorf("updating position: %w", err)
		}
		if err := tx.Commit(); err != nil {
			return fmt.Errorf("commit: %w", err)
		}
	}
	return nil
}
The critical detail is in point 4: the position update and event processing are in the same transaction. If the process crashes between processing and updating the position, the event will be reprocessed on the next poll. This is at-least-once delivery — handlers must be idempotent to absorb duplicates without side effects.
Idempotency in practice: a handler that inserts a row into a read view can use
INSERT ... ON CONFLICT DO NOTHING, or check whether the row already exists. The deduplication
key is generally the event_id or a combination of unique business fields.
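For example, a projector maintaining an order summary view might dedupe on the aggregate's UUID — a duplicate delivery then becomes a silent no-op (table and column names here are illustrative, not from the series):

```sql
INSERT INTO order_summaries (order_id, status, placed_at)
VALUES ($1, $2, $3)
ON CONFLICT (order_id) DO NOTHING;
```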
Real-time notifications with LISTEN/NOTIFY
Polling introduces latency — at best a few hundred milliseconds if the poll runs frequently, at worst several seconds. For read views that need to be reactive, PostgreSQL offers a native mechanism: LISTEN/NOTIFY.
// Event store side: notify after each INSERT
func (s *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []StorableEvent) error {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
// ... insert events ...
// Notify subscribers
if _, err := tx.ExecContext(ctx, "NOTIFY es_events_channel"); err != nil {
return fmt.Errorf("notify: %w", err)
}
return tx.Commit()
}
// Subscriber side: listen instead of polling.
// Note: WaitForNotification needs a driver that exposes notifications —
// database/sql's *Conn does not. This sketch assumes a dedicated
// *pgx.Conn (github.com/jackc/pgx/v5), held outside the regular pool.
func (s *Subscriber) Listen(ctx context.Context, conn *pgx.Conn) error {
	if _, err := conn.Exec(ctx, "LISTEN es_events_channel"); err != nil {
		return fmt.Errorf("listen: %w", err)
	}
	for {
		// Blocks until a notification arrives; returns ctx.Err() on cancellation
		if _, err := conn.WaitForNotification(ctx); err != nil {
			return fmt.Errorf("wait notification: %w", err)
		}
		// Poll immediately after the notification
		if err := s.Poll(ctx); err != nil {
			return err
		}
	}
}
The notification is sent inside the Append transaction — it's only delivered to listeners once the transaction commits. No false alarms on rollback.
In production, combine both approaches: LISTEN for real-time reactivity under normal conditions, periodic polling (every 5 to 30 seconds) as a safety net in case of reconnection or missed notification. The dedicated LISTEN connection must not be taken from the standard connection pool — it occupies a connection permanently.
The command gateway — putting it all together
With the event store in place, the complete flow for a command becomes explicit:
type CommandGateway struct {
store *EventStore
}
func (gw *CommandGateway) Execute(ctx context.Context, aggregateID uuid.UUID, cmd Command, handler CommandHandler) error {
// 1. Load events
storedEvents, err := gw.store.Load(ctx, aggregateID)
if err != nil {
return fmt.Errorf("load aggregate: %w", err)
}
// 2. Reconstruct state
state := Replay(storedEvents)
currentVersion := len(storedEvents)
// 3. Execute the handler (pure, no side effects)
newEvents, err := handler.Handle(ctx, state, cmd)
if err != nil {
return err
}
// 4. Persist the new events
return gw.store.Append(ctx, aggregateID, cmd.AggregateType(), currentVersion, newEvents)
}
If two commands arrive simultaneously on the same aggregate, the second fails with
ErrConcurrencyConflict. The client can retry — it will reload the aggregate including the
events from the first command, recalculate the state, and re-execute the handler on the updated state. In
practice, real conflicts are rare on well-bounded aggregates; most concurrent operations touch different aggregates.
This is also the pattern that makes command handlers testable without infrastructure (Part 2): the gateway injects the state, the handler produces events, the gateway persists. Each responsibility is isolated.
Performance — when events accumulate
An aggregate with a few dozen events reloads in microseconds. An aggregate with 10,000 events — a long-lived order, a very active user account — starts to weigh on the replay. The solution is snapshots.
CREATE TABLE es_snapshots (
aggregate_id UUID PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL,
version INT NOT NULL,
state_data JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
A snapshot is a captured, serialized state at a given point in time, associated with its version. Loading with a snapshot proceeds in three steps:
- Load the most recent snapshot for the aggregate (if it exists)
- Load only the events after the snapshot's version
- Replay the partial events on the snapshot's state
The simplest practical rule: create a snapshot every 100 events. Most aggregates never reach this limit — a typical e-commerce aggregate generates 5 to 20 events over its lifetime. Snapshots are only necessary for high-volume aggregates: accounts, wallets, inventories updated hundreds of times per day.
Important: snapshots are an optimization, not a change to the data model. Events remain the source of truth. A corrupted or deleted snapshot is not data loss — the aggregate can be reconstituted from the beginning of its events.
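The two reads translate directly to SQL — the replay of the remaining events then happens in Go on the partial slice:

```sql
-- 1. The snapshot (aggregate_id is the primary key: one row per
--    aggregate, overwritten each time the snapshot is refreshed)
SELECT version, state_data
FROM es_snapshots
WHERE aggregate_id = $1;

-- 2. Only the events newer than the snapshot's version
SELECT id, aggregate_id, aggregate_type, event_type, event_data, metadata, version, created_at
FROM es_events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC;
```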
Series summary
Across four parts, we built a complete CQRS implementation in Go:
- Part 1 — The aggregate: immutable state, Transition() for applying events, Clone() for dry-runs. The foundation that makes everything else testable.
- Part 2 — Command handlers: pure functions that take a state and a command, return events or an error. No database, no HTTP, testable in complete isolation.
- Part 3 — Sagas and choreography: coordination between aggregates through published events. Each service reacts to what interests it, without direct coupling. Sagas handle compensation in case of distributed failure.
- Part 4 — The event store: PostgreSQL as the persistence infrastructure. Append-only table, optimistic locking through UNIQUE constraint, subscriptions for projectors, LISTEN/NOTIFY for reactivity, snapshots for large-scale performance.
These four patterns combine: the command gateway loads via the event store, calls the pure handler, persists the produced events. Subscribers read the global stream and maintain read views. Sagas listen for certain event types and trigger commands on other aggregates.
The result is a system that is auditable by construction — the complete history is in the events — testable at every layer, and operable with the PostgreSQL infrastructure you already manage.