CQRS in Go — Part 4: PostgreSQL as an event store

The first three parts laid the groundwork: immutable aggregates, pure command handlers, sagas through choreography. One central question remains — where do we persist the events? This part answers that question using PostgreSQL as an append-only event store, with no additional dependencies.

Why PostgreSQL and not EventStoreDB or Kafka

EventStoreDB is an excellent tool, designed specifically for event sourcing. But it's an additional component to deploy, monitor, back up, and maintain. For a team already running PostgreSQL in production, adding EventStoreDB means operating a second database alongside the first, with its own upgrades, alerts, and backup procedures.

Kafka is often mentioned in this context. It's a message bus, not an event store. Reading by aggregate ID is not its strong suit — Kafka is optimized for sequential partition consumption, not for "give me all events for order CMD-4521 in order". Default retention is time-limited. Reconstituting an aggregate from Kafka requires non-trivial work.

PostgreSQL is already in your stack. It brings native ACID transactions, JSONB with indexing, backups integrated into your existing infrastructure, monitoring your team knows, pg_dump, replication, and point-in-time recovery. For the large majority of projects, an append-only table in PostgreSQL is more than enough. Moving to a dedicated tool makes sense when the sustained write volume exceeds what a single PostgreSQL primary can absorb, a threshold most projects never reach.

The es_events table — the core

Everything rests on a single table:

CREATE TABLE es_events (
    id            BIGSERIAL PRIMARY KEY,
    aggregate_id  UUID NOT NULL,
    aggregate_type VARCHAR(64) NOT NULL,
    event_type    VARCHAR(128) NOT NULL,
    event_data    JSONB NOT NULL,
    metadata      JSONB DEFAULT '{}',
    version       INT NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE (aggregate_id, version)
);

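-- No separate index on (aggregate_id, version) is needed: the UNIQUE
-- constraint above already creates a unique B-tree index on those columns,
-- and that index serves the per-aggregate reads.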

Each column has a specific role:

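  • id (BIGSERIAL): global position in the stream. This is the cursor used by projectors to track where they are. Values increase and are never reused, but they are assigned at insert time, not at commit time: a rollback leaves a gap, and with concurrent writers a lower id can become visible after a higher one. A projector that polls on id alone has to tolerate this.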
  • aggregate_id: the identifier of the entity involved — the UUID of the order, payment, or shipment. This is the primary read key for reconstituting an aggregate.
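  • aggregate_type: the type of the aggregate, such as "order", "payment", or "shipping". Allows filtering events by domain. It does not protect against identifier collisions between types: the UNIQUE constraint covers only (aggregate_id, version), so aggregate IDs must be unique across all types, which random UUIDs are in practice.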
  • event_type: the name of the event — "OrderPlaced", "PaymentConfirmed", "ShipmentDispatched". Used to deserialize event_data into the correct Go type.
  • event_data (JSONB): the serialized event payload. JSONB enables indexing and field-level queries when needed, without sacrificing flexibility.
  • metadata (JSONB): traceability information — who triggered the action (user ID), correlation ID, causation ID, client timestamp. Not domain data, but indispensable in production.
  • version: sequence number per aggregate. The first event of an aggregate is at version 1, the second at version 2, and so on.
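To make the role of event_type concrete, here is a minimal sketch of a decoder registry that maps the stored event_type string to a Go type and unmarshals event_data into it. The payload structs, their fields, and the names registry and Decode are assumptions made for illustration; the series may organize this differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical event payloads; the field names are illustrative only.
type OrderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

type PaymentConfirmed struct {
	PaymentID string `json:"payment_id"`
}

// registry maps an event_type value to a constructor for the matching Go type.
var registry = map[string]func() any{
	"OrderPlaced":      func() any { return &OrderPlaced{} },
	"PaymentConfirmed": func() any { return &PaymentConfirmed{} },
}

// Decode turns a stored (event_type, event_data) pair back into a typed event.
func Decode(eventType string, data []byte) (any, error) {
	newEvent, ok := registry[eventType]
	if !ok {
		return nil, fmt.Errorf("unknown event type %q", eventType)
	}
	event := newEvent()
	if err := json.Unmarshal(data, event); err != nil {
		return nil, fmt.Errorf("decode %s: %w", eventType, err)
	}
	return event, nil
}

func main() {
	event, err := Decode("OrderPlaced", []byte(`{"order_id":"CMD-4521","total":4200}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", event)
}
```

Returning an error for unknown types, rather than skipping them silently, makes a missing registration visible the first time an old event is replayed.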

The UNIQUE (aggregate_id, version) constraint is the centerpiece of concurrency safety. It makes it impossible to insert two events with the same version on the same aggregate. This is the optimistic locking mechanism: no lock is taken on read, and a conflicting write fails with a constraint violation.

Append — writing an event

type EventStore struct {
    db *sqlx.DB
}

func (s *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []StorableEvent) error {
    tx, err := s.db.BeginTxx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    for i, event := range events {
        version := expectedVersion + i + 1

        data, err := json.Marshal(event.Data)
        if err != nil {
            return fmt.Errorf("marshal event: %w", err)
        }

        _, err = tx.ExecContext(ctx, `
            INSERT INTO es_events (aggregate_id, aggregate_type, event_type, event_data, metadata, version)
            VALUES ($1, $2, $3, $4, $5, $6)
        `, aggregateID, aggregateType, event.Type, data, event.Metadata, version)

        if err != nil {
            // UNIQUE violation = version conflict = optimistic locking
            if isUniqueViolation(err) {
                return ErrConcurrencyConflict
            }
            return fmt.Errorf("insert event: %w", err)
        }
    }

    return tx.Commit()
}

The expectedVersion parameter is the version of the last known event at the time the aggregate was loaded (0 for a new aggregate). If the handler produces two events, they will be inserted at versions expectedVersion + 1 and expectedVersion + 2.

If another writer (a goroutine in the same process, or another instance of the service) appended an event between loading and writing, the UNIQUE constraint kicks in: PostgreSQL rejects the insert with a constraint violation, which Append translates into ErrConcurrencyConflict. The transaction is rolled back, so none of the events in the batch are stored. The client can retry: it reloads the aggregate with the new event, recalculates the state, and re-executes the handler.
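Append relies on two names the article does not show, ErrConcurrencyConflict and isUniqueViolation. The sketch below is one possible way to write them without tying the code to a specific driver. It assumes the driver's error type exposes a SQLState() string method (pgx's pgconn.PgError does; check what your driver provides). 23505 is PostgreSQL's SQLSTATE for unique_violation. The fakePgError type exists only so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConcurrencyConflict signals that another writer appended first.
var ErrConcurrencyConflict = errors.New("concurrency conflict")

// uniqueViolation is the SQLSTATE code PostgreSQL reports when a UNIQUE
// constraint is violated.
const uniqueViolation = "23505"

// isUniqueViolation reports whether err, or an error it wraps, carries
// SQLSTATE 23505. Assumption: the driver error has a SQLState() method.
func isUniqueViolation(err error) bool {
	var sqlErr interface{ SQLState() string }
	return errors.As(err, &sqlErr) && sqlErr.SQLState() == uniqueViolation
}

// fakePgError stands in for a driver error in this demo.
type fakePgError struct{ code string }

func (e fakePgError) Error() string    { return "pg error " + e.code }
func (e fakePgError) SQLState() string { return e.code }

func main() {
	wrapped := fmt.Errorf("insert event: %w", fakePgError{code: "23505"})
	fmt.Println(isUniqueViolation(wrapped))
	fmt.Println(isUniqueViolation(fakePgError{code: "23503"}))
}
```

Matching on the SQLSTATE code rather than on the error message keeps the check stable across PostgreSQL versions and locales.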

Load — reloading an aggregate

func (s *EventStore) Load(ctx context.Context, aggregateID uuid.UUID) ([]StoredEvent, error) {
    var events []StoredEvent

    err := s.db.SelectContext(ctx, &events, `
        SELECT id, aggregate_id, aggregate_type, event_type, event_data, metadata, version, created_at
        FROM es_events
        WHERE aggregate_id = $1
        ORDER BY version ASC
    `, aggregateID)

    if err != nil {
        return nil, fmt.Errorf("loading events: %w", err)
    }

    return events, nil
}

The aggregate's state is never stored directly. It's always recalculated by replaying events in order, calling Transition() on each one (see Part 1). This is the replay. The result is deterministic — the same events always produce the same state. This is what makes the system testable and auditable.
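As a reminder of what the replay looks like, here is a minimal sketch of the fold. The Event and OrderState types are simplified stand-ins invented for this example, not the aggregate from Part 1; only the shape (a Transition method applied to each event in version order) is taken from the text.

```go
package main

import "fmt"

// Hypothetical, simplified stand-ins for the aggregate from Part 1.
type Event struct {
	Type    string
	Version int
}

type OrderState struct {
	Status  string
	Version int
}

// Transition returns the next state. The receiver is a copy, so the
// caller's value is never mutated.
func (s OrderState) Transition(e Event) OrderState {
	switch e.Type {
	case "OrderPlaced":
		s.Status = "placed"
	case "PaymentConfirmed":
		s.Status = "paid"
	case "ShipmentDispatched":
		s.Status = "shipped"
	}
	s.Version = e.Version
	return s
}

// Replay folds the events, in version order, over the zero state.
func Replay(events []Event) OrderState {
	var state OrderState
	for _, e := range events {
		state = state.Transition(e)
	}
	return state
}

func main() {
	events := []Event{{"OrderPlaced", 1}, {"PaymentConfirmed", 2}}
	fmt.Printf("%+v\n", Replay(events))
}
```

Because Replay has no inputs other than the event slice, calling it twice on the same events yields the same state, which is the determinism the paragraph above relies on.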

Subscriptions — feeding projections

Projectors — the components that maintain read views — must be notified of new events. The simplest solution: a positions table.

CREATE TABLE es_subscriptions (
    subscriber_id  VARCHAR(128) PRIMARY KEY,
    last_event_id  BIGINT NOT NULL DEFAULT 0,
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Each projector records the identifier of the last event it processed. On startup or after a crash, it resumes from this position. The polling pattern:

type Subscriber struct {
    db           *sqlx.DB
    subscriberID string
    handler      EventHandler
}

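func (s *Subscriber) Poll(ctx context.Context) error {
    // 1. Read the last processed position (0 if this subscriber has no row yet).
    //    The ErrNoRows check needs the "database/sql" and "errors" imports.
    var lastID int64
    err := s.db.GetContext(ctx, &lastID,
        "SELECT last_event_id FROM es_subscriptions WHERE subscriber_id = $1",
        s.subscriberID,
    )
    if err != nil && !errors.Is(err, sql.ErrNoRows) {
        return fmt.Errorf("read position: %w", err)
    }

    // 2. Load new events
    var events []StoredEvent
    err = s.db.SelectContext(ctx, &events,
        "SELECT * FROM es_events WHERE id > $1 ORDER BY id ASC LIMIT 100",
        lastID,
    )
    if err != nil {
        return fmt.Errorf("load events: %w", err)
    }

    // 3. Process each event in its own transaction
    for _, event := range events {
        tx, err := s.db.BeginTxx(ctx, nil)
        if err != nil {
            return fmt.Errorf("begin tx: %w", err)
        }

        if err := s.handler.Handle(ctx, tx, event); err != nil {
            tx.Rollback()
            return fmt.Errorf("handling event %d: %w", event.ID, err)
        }

        // 4. Update the position in the same transaction. The upsert also
        //    creates the row the first time this subscriber runs.
        _, err = tx.ExecContext(ctx, `
            INSERT INTO es_subscriptions (subscriber_id, last_event_id)
            VALUES ($1, $2)
            ON CONFLICT (subscriber_id)
            DO UPDATE SET last_event_id = EXCLUDED.last_event_id, updated_at = NOW()
        `, s.subscriberID, event.ID)
        if err != nil {
            tx.Rollback()
            return fmt.Errorf("update position: %w", err)
        }

        if err := tx.Commit(); err != nil {
            return fmt.Errorf("commit event %d: %w", event.ID, err)
        }
    }

    return nil
}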

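The critical detail is in point 4: the position update and the handler's writes commit in the same transaction. If the process crashes before the commit, both are rolled back and the event is processed again on the next poll. For a read view stored in the same database, the view and the position therefore never diverge. Any side effect outside the transaction (an email, an HTTP call, a write to another store) can still happen twice, so delivery is at-least-once and handlers must be idempotent to absorb duplicates.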

Idempotency in practice: a handler that inserts a row into a read view can use INSERT ... ON CONFLICT DO NOTHING, or check whether the row already exists. The deduplication key is generally the event_id or a combination of unique business fields.
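A minimal sketch of such a handler follows. The order_summaries table, its columns, and the function name HandleOrderPlaced are hypothetical. The handler accepts a small interface that *sql.Tx (and therefore *sqlx.Tx) satisfies, and recordingTx is a fake used only so the example runs without a database; the deduplication itself is done by PostgreSQL through ON CONFLICT.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// execer is the subset of *sql.Tx that the handler needs.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// Hypothetical read-view table:
//
//	CREATE TABLE order_summaries (
//	    order_id        UUID PRIMARY KEY,
//	    status          TEXT NOT NULL,
//	    source_event_id BIGINT NOT NULL
//	);
const insertSummary = `
	INSERT INTO order_summaries (order_id, status, source_event_id)
	VALUES ($1, $2, $3)
	ON CONFLICT (order_id) DO NOTHING`

// HandleOrderPlaced inserts the row once; handling the same event again
// hits the primary key and leaves the existing row untouched.
func HandleOrderPlaced(ctx context.Context, tx execer, orderID string, eventID int64) error {
	if _, err := tx.ExecContext(ctx, insertSummary, orderID, "placed", eventID); err != nil {
		return fmt.Errorf("insert order summary: %w", err)
	}
	return nil
}

// recordingTx is a fake execer that only records the queries it receives.
type recordingTx struct{ queries []string }

func (r *recordingTx) ExecContext(_ context.Context, query string, _ ...any) (sql.Result, error) {
	r.queries = append(r.queries, query)
	return nil, nil
}

func main() {
	tx := &recordingTx{}
	orderID := "6f1c2a9e-3b7d-4c55-9a41-0d2e8b7c1f10"
	for i := 0; i < 2; i++ { // deliver the same event twice
		if err := HandleOrderPlaced(context.Background(), tx, orderID, 42); err != nil {
			panic(err)
		}
	}
	fmt.Println("statements issued:", len(tx.queries))
}
```

DO NOTHING suits events that create a row. For events that update an existing row, a common alternative is to store the last applied event id on the row and skip events whose id is not greater.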

Real-time notifications with LISTEN/NOTIFY

Polling introduces latency — at best a few hundred milliseconds if the poll runs frequently, at worst several seconds. For read views that need to be reactive, PostgreSQL offers a native mechanism: LISTEN/NOTIFY.

// Event store side: notify once per Append, after the inserts
func (s *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []StorableEvent) error {
    tx, err := s.db.BeginTxx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    // ... insert events ...

    // Notify subscribers
    if _, err := tx.ExecContext(ctx, "NOTIFY es_events_channel"); err != nil {
        return fmt.Errorf("notify: %w", err)
    }

    return tx.Commit()
}

// Subscriber side: listen instead of polling
func (s *Subscriber) Listen(ctx context.Context) error {
    conn, err := s.db.Conn(ctx)
    if err != nil {
        return fmt.Errorf("acquire conn: %w", err)
    }
    defer conn.Close()

    if _, err := conn.ExecContext(ctx, "LISTEN es_events_channel"); err != nil {
        return fmt.Errorf("listen: %w", err)
    }

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

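        // Wait for a notification (blocks until one arrives or ctx is cancelled).
        // Assumption: s.db uses the pgx stdlib driver (github.com/jackc/pgx/v5/stdlib).
        // *sql.Conn has no WaitForNotification method, so the underlying
        // *pgx.Conn is reached through Raw.
        err := conn.Raw(func(driverConn any) error {
            _, err := driverConn.(*stdlib.Conn).Conn().WaitForNotification(ctx)
            return err
        })
        if err != nil {
            return fmt.Errorf("wait notification: %w", err)
        }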

        // Poll immediately after the notification
        if err := s.Poll(ctx); err != nil {
            return err
        }
    }
}

The notification is sent inside the Append transaction — it's only delivered to listeners once the transaction commits. No false alarms on rollback.

In production, combine both approaches: LISTEN for real-time reactivity under normal conditions, periodic polling (every 5 to 30 seconds) as a safety net for reconnections and missed notifications. Because the LISTEN connection is held for as long as the subscriber runs, it should not come from the pool that serves regular queries: give it a dedicated connection, or size the pool to account for it. The simplified Listen above pins one with s.db.Conn for brevity.
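One way to combine the two triggers is a single loop that polls when a notification arrives and also when a timer fires. The sketch below uses only the standard library. Run, wake, and runDemo are hypothetical names, and the wake channel is assumed to be fed by a separate goroutine that blocks on the LISTEN connection.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Run calls poll once at startup, then again whenever a notification
// arrives on wake or the interval elapses. It returns when ctx is done
// or poll fails.
func Run(ctx context.Context, wake <-chan struct{}, interval time.Duration, poll func(context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		// Polling before waiting catches up on events written while
		// the subscriber was down.
		if err := poll(ctx); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-wake: // a NOTIFY was received by the listener goroutine
		case <-ticker.C: // periodic safety net
		}
	}
}

// runDemo simulates two notifications, then cancels the context.
func runDemo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wake := make(chan struct{}, 1)
	polls := 0
	poll := func(context.Context) error {
		polls++
		if polls == 3 {
			cancel() // stop the demo after three polls
			return nil
		}
		wake <- struct{}{} // simulate a NOTIFY arriving
		return nil
	}
	err := Run(ctx, wake, time.Hour, poll)
	return polls, err
}

func main() {
	polls, err := runDemo()
	fmt.Println(polls, err)
}
```

A buffered wake channel of size one, written with a non-blocking send on the listener side, is usually enough: several notifications received during one poll collapse into a single extra poll.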

The command gateway — putting it all together

With the event store in place, the complete flow for a command becomes explicit:

type CommandGateway struct {
    store *EventStore
}

func (gw *CommandGateway) Execute(ctx context.Context, aggregateID uuid.UUID, cmd Command, handler CommandHandler) error {
    // 1. Load events
    storedEvents, err := gw.store.Load(ctx, aggregateID)
    if err != nil {
        return fmt.Errorf("load aggregate: %w", err)
    }

    // 2. Reconstruct state
    state := Replay(storedEvents)
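    // Version of the last stored event (0 for a new aggregate). Assumes StoredEvent
    // exposes the version column as an int field named Version. Unlike
    // len(storedEvents), this stays correct if loading starts from a snapshot.
    currentVersion := 0
    if n := len(storedEvents); n > 0 {
        currentVersion = storedEvents[n-1].Version
    }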

    // 3. Execute the handler (pure, no side effects)
    newEvents, err := handler.Handle(ctx, state, cmd)
    if err != nil {
        return err
    }

    // 4. Persist the new events
    return gw.store.Append(ctx, aggregateID, cmd.AggregateType(), currentVersion, newEvents)
}

If two commands arrive simultaneously on the same aggregate, the second fails with ErrConcurrencyConflict. The client can retry — it will reload the aggregate including the events from the first command, recalculate the state, and re-execute the handler on the updated state. In practice, real conflicts are rare on well-bounded aggregates; most concurrent operations touch different aggregates.
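The retry can live in a small wrapper around the gateway call. The following is a sketch under assumptions: ExecuteWithRetry is a hypothetical helper, and the closure passed to it stands in for a call to CommandGateway.Execute, which reloads the aggregate on every attempt.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConcurrencyConflict mirrors the error returned by Append.
var ErrConcurrencyConflict = errors.New("concurrency conflict")

// ExecuteWithRetry runs execute and retries only on version conflicts.
// Any other result, success included, is returned immediately.
func ExecuteWithRetry(maxAttempts int, execute func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = execute()
		if !errors.Is(err, ErrConcurrencyConflict) {
			return err // success, or an error that retrying will not fix
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := ExecuteWithRetry(3, func() error {
		calls++
		if calls < 3 {
			return ErrConcurrencyConflict // simulated: another writer won
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

Business errors returned by the handler are deliberately not retried: replaying the command on the same state would produce the same rejection.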

This is also the pattern that makes command handlers testable without infrastructure (Part 2): the gateway injects the state, the handler produces events, the gateway persists. Each responsibility is isolated.

Performance — when events accumulate

An aggregate with a few dozen events reloads quickly: the replay itself is negligible and the cost is dominated by the single query round trip. An aggregate with 10,000 events (a long-lived order, a very active user account) starts to weigh on both the query and the replay. The solution is snapshots.

CREATE TABLE es_snapshots (
    aggregate_id   UUID PRIMARY KEY,
    aggregate_type VARCHAR(64) NOT NULL,
    version        INT NOT NULL,
    state_data     JSONB NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

A snapshot is the aggregate's state, serialized at a given point in time and stored with its version. Loading with a snapshot proceeds in three steps:

  • Load the most recent snapshot for the aggregate (if it exists)
  • Load only the events after the snapshot's version
  • Replay the partial events on the snapshot's state

The simplest practical rule: create a snapshot every 100 events. Most aggregates never reach this limit — a typical e-commerce aggregate generates 5 to 20 events over its lifetime. Snapshots are only necessary for high-volume aggregates: accounts, wallets, inventories updated hundreds of times per day.

Important: snapshots are an optimization, not a change to the data model. Events remain the source of truth. A corrupted or deleted snapshot is not data loss — the aggregate can be reconstituted from the beginning of its events.
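The loading steps and the "every 100 events" rule can be sketched in memory as follows. The Wallet aggregate, the Snapshot type, and both function names are assumptions made for this example. Against the real tables, the version filter would be part of the query (WHERE version > the snapshot's version) rather than a check in the loop.

```go
package main

import "fmt"

// Hypothetical wallet aggregate, picked because wallets are high-volume.
type Event struct {
	Version int
	Amount  int // positive = deposit, negative = withdrawal
}

type Wallet struct {
	Balance int
	Version int
}

// Transition returns the next state without mutating the receiver.
func (w Wallet) Transition(e Event) Wallet {
	return Wallet{Balance: w.Balance + e.Amount, Version: e.Version}
}

// Snapshot is the state captured at a given version (one es_snapshots row).
type Snapshot struct {
	State Wallet
}

// LoadWithSnapshot starts from the snapshot, if any, and replays only the
// events whose version is greater than the snapshot's version.
func LoadWithSnapshot(snap *Snapshot, events []Event) Wallet {
	var state Wallet
	if snap != nil {
		state = snap.State
	}
	for _, e := range events {
		if e.Version <= state.Version {
			continue // already folded into the snapshot
		}
		state = state.Transition(e)
	}
	return state
}

// ShouldSnapshot applies the "every 100 events" rule from the text.
func ShouldSnapshot(version int) bool {
	return version > 0 && version%100 == 0
}

func main() {
	events := []Event{{1, 10}, {2, 20}, {3, -5}}
	snap := &Snapshot{State: Wallet{Balance: 30, Version: 2}}

	fmt.Printf("full replay:   %+v\n", LoadWithSnapshot(nil, events))
	fmt.Printf("from snapshot: %+v\n", LoadWithSnapshot(snap, events))
	fmt.Println("snapshot at version 100:", ShouldSnapshot(100))
}
```

Both loading paths must produce the same state, which makes a cheap consistency test for any snapshot implementation. Note that an Append of several events can jump over an exact multiple of 100, so a real implementation may prefer to compare the current version with the version of the last snapshot.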

Series summary

Across four parts, we built a complete CQRS implementation in Go:

  • Part 1 — The aggregate: immutable state, Transition() for applying events, Clone() for dry-runs. The foundation that makes everything else testable.
  • Part 2 — Command handlers: pure functions that take a state and a command, return events or an error. No database, no HTTP, testable in complete isolation.
  • Part 3 — Sagas and choreography: coordination between aggregates through published events. Each service reacts to what interests it, without direct coupling. Sagas handle compensation in case of distributed failure.
  • Part 4 — The event store: PostgreSQL as the persistence infrastructure. Append-only table, optimistic locking through UNIQUE constraint, subscriptions for projectors, LISTEN/NOTIFY for reactivity, snapshots for large-scale performance.

These four patterns combine: the command gateway loads via the event store, calls the pure handler, persists the produced events. Subscribers read the global stream and maintain read views. Sagas listen for certain event types and trigger commands on other aggregates.

The result is a system that is auditable by construction (the complete history is in the events), testable at every layer, and operable with the PostgreSQL infrastructure you already manage.
