The consumer inbox pattern: idempotent message processing under at-least-once delivery
Stop duplicate charges and double emails when brokers redeliver. How the inbox pattern records message intent in your database, pairs with the transactional outbox, and stays operable in production.
Your payment worker consumes PaymentCaptured from a queue. The handler charges the ledger, sends a receipt email, and acknowledges the message. The process crashes after the email API returns 202 Accepted but before the broker sees the ack. The message is delivered again. The customer receives two receipts; finance sees two ledger lines unless someone intervenes manually. The broker did nothing wrong—it guarantees at-least-once delivery. Your code assumed exactly-once side effects without encoding that assumption anywhere durable.
The consumer inbox (sometimes called the inbox pattern or idempotent consumer) fixes this by treating each incoming message as a first-class row in the same database that holds your business state. You record intent to process a message before irreversible work, commit side effects in the same transaction as marking the message handled, and on redelivery short-circuit to the stored outcome. In consulting work on event-driven backends, this is the companion pattern to the transactional outbox: outbox makes publishing reliable; inbox makes consumption safe.
Why at-least-once is the default—and what it breaks
Message brokers (SQS, RabbitMQ, Kafka consumers with manual commits, Google Pub/Sub pull) overwhelmingly deliver with at-least-once semantics when you need durability and horizontal scale. Duplicates appear from:
- Crash between side effect and ack — the classic partial-success window.
- Visibility timeout expiry — the broker assumes the consumer died and redelivers while the first attempt is still running.
- Producer retries — the outbox relay or HTTP webhook sender may publish the same logical event twice.
- Rebalances and consumer group changes — partition reassignment can cause in-flight messages to be read again.
Client-side idempotency keys on HTTP APIs protect inbound duplicate requests from partners. They do not protect async consumers unless you map each message to an equivalent key and persist outcomes the same way. The inbox is that mapping, optimized for stream processing rather than request/response.
Inbox vs other deduplication tools
| Approach | Best for | Limitation |
|---|---|---|
| In-memory dedup cache (Redis SET NX) | Very high volume, short retention | Lost on eviction or restart; races across instances if a key's TTL expires mid-processing; not a source of truth for audits |
| Broker deduplication id (SQS FIFO, some Kafka configs) | Transport-level suppression | Does not cover partial success inside your service; vendor-specific |
| Idempotency keys on HTTP | Synchronous APIs | Awkward for long-running consumers; payload compatibility rules differ |
| Consumer inbox table | Durable, auditable, transactional with domain writes | Requires schema and cleanup; adds latency on first sight of each message |
The inbox shines when side effects and deduplication state must commit atomically—the same property that motivates the outbox on the publish side.
Core design
Message identity
Every message needs a stable message_id scoped to your system:
- Prefer the broker's deduplication id when present (SQS FIFO `MessageDeduplicationId`), or an event id the producer sets in Kafka record headers.
- Otherwise derive it from `(topic, partition, offset)` or from a business idempotency key embedded in the payload (`event_id`, or a `correlation_id` only if it is unique per event).
- Document whether logical duplicates (same business event, new transport id) should collapse: usually yes for `OrderPaid`, no for distinct `LineItemAdded` rows.
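Here is a minimal sketch of that precedence. The `Envelope` shape and its field names are hypothetical, so map them from whatever your broker client actually exposes. Prefixing the event type is one way to keep distinct event kinds from colliding.

```typescript
// Hypothetical transport envelope; the field names are illustrative, not a broker API.
type Envelope = {
  eventType: string;
  dedupId?: string; // e.g. an SQS FIFO MessageDeduplicationId, if your client exposes it
  eventId?: string; // an event id the producer embedded in the payload or headers
  topic?: string;
  partition?: number;
  offset?: string; // kept as a string so large offsets never lose precision
};

// Choose the most stable identity available, then scope it by event type so that
// unrelated event kinds that happen to share an id never collapse into one row.
function deriveMessageId(e: Envelope): string {
  let base: string | undefined = e.dedupId ?? e.eventId;
  if (base === undefined && e.topic !== undefined && e.partition !== undefined && e.offset !== undefined) {
    // Transport coordinates: these collapse broker redeliveries, not producer retries.
    base = `${e.topic}/${e.partition}/${e.offset}`;
  }
  if (base === undefined) {
    throw new Error("message has no stable identity; reject it rather than guess");
  }
  return `${e.eventType}:${base}`;
}
```

Transport coordinates only collapse broker redeliveries. A producer retry is written at a new offset and therefore gets a new id, which is why an embedded event id wins whenever the payload carries one.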
Inbox schema
A minimal PostgreSQL inbox:
| Column | Purpose |
|---|---|
| `message_id` | Primary key (or unique together with `consumer_name`) |
| `consumer_name` | Disambiguates multiple handlers on one bus |
| `payload` | JSON snapshot for replay/debugging (optional if you can refetch) |
| `status` | `pending`, `processing`, `completed`, `failed` |
| `result` | JSON outcome or error summary for `completed`/`failed` rows |
| `received_at` | Observability, retention policies |
| `processed_at` | Lag metrics |
Use a unique constraint on (consumer_name, message_id) so concurrent deliveries race to one winner.
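The `status` column stays honest when the allowed transitions live in one place. The sketch below assumes the four statuses above. The edges are a design choice, for example `failed → processing` for a redelivery and `processing → pending` for a reaper. PostgreSQL's `CHECK` constraint only validates values, not transitions.

```typescript
type InboxStatus = "pending" | "processing" | "completed" | "failed";

// Allowed edges of the inbox state machine (a sketch; adapt to your own policy).
const transitions: Record<InboxStatus, readonly InboxStatus[]> = {
  pending: ["processing", "failed"], // "failed": the transaction rolled back and the error was recorded
  processing: ["completed", "failed", "pending"], // "pending": a reaper released a stale claim
  failed: ["processing"], // a redelivery or manual redrive retries the message
  completed: [], // terminal: redeliveries short-circuit and never reprocess
};

function canTransition(from: InboxStatus, to: InboxStatus): boolean {
  return transitions[from].includes(to);
}

function assertTransition(from: InboxStatus, to: InboxStatus): void {
  if (!canTransition(from, to)) {
    throw new Error(`illegal inbox transition ${from} -> ${to}`);
  }
}
```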
Processing flow
```mermaid
sequenceDiagram
    participant Broker
    participant Consumer
    participant DB as Database
    Broker->>Consumer: deliver message M
    Consumer->>DB: INSERT inbox (pending) ON CONFLICT DO NOTHING
    alt already completed
        DB-->>Consumer: existing row completed
        Consumer->>Broker: ack (no side effects)
    else first sight
        Consumer->>DB: BEGIN; mark processing; apply domain writes; mark completed; COMMIT
        Consumer->>Broker: ack
    end
```
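The critical invariant is that the side effects you are protecting are database writes made inside the same transaction that marks the inbox row `completed`. If the transaction rolls back, the message stays `pending` (or is then marked `failed`), and a redelivery can retry safely. Calls to external systems cannot join that transaction, which is why they need the separate handling listed under common mistakes.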
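To make the invariant concrete, here is an in-memory model rather than a database client. The hypothetical `Store.transaction` stands in for a PostgreSQL transaction, so the domain write and the inbox update become visible together or not at all. A crash after commit but before the ack then produces a redelivery that changes nothing.

```typescript
type RowStatus = "pending" | "completed";

// Toy store: a transaction works on copies and swaps them in only if `work` returns.
class Store {
  ledger: string[] = [];
  inbox = new Map<string, RowStatus>();

  transaction(work: (tx: { ledger: string[]; inbox: Map<string, RowStatus> }) => void): void {
    const tx = { ledger: [...this.ledger], inbox: new Map(this.inbox) };
    work(tx); // if work throws, the swap below never runs: that is the "rollback"
    this.ledger = tx.ledger;
    this.inbox = tx.inbox;
  }
}

function handle(store: Store, messageId: string, crashBeforeAck: boolean): "ack" {
  // Front door: INSERT ... ON CONFLICT DO NOTHING
  if (!store.inbox.has(messageId)) store.inbox.set(messageId, "pending");
  if (store.inbox.get(messageId) === "completed") return "ack"; // redelivery: no side effects
  store.transaction((tx) => {
    tx.ledger.push(`charge for ${messageId}`); // domain write
    tx.inbox.set(messageId, "completed"); // marked completed in the same commit
  });
  if (crashBeforeAck) throw new Error("process died before the broker saw the ack");
  return "ack";
}
```

If the ledger write happened outside `transaction`, a crash between it and the inbox update would charge twice on redelivery. Acking before the commit would instead lose the message on a crash.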
INSERT … ON CONFLICT as the front door
On delivery, attempt an insert:
```sql
INSERT INTO consumer_inbox (consumer_name, message_id, payload, status, received_at)
VALUES ($1, $2, $3::jsonb, 'pending', now())
ON CONFLICT (consumer_name, message_id) DO NOTHING
RETURNING *;
```
- No row returned (conflict) — another worker owns or has finished the message. Run `SELECT status`: if `completed`, ack and exit; if `processing`, either wait/back off or rely on a stale-processing sweeper.
- Row returned — you are the primary processor; proceed inside a transaction.
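This is cheaper than `SELECT … FOR UPDATE` on every message and avoids application-level locks for the common “duplicate delivery after success” case. The first-sight path still needs serialization. A `pending` or `failed` row only says that nobody has finished yet, so the processing transaction should lock the row and re-check that it is not already `completed` before applying domain writes. The lock is then paid only on that path.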
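The branching after the insert fits in a pure function, which keeps it unit-testable without a database. The `ClaimAction` names in this sketch are hypothetical. Treating `pending` and `failed` rows as takeover candidates mirrors the example consumer later in this article.

```typescript
type InboxStatus = "pending" | "processing" | "completed" | "failed";
type ClaimAction = "process" | "ack" | "retry_later";

// Decide what to do after INSERT ... ON CONFLICT DO NOTHING RETURNING.
// `inserted` is true when RETURNING produced a row; otherwise `existing` is the status
// read back by a follow-up SELECT (undefined if no row was found).
function decideClaim(inserted: boolean, existing?: InboxStatus): ClaimAction {
  if (inserted) return "process"; // first sight of this message
  switch (existing) {
    case "completed":
      return "ack"; // duplicate after success: ack without side effects
    case "processing":
      return "retry_later"; // another worker holds it; back off or let a reaper decide
    case "pending":
    case "failed":
      return "process"; // take over, but re-check under FOR UPDATE before writing
    default:
      return "retry_later"; // row vanished between the two statements; treat as transient
  }
}
```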
Pairing with the transactional outbox
Teams often implement outbox on publish and forget inbox on consume. That asymmetry produces systems where events are never lost but handlers are not safe.
A robust pipeline:
- Producer service — business write + outbox row in one DB transaction; relay publishes to the broker.
- Consumer service — inbox insert + domain mutation in one DB transaction; ack after commit.
End-to-end effectively-once processing still requires idempotent downstream systems (payment gateways, email providers with dedup keys). The inbox guarantees your service does not amplify duplicates into the database.
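The next snippet is a compact in-memory model of both halves. It assumes the producer's event id travels with the message and the consumer uses it as `message_id`. The relay deliberately publishes every outbox row twice, as a retrying relay might, and the inbox collapses the copies. Class and function names are hypothetical, and the comments mark where real database transactions would sit.

```typescript
type OrderEvent = { eventId: string; type: "OrderPlaced"; orderId: string };

// Producer: in a real service the order row and the outbox row share one DB transaction.
class Producer {
  orders: string[] = [];
  outbox: OrderEvent[] = [];
  placeOrder(orderId: string, eventId: string): void {
    this.orders.push(orderId);
    this.outbox.push({ eventId, type: "OrderPlaced", orderId });
  }
}

// Consumer: the inbox check and the domain mutation likewise share one transaction.
// Because the message id is the producer's eventId, relay retries collapse too.
class Consumer {
  inbox = new Set<string>();
  shipments: string[] = [];
  handle(e: OrderEvent): "ack" {
    if (this.inbox.has(e.eventId)) return "ack"; // duplicate: ack with no side effects
    this.shipments.push(e.orderId);
    this.inbox.add(e.eventId);
    return "ack"; // ack only after the (simulated) commit
  }
}

// At-least-once relay: publishes every outbox row `copies` times to mimic retries.
function relay(p: Producer, c: Consumer, copies: number): void {
  for (const e of p.outbox) {
    for (let i = 0; i < copies; i++) c.handle(e);
  }
}
```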
Trade-offs and operational concerns
Retention and table growth
Inbox rows accumulate. Policies:
- TTL purge for `completed` rows older than N days (capture aggregates in metrics first).
- Partition by month if volume is high.
- Archive to object storage for compliance before deleting.
Do not delete failed rows until someone resolves or explicitly discards them—those are your incident artifacts.
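Those rules reduce to a single predicate over inbox rows. This sketch assumes a configurable `retentionDays` and the `processed_at` column from the schema. Only `completed` rows can ever qualify, however old a `failed` row gets. The SQL string is an illustrative equivalent for a scheduled job.

```typescript
type InboxRow = {
  messageId: string;
  status: "pending" | "processing" | "completed" | "failed";
  processedAt: Date | null;
};

const DAY_MS = 24 * 60 * 60 * 1000;

// Only completed rows past the retention window may be purged; failed rows are
// incident artifacts and stay until someone resolves or discards them.
function isPurgeable(row: InboxRow, now: Date, retentionDays: number): boolean {
  if (row.status !== "completed" || row.processedAt === null) return false;
  return now.getTime() - row.processedAt.getTime() > retentionDays * DAY_MS;
}

// Equivalent PostgreSQL statement for a scheduled job; $1 is the retention in days.
const purgeSql = `
  DELETE FROM consumer_inbox
   WHERE status = 'completed'
     AND processed_at < now() - make_interval(days => $1)`;
```

On large tables you may prefer to delete in bounded batches. If the table is partitioned by month, you can drop whole partitions instead.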
Stuck processing rows
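Suppose your design commits `processing` in its own short transaction so that other workers can see the claim. A worker can then die after that commit but before the processing transaction finishes. Run a reaper job: rows in `processing` older than T minutes flip to `pending` or `failed` with a reason, allowing redelivery. Tune T above your p99 handler duration. If `processing` is only set inside the same transaction as the domain writes, as in the Node.js example below, a crash rolls it back and no row gets stuck.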
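The following is a reaper sketch for the separate-claim design. It assumes a hypothetical `claimed_at` column written when `processing` is committed; the example schema in this article has no such column. The reaper releases stale claims back to `pending`. It uses `FOR UPDATE SKIP LOCKED` so that two reaper instances do not contend for the same rows. `checkThreshold` encodes the rule that T must exceed the p99 handler duration.

```typescript
type ClaimedRow = {
  messageId: string;
  status: "pending" | "processing" | "completed" | "failed";
  claimedAt: Date | null; // hypothetical column, set when `processing` is committed
};

// A processing row is stale once its claim is older than the threshold.
function isStale(row: ClaimedRow, now: Date, thresholdMs: number): boolean {
  return (
    row.status === "processing" &&
    row.claimedAt !== null &&
    now.getTime() - row.claimedAt.getTime() > thresholdMs
  );
}

// Refuse a threshold that would reap healthy but merely slow handlers.
function checkThreshold(thresholdMs: number, p99HandlerMs: number): void {
  if (thresholdMs <= p99HandlerMs) {
    throw new Error(`reaper threshold ${thresholdMs}ms must exceed p99 ${p99HandlerMs}ms`);
  }
}

// PostgreSQL form for a scheduled job; $1 is T in minutes.
const reapSql = `
  UPDATE consumer_inbox AS i
     SET status = 'pending',
         result = jsonb_build_object('reaped_at', now())
    FROM (
      SELECT consumer_name, message_id
        FROM consumer_inbox
       WHERE status = 'processing'
         AND claimed_at < now() - make_interval(mins => $1)
       FOR UPDATE SKIP LOCKED
    ) AS stale
   WHERE i.consumer_name = stale.consumer_name
     AND i.message_id = stale.message_id
  RETURNING i.message_id`;
```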
Ordering
The inbox deduplicates; it does not order messages. If OrderCreated and OrderCancelled can arrive out of order, you still need version vectors, state machines, or partition keys so per-aggregate serialization holds.
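One way to get per-aggregate ordering on top of the inbox is an expected-version check. This uses a single counter per aggregate rather than a full version vector. The sketch assumes each event carries the hypothetical `version` it produces. The handler applies an event only when it is exactly one ahead of the stored version. It returns `defer` (retry later) when a predecessor has not arrived yet.

```typescript
type OrderState = { orderId: string; version: number; status: "none" | "created" | "cancelled" };
type OrderEvent =
  | { type: "OrderCreated"; orderId: string; version: number }
  | { type: "OrderCancelled"; orderId: string; version: number };
type Outcome = "applied" | "stale" | "defer";

// Apply an event only if it is the next version for its aggregate.
function apply(state: OrderState, e: OrderEvent): [OrderState, Outcome] {
  if (e.version <= state.version) return [state, "stale"]; // already reflected: ack
  if (e.version !== state.version + 1) return [state, "defer"]; // a predecessor is missing
  const status = e.type === "OrderCreated" ? "created" : "cancelled";
  return [{ ...state, version: e.version, status }, "applied"];
}
```

If `OrderCancelled` (version 2) arrives before `OrderCreated` (version 1), the first attempt is deferred. The broker's redelivery then applies it once the create has landed.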
Latency
One extra round-trip to the database on every message is acceptable for most business domains. For firehose telemetry, use batching or transport-level dedup instead.
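When you do keep an inbox on a higher-volume stream, batching amortizes the round-trip. Claim the whole batch in one statement, then split the result into fresh and previously seen messages. `buildBatchClaim` and `splitBatch` are hypothetical helpers. The returned `{ text, values }` object matches the query-config shape node-postgres accepts, and the SQL relies on PostgreSQL's multi-argument `unnest` in `FROM`.

```typescript
type Msg = { messageId: string; payload: unknown };

// One INSERT for the whole batch; RETURNING lists only the ids this call inserted.
function buildBatchClaim(consumer: string, batch: Msg[]): { text: string; values: unknown[] } {
  // Drop in-batch repeats so each id is sent once (first position kept).
  const unique = Array.from(new Map(batch.map((m): [string, Msg] => [m.messageId, m])).values());
  return {
    text: `INSERT INTO consumer_inbox (consumer_name, message_id, payload, status)
           SELECT $1::text, m.id, m.payload, 'pending'
             FROM unnest($2::text[], $3::jsonb[]) AS m(id, payload)
           ON CONFLICT (consumer_name, message_id) DO NOTHING
           RETURNING message_id`,
    values: [consumer, unique.map((m) => m.messageId), unique.map((m) => JSON.stringify(m.payload))],
  };
}

// Partition the batch using the ids RETURNING reported; in-batch repeats of a
// fresh id count as duplicates.
function splitBatch(batch: Msg[], insertedIds: string[]): { fresh: Msg[]; seenBefore: Msg[] } {
  const inserted = new Set(insertedIds);
  const taken = new Set<string>();
  const fresh: Msg[] = [];
  const seenBefore: Msg[] = [];
  for (const m of batch) {
    if (inserted.has(m.messageId) && !taken.has(m.messageId)) {
      fresh.push(m);
      taken.add(m.messageId);
    } else {
      seenBefore.push(m);
    }
  }
  return { fresh, seenBefore };
}
```

Messages in `seenBefore` still need the single-message status check (`completed` → ack, otherwise back off or take over). The batch only saves the insert round-trip.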
Practical example: Node.js consumer with PostgreSQL
The following is a self-contained sketch of a consumer that processes `InventoryReserved` events. It uses `pg` and assumes an `inventory` table with `sku`, `on_hand`, and `reserved` columns. It also assumes an inbox table created with:
```sql
CREATE TABLE consumer_inbox (
  consumer_name text NOT NULL,
  message_id    text NOT NULL,
  payload       jsonb NOT NULL,
  status        text NOT NULL CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
  result        jsonb,
  received_at   timestamptz NOT NULL DEFAULT now(),
  processed_at  timestamptz,
  PRIMARY KEY (consumer_name, message_id)
);

CREATE INDEX consumer_inbox_status_received
  ON consumer_inbox (status, received_at)
  WHERE status IN ('pending', 'processing');
```
```typescript
import { Pool, type PoolClient } from "pg";

const CONSUMER = "inventory-reservation-handler";

type InventoryReserved = {
  messageId: string;
  orderId: string;
  sku: string;
  quantity: number;
};

export function createInventoryConsumer(pool: Pool) {
  return async function handle(raw: unknown): Promise<"ack" | "retry"> {
    const event = parseInventoryReserved(raw);
    const client = await pool.connect();
    try {
      // Front door, outside any transaction: cheap exit for redeliveries after success.
      const claim = await claimMessage(client, event);
      if (claim === "already_done") return "ack";
      if (claim === "in_flight_elsewhere") return "retry";

      await client.query("BEGIN");
      // Lock the inbox row and re-check it: a concurrent delivery of the same message
      // may have passed the claim too. The second worker blocks here until the first
      // commits, then sees 'completed' and skips the domain write.
      const locked = await client.query<{ status: string }>(
        `SELECT status FROM consumer_inbox
          WHERE consumer_name = $1 AND message_id = $2
          FOR UPDATE`,
        [CONSUMER, event.messageId]
      );
      if (locked.rows[0]?.status === "completed") {
        await client.query("COMMIT");
        return "ack";
      }

      await client.query(
        `UPDATE consumer_inbox
            SET status = 'processing'
          WHERE consumer_name = $1 AND message_id = $2`,
        [CONSUMER, event.messageId]
      );
      // Domain write in the same transaction as the inbox bookkeeping.
      await reserveStock(client, event);
      await client.query(
        `UPDATE consumer_inbox
            SET status = 'completed',
                result = $3::jsonb,
                processed_at = now()
          WHERE consumer_name = $1 AND message_id = $2`,
        [CONSUMER, event.messageId, JSON.stringify({ ok: true })]
      );
      await client.query("COMMIT");
      return "ack"; // ack only after COMMIT has returned
    } catch (err) {
      // If no transaction is open (e.g. the claim failed), PostgreSQL only warns here.
      await client.query("ROLLBACK");
      await markFailed(pool, event.messageId, err);
      return "retry";
    } finally {
      client.release();
    }
  };
}

async function claimMessage(
  client: PoolClient,
  event: InventoryReserved
): Promise<"new" | "already_done" | "in_flight_elsewhere"> {
  const inserted = await client.query<{ status: string }>(
    `INSERT INTO consumer_inbox (consumer_name, message_id, payload, status)
     VALUES ($1, $2, $3::jsonb, 'pending')
     ON CONFLICT (consumer_name, message_id) DO NOTHING
     RETURNING status`,
    [CONSUMER, event.messageId, JSON.stringify(event)]
  );
  if (inserted.rowCount === 1) return "new";

  const existing = await client.query<{ status: string }>(
    `SELECT status FROM consumer_inbox
      WHERE consumer_name = $1 AND message_id = $2`,
    [CONSUMER, event.messageId]
  );
  const status = existing.rows[0]?.status;
  if (status === "completed") return "already_done";
  if (status === "processing") return "in_flight_elsewhere";
  return "new"; // pending or failed: this worker may take over after the locked re-check
}

async function reserveStock(client: PoolClient, event: InventoryReserved) {
  // Reserve only if enough unreserved stock remains. Every bound parameter must be
  // referenced: an unused $n makes PostgreSQL fail to infer its type.
  const { rowCount } = await client.query(
    `UPDATE inventory
        SET reserved = reserved + $2
      WHERE sku = $1 AND on_hand - reserved >= $2`,
    [event.sku, event.quantity]
  );
  if (rowCount !== 1) {
    throw new Error(`insufficient_stock:${event.sku}`);
  }
}

async function markFailed(pool: Pool, messageId: string, err: unknown) {
  // Never downgrade a row that another worker has already completed.
  await pool.query(
    `UPDATE consumer_inbox
        SET status = 'failed',
            result = $3::jsonb,
            processed_at = now()
      WHERE consumer_name = $1 AND message_id = $2
        AND status <> 'completed'`,
    [CONSUMER, messageId, JSON.stringify({ error: String(err) })]
  );
}

function parseInventoryReserved(raw: unknown): InventoryReserved {
  // Validate with Zod/JSON Schema in production
  const o = raw as InventoryReserved;
  if (!o?.messageId || !o?.orderId || !o?.sku || !(o?.quantity > 0)) {
    throw new Error("invalid_payload");
  }
  return o;
}
```
Production hardening adds: structured logging with message_id, metrics on already_done vs new, a sweeper for stale processing, and alignment with your dead-letter policy when failed rows exceed a threshold.
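Here is a sketch of the first two hardening items. `Metrics` is a hypothetical in-process counter and `log` can be any structured sink; in production you would substitute your metrics client and logger. Counting claims by outcome lets a dashboard show the duplicate rate, that is `already_done` relative to `new`.

```typescript
type ClaimOutcome = "new" | "already_done" | "in_flight_elsewhere";

type LogLine = {
  level: "info" | "warn";
  msg: string;
  consumer: string;
  message_id: string;
  claim: ClaimOutcome;
};

// Hypothetical in-process counters; substitute your metrics client.
class Metrics {
  private counts = new Map<string, number>();
  inc(name: string): void {
    this.counts.set(name, (this.counts.get(name) ?? 0) + 1);
  }
  get(name: string): number {
    return this.counts.get(name) ?? 0;
  }
}

// Count every claim by outcome and log it with the message id, so a spike on a
// dashboard can be traced back to individual inbox rows.
function recordClaim(
  metrics: Metrics,
  log: (line: LogLine) => void,
  consumer: string,
  messageId: string,
  claim: ClaimOutcome
): void {
  metrics.inc(`inbox.claim.${claim}`);
  log({
    level: claim === "in_flight_elsewhere" ? "warn" : "info",
    msg: "inbox claim",
    consumer,
    message_id: messageId,
    claim,
  });
}
```

In the example consumer, this would be called right after `claimMessage`, for instance with `(line) => console.log(JSON.stringify(line))` as the sink.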
Common mistakes and pitfalls
- Acking before commit — If you acknowledge the broker before the database transaction commits, you can lose messages on crash. Ack only after `COMMIT` (or use transactional consumption features your broker provides, which are rare).
- Side effects outside the transaction — Calling Stripe or SendGrid before marking `completed` reintroduces partial success. Push external calls after commit with an outbox, or use provider idempotency keys and still record intent in the inbox first.
- No unique key discipline — Using only `orderId` when multiple event types exist causes unrelated messages to collapse. Scope ids per event type or include `event_type` in the key.
- Treating the inbox as a queue — Workers should not use `SELECT * FROM consumer_inbox WHERE status = 'pending'` as a primary job scheduler unless you add `FOR UPDATE SKIP LOCKED` and understand the throughput limits. The broker remains the queue; the inbox is a dedup journal.
- Ignoring failed rows — A `failed` status without alerting becomes a silent data-loss factory. Wire up dashboards and runbooks; connect to DLQ redrive only after fixing the root cause.
- Oversized payloads — Storing multi-megabyte blobs in `payload` bloats backups. Store a reference (`s3://…`, `aggregate_id`) and fetch it when needed.
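For the side-effects pitfall, one workable approach is to commit the domain write and the inbox row first. The external call then runs afterward with an idempotency key derived from the inbox identity, so a retry after a crash reuses the same key. `FakeEmailProvider` is an in-memory stand-in for a provider that deduplicates on such a key. Check your provider's documentation for whether it does so, and for how long. `receiptKey` is a hypothetical helper.

```typescript
// In-memory stand-in for an email API that deduplicates on an idempotency key.
class FakeEmailProvider {
  sent: string[] = [];
  private seenKeys = new Set<string>();
  send(to: string, body: string, idempotencyKey: string): void {
    if (this.seenKeys.has(idempotencyKey)) return; // provider-side dedup
    this.seenKeys.add(idempotencyKey);
    this.sent.push(`${to}: ${body}`);
  }
}

// Derive the key from the inbox identity, never from a fresh random value per attempt.
function receiptKey(consumer: string, messageId: string): string {
  return `${consumer}:${messageId}:receipt`;
}

type Inbox = Map<string, "completed">;

function handlePaymentCaptured(
  inbox: Inbox,
  email: FakeEmailProvider,
  consumer: string,
  messageId: string,
  crashAfterEmail: boolean
): "ack" {
  // 1. Stand-in for the committed transaction (domain write + inbox row);
  //    only the inbox row is modelled here.
  if (!inbox.has(messageId)) inbox.set(messageId, "completed");
  // 2. External call after commit. It runs even for an already-completed row,
  //    because the previous attempt may have died between its commit and this call.
  email.send("customer@example.com", `receipt for ${messageId}`, receiptKey(consumer, messageId));
  if (crashAfterEmail) throw new Error("crash before ack");
  return "ack";
}
```

Without a deduplicating provider, move the call into an outbox instead.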
Conclusion
At-least-once delivery is not a bug—it is the price of durable, scalable messaging. The consumer inbox pattern makes that contract explicit: every message is recorded, processed at most once per consumer for side effects that matter, and redeliveries become cheap no-ops instead of duplicate charges.
Together with the transactional outbox, you get a practical end-to-end story: publish without losing events, consume without duplicating work. The implementation fits in a single table and a disciplined transaction boundary—exactly the kind of boring infrastructure that keeps event-driven products trustworthy as traffic and team size grow.
For background on related patterns on this site, see the articles on idempotency keys, the transactional outbox, and dead-letter queues. If you are designing consumption pipelines or hardening async workflows for production, About outlines the kind of systems work covered here; Contact is the place to start a conversation.