In HTTP, idempotency is straightforward: store the key, return the cache. In a CQRS/Event Sourcing system, it's more subtle. The command may be idempotent, but what about the event it generates? The projection consuming it? Idempotency must cross the entire stack.
If you haven't read part 1, it covers the basics and the HTTP implementation in Go — with a complete PostgreSQL store that replaces Redis. Here we go one level deeper: command store, optimistic locking, idempotent projections, outbox pattern. These are the patterns found in financial systems, e-commerce platforms, and anywhere a duplicated operation costs money or trust. Everything runs on PostgreSQL — no Redis, no external broker for idempotency.
The problem specific to Event Sourcing
In Event Sourcing, the application state is the event stream. An account balance is not a
column in the database — it's the sum of AccountCredited and AccountDebited events since
opening. If the same event is recorded twice, the reconstructed state is wrong. And often irreparable
without manual intervention.
Concrete scenario on a payment system:
- The command DebitAccount(amount=100, idempotencyKey=abc123) arrives
- The event AccountDebited(amount=100) is recorded, balance goes from 200 to 100
- Network drops. Client retries. The same command arrives a second time
- Without protection: a second AccountDebited(amount=100) is recorded, balance drops to 0
- The user has been charged twice for a single purchase
In a classic relational database, you could fix this with an UPDATE. In Event Sourcing, you never modify the past — you can only append a corrective event, which complicates audit and support. Better to prevent the duplicate from getting in at all.
Command idempotency with the idempotency key
The principle is the same as in HTTP: associate each command with a unique key provided by the client, and check before processing whether that key has already been seen. The difference is that you store the result in a dedicated command store, and the save must be atomic with the events.
CREATE TABLE processed_commands (
idempotency_key UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
command_type VARCHAR(100) NOT NULL,
result_event_id UUID,
processed_at TIMESTAMPTZ DEFAULT NOW()
);
The handler follows a three-step schema:
func (h *PaymentHandler) Handle(ctx context.Context, cmd DébiterCompte) error {
// 1. Check if already processed
existing, err := h.commandStore.Get(ctx, cmd.IdempotencyKey)
if err == nil {
// Already processed — return silent success
_ = existing
return nil
}
if !errors.Is(err, ErrNotFound) {
return fmt.Errorf("checking idempotency: %w", err)
}
// 2. Process the command
events, err := h.aggregate.Handle(ctx, cmd)
if err != nil {
return err
}
// 3. Save events + mark command as processed — ATOMICALLY
return h.store.SaveEventsAndMarkCommand(ctx, events, cmd.IdempotencyKey)
}
The atomicity of step 3 is non-negotiable. If you save the events in one transaction and mark the command in a second separate transaction, a crash between the two gives you: events in the database, command not marked. On the next retry, the handler doesn't see the key, processes again, records a duplicate event. Both operations must live in the same PostgreSQL transaction.
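With the two schemas above, the atomic step can be sketched as a single SQL transaction. The statements below are an assumption about what SaveEventsAndMarkCommand does internally, not the article's verbatim implementation:

```sql
BEGIN;

-- 1. Append the event produced by the command
INSERT INTO account_events (id, aggregate_id, version, event_type, payload)
VALUES ($1, $2, $3, 'AccountDebited', $4);

-- 2. Mark the command as processed, in the same transaction
INSERT INTO processed_commands (idempotency_key, aggregate_id, command_type, result_event_id)
VALUES ($5, $2, 'DebitAccount', $1);

COMMIT; -- both rows become visible together, or neither does
```

A crash anywhere before COMMIT rolls back both inserts, so the next retry finds no key and safely reprocesses.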
Optimistic locking — detecting concurrent conflicts
The idempotency key protects against duplicates of the same command. It doesn't protect against two different commands modifying the same aggregate at the same time. That's the role of optimistic locking.
Each aggregate has a version number. When a client reads the state of an aggregate, it receives its current version. When it sends a command, it includes that version. If in the meantime someone else has written to the same aggregate, the versions no longer match and the command is rejected — it's up to the client to re-read the state and decide whether its command still holds.
CREATE TABLE account_events (
id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
version INT NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(aggregate_id, version) -- key constraint
);
The UNIQUE(aggregate_id, version) constraint does all the work. If two concurrent commands
try to write version 5 of the same aggregate, PostgreSQL lets one through and rejects the other with a
uniqueness violation.
type DébiterCompte struct {
AggregateID uuid.UUID
ExpectedVersion int // version the client read
Montant float64 // in production, prefer integer cents to avoid float rounding
IdempotencyKey uuid.UUID
}
func (s *EventStore) AppendEvents(
ctx context.Context,
aggregateID uuid.UUID,
expectedVersion int,
events []Event,
) error {
// All events from one command must commit together: one transaction
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginning transaction: %w", err)
}
defer tx.Rollback() // no-op once Commit has succeeded
for i, event := range events {
_, err := tx.ExecContext(ctx,
`INSERT INTO account_events (id, aggregate_id, version, event_type, payload)
VALUES ($1, $2, $3, $4, $5)`,
event.ID,
aggregateID,
expectedVersion+i+1,
event.Type,
event.Payload,
)
if isUniqueViolation(err) {
return ErrVersionConflict // caller can retry with the new version
}
if err != nil {
return fmt.Errorf("appending event: %w", err)
}
}
return tx.Commit()
}
By returning ErrVersionConflict, we leave the decision to retry or surface
the error to the user to the caller. In a financial system, you often surface an explicit error
("your operation was cancelled because the account was modified in the meantime") rather than retrying silently.
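When the caller does choose to retry, the loop is small. Here is a minimal, self-contained sketch using an in-memory fake store; fakeStore and debitWithRetry are illustrative names, not part of the article's code:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrVersionConflict = errors.New("version conflict")

// fakeStore stands in for the event store: it accepts a write only
// when the expected version matches the current one.
type fakeStore struct {
	version int
}

func (s *fakeStore) Append(expectedVersion int) error {
	if expectedVersion != s.version {
		return ErrVersionConflict
	}
	s.version++
	return nil
}

// debitWithRetry retries on version conflict after re-reading the
// current version. A real handler would also re-check business rules
// (is the balance still sufficient?) before retrying.
func debitWithRetry(store *fakeStore, expectedVersion, maxRetries int) error {
	for attempt := 0; ; attempt++ {
		err := store.Append(expectedVersion)
		if err == nil {
			return nil
		}
		if !errors.Is(err, ErrVersionConflict) || attempt >= maxRetries {
			return err
		}
		expectedVersion = store.version // re-read the aggregate's version
	}
}

func main() {
	store := &fakeStore{version: 5} // a concurrent command already wrote version 5
	err := debitWithRetry(store, 4, 1)
	fmt.Println(err) // <nil>: the retry succeeded after re-reading version 5
}
```

The re-read step is what makes the retry safe: blindly resending the same ExpectedVersion would conflict forever.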
Idempotent projections
Projections consume the event stream to build denormalized views — the current balance of an account, the order list for a customer, etc. With Kafka or any at-least-once system, the same event can arrive multiple times. The projection must produce the same result whether it sees it once or ten times.
Approach 1: tracking position in the stream
Each projection remembers the ID of the last event it processed. Events older than or equal to the checkpoint are ignored.
CREATE TABLE projection_checkpoints (
projection_name VARCHAR(100) PRIMARY KEY,
last_event_id UUID NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
This approach works well when events are ordered and IDs can be compared (UUIDs v7 or sequences). It assumes the event store guarantees a stable order, which is generally true per aggregate but may be less so across different aggregates.
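The filtering logic itself is tiny. The sketch below uses plain sequence numbers in place of UUIDv7 IDs (they compare the same way: later events sort after earlier ones); the checkpoint type and its methods are illustrative, and in production lastSeq would live in projection_checkpoints and be updated in the same transaction as the view:

```go
package main

import "fmt"

// checkpoint tracks the last event position a projection has processed.
type checkpoint struct {
	lastSeq int64
}

// shouldProcess reports whether an incoming event is new to the projection.
func (c *checkpoint) shouldProcess(seq int64) bool {
	return seq > c.lastSeq
}

// advance moves the checkpoint forward after a successful apply.
func (c *checkpoint) advance(seq int64) {
	if seq > c.lastSeq {
		c.lastSeq = seq
	}
}

func main() {
	cp := &checkpoint{}
	// At-least-once delivery: event 2 arrives twice.
	for _, seq := range []int64{1, 2, 2, 3} {
		if !cp.shouldProcess(seq) {
			fmt.Printf("event %d: duplicate, skipped\n", seq)
			continue
		}
		fmt.Printf("event %d: applied\n", seq)
		cp.advance(seq)
	}
}
```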
Approach 2: idempotent upsert
Rather than filtering duplicates upstream, write in a way that rewriting the same data is harmless.
PostgreSQL's INSERT ON CONFLICT DO UPDATE statement is built for this:
INSERT INTO account_balances (account_id, balance, last_event_id)
VALUES ($1, $2, $3)
ON CONFLICT (account_id) DO UPDATE
SET balance = EXCLUDED.balance,
last_event_id = EXCLUDED.last_event_id
WHERE account_balances.last_event_id < EXCLUDED.last_event_id;
-- The WHERE clause prevents overwriting a more recent state with an older event
The WHERE clause matters. Without it, if an old event arrives after a recent event (which
can happen with multiple Kafka partitions), you overwrite a correct state with a stale one. With it,
you only update if the incoming event is newer than what you already have.
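To see why order stops mattering, the guard can be simulated in memory. This is an illustrative sketch, not the article's code: applyBalance mirrors the WHERE clause of the upsert, with sequence numbers standing in for last_event_id:

```go
package main

import "fmt"

// balanceRow mirrors one row of account_balances.
type balanceRow struct {
	balance     float64
	lastEventID int64
}

// applyBalance mimics INSERT ... ON CONFLICT DO UPDATE ... WHERE
// account_balances.last_event_id < EXCLUDED.last_event_id: the write
// only lands if the incoming event is newer than the stored one.
func applyBalance(rows map[string]balanceRow, account string, balance float64, eventID int64) {
	current, exists := rows[account]
	if exists && current.lastEventID >= eventID {
		return // stale or duplicate event: keep the newer state
	}
	rows[account] = balanceRow{balance: balance, lastEventID: eventID}
}

func main() {
	rows := map[string]balanceRow{}
	applyBalance(rows, "acct-1", 100, 2) // the newer event arrives first
	applyBalance(rows, "acct-1", 200, 1) // the older event arrives late
	fmt.Println(rows["acct-1"].balance)  // 100: the stale write was ignored
}
```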
In practice, both approaches are complementary: the checkpoint avoids replaying thousands of events unnecessarily, and the idempotent upsert acts as a safety net for duplicates that slip through anyway.
The Outbox pattern — publishing events without data loss
Suppose your system saves events in PostgreSQL and publishes them to Kafka for other services. If the process crashes after the PostgreSQL commit but before publishing to Kafka, the event is lost from the consumers' perspective. If you reverse the order, you might publish an event whose transaction then rolls back. An atomic commit across two independent systems is impossible without a distributed coordinator.
The Outbox pattern sidesteps the problem without two-phase commit:
- In the same PostgreSQL transaction: save the event and write it to an outbox table
- A dedicated worker reads outbox and publishes to Kafka
- Once successfully published, mark the row as sent
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
published BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW(),
published_at TIMESTAMPTZ
);
func (s *Store) SaveEventAndOutbox(ctx context.Context, tx *sqlx.Tx, event Event) error {
// 1. Save the event in the event store
if err := s.saveEvent(ctx, tx, event); err != nil {
return err
}
// 2. Write to the outbox — same transaction, same commit or same rollback
_, err := tx.ExecContext(ctx,
`INSERT INTO outbox (event_type, payload) VALUES ($1, $2)`,
event.Type,
event.Payload,
)
return err
}
// Separate worker — runs in the background
func (w *OutboxWorker) Run(ctx context.Context) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := w.publishPending(ctx); err != nil {
slog.Error("outbox publish failed", "error", err)
}
case <-ctx.Done():
return
}
}
}
func (w *OutboxWorker) publishPending(ctx context.Context) error {
rows, err := w.db.QueryContext(ctx,
`SELECT id, event_type, payload FROM outbox
WHERE published = FALSE
ORDER BY created_at
LIMIT 100`,
)
if err != nil {
return fmt.Errorf("querying outbox: %w", err)
}
defer rows.Close()
for rows.Next() {
var id uuid.UUID
var eventType string
var payload []byte
if err := rows.Scan(&id, &eventType, &payload); err != nil {
return fmt.Errorf("scanning outbox row: %w", err)
}
if err := w.kafka.Publish(ctx, eventType, payload); err != nil {
return fmt.Errorf("publishing event %s: %w", id, err)
}
_, err = w.db.ExecContext(ctx,
`UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = $1`,
id,
)
if err != nil {
return fmt.Errorf("marking event published %s: %w", id, err)
}
}
return rows.Err()
}
The worker may publish the same event twice if it crashes after the Kafka publish but before the
UPDATE outbox. That's at-least-once delivery, not exactly-once. Kafka consumers
must therefore be idempotent — which the upsert approach described in the previous section guarantees.
For high-volume systems, polling every 100ms can be replaced by PostgreSQL LISTEN/NOTIFY to trigger the worker immediately after each outbox insertion, without waiting for the next tick.
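A minimal sketch of the NOTIFY side, under assumed names (the channel outbox_new and the function notify_outbox are arbitrary choices; the worker would run LISTEN outbox_new through its driver, keeping the periodic poll as a fallback for missed notifications):

```sql
CREATE OR REPLACE FUNCTION notify_outbox() RETURNS trigger AS $$
BEGIN
  -- Wake up the worker; the payload carries the new row's id
  PERFORM pg_notify('outbox_new', NEW.id::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_notify
AFTER INSERT ON outbox
FOR EACH ROW EXECUTE FUNCTION notify_outbox();
```

A useful property here: pg_notify queues the notification inside the inserting transaction, and PostgreSQL delivers it only on commit, so the worker is never woken for a row it cannot yet read.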
Summary — what protects what
| Problem | Solution |
|---|---|
| Command received twice (client retry) | Idempotency key in processed_commands |
| Two concurrent commands on the same aggregate | Optimistic locking (UNIQUE constraint on version) |
| Event published twice by the broker | Idempotent projection (checkpoint or upsert) |
| Crash between save and publish | Outbox pattern (atomicity via transaction) |
Each layer protects against a different type of duplicate. The idempotency key doesn't replace optimistic locking, and the outbox doesn't make the projection idempotent — these are orthogonal guarantees that each cover a distinct failure point.
Conclusion
Idempotency in CQRS/Event Sourcing is not a detail you add after the fact — it's what separates a system that works in a demo from one that holds up in production. Retries happen. Processes crash. Kafka re-delivers messages. That's not paranoia, it's the normal behavior of any distributed system under load.
The good news: these four patterns are well-known, well-supported by PostgreSQL, and assemble cleanly in Go. Once in place, they work silently — and that's exactly what you expect from them. The "duplicate payment" ticket that never comes in, that's the real success.