The consumer inbox pattern: idempotent message processing under at-least-once delivery

Stop duplicate charges and double emails when brokers redeliver. How the inbox pattern records message intent in your database, pairs with the transactional outbox, and stays operable in production.

Author: Matheus Palma · 9 min read
Software engineering · Backend · Distributed systems · Architecture · PostgreSQL · Reliability · Messaging

Your payment worker consumes PaymentCaptured from a queue. The handler charges the ledger, sends a receipt email, and acknowledges the message. The process crashes after the email API returns 202 Accepted but before the broker sees the ack. The message is delivered again. The customer receives two receipts; finance sees two ledger lines unless someone intervenes manually. The broker did nothing wrong—it guarantees at-least-once delivery. Your code assumed exactly-once side effects without encoding that assumption anywhere durable.

The consumer inbox (sometimes called the inbox pattern or idempotent consumer) fixes this by treating each incoming message as a first-class row in the same database that holds your business state. You record intent to process a message before irreversible work, commit side effects in the same transaction as marking the message handled, and on redelivery short-circuit to the stored outcome. In consulting work on event-driven backends, this is the companion pattern to the transactional outbox: outbox makes publishing reliable; inbox makes consumption safe.

Why at-least-once is the default—and what it breaks

Message brokers (SQS, RabbitMQ, Kafka consumers with manual commits, Google Pub/Sub pull) overwhelmingly deliver with at-least-once semantics when you need durability and horizontal scale. Duplicates appear from:

  • Crash between side effect and ack — the classic partial-success window.
  • Visibility timeout expiry — the broker assumes the consumer died and redelivers while the first attempt is still running.
  • Producer retries — the outbox relay or HTTP webhook sender may publish the same logical event twice.
  • Rebalances and consumer group changes — partition reassignment can cause in-flight messages to be read again.

Client-side idempotency keys on HTTP APIs protect inbound duplicate requests from partners. They do not protect async consumers unless you map each message to an equivalent key and persist outcomes the same way. The inbox is that mapping, optimized for stream processing rather than request/response.
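To make the failure mode concrete, here is a minimal in-memory sketch of a redelivery. The `Set` stands in for durable dedup state and is not crash-safe; the names `naiveHandler` and `inboxHandler` are illustrative, not from any library.

```typescript
// In-memory stand-ins only: a real consumer would keep this state in a database.
type Message = { messageId: string; amount: number };

// Applies the side effect on every delivery, so a redelivery doubles it.
function naiveHandler(ledger: number[], msg: Message): void {
  ledger.push(msg.amount);
}

// Records the message id first and skips the side effect on a repeat.
function inboxHandler(seen: Set<string>, ledger: number[], msg: Message): void {
  if (seen.has(msg.messageId)) return;
  seen.add(msg.messageId);
  ledger.push(msg.amount);
}

// The same message delivered twice, as after a crash between side effect and ack.
const deliveries: Message[] = [
  { messageId: "m-1", amount: 50 },
  { messageId: "m-1", amount: 50 },
];

const naiveLedger: number[] = [];
const inboxLedger: number[] = [];
const seen = new Set<string>();

for (const msg of deliveries) {
  naiveHandler(naiveLedger, msg);
  inboxHandler(seen, inboxLedger, msg);
}
```

The naive ledger ends up with two lines and the deduplicated one with a single line. The rest of the article is about making that `seen` set durable and transactional.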

Inbox vs other deduplication tools

| Approach | Best for | Limitation |
| --- | --- | --- |
| In-memory dedup cache (Redis SET NX) | Very high volume, short retention | Lost on eviction; cross-instance races without careful TTL; not a source of truth for audits |
| Broker deduplication id (SQS FIFO, some Kafka configs) | Transport-level suppression | Does not cover partial success inside your service; vendor-specific |
| Idempotency keys on HTTP | Synchronous APIs | Awkward for long-running consumers; payload compatibility rules differ |
| Consumer inbox table | Durable, auditable, transactional with domain writes | Requires schema and cleanup; adds latency on first sight of each message |

The inbox shines when side effects and deduplication state must commit atomically—the same property that motivates the outbox on the publish side.

Core design

Message identity

Every message needs a stable message_id scoped to your system:

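  • Prefer the broker’s deduplication id when present (SQS FIFO MessageDeduplicationId, or an id the producer sets in a Kafka record header).
  • Otherwise derive from (topic, partition, offset) or a business idempotency key embedded in the payload (event_id, correlation_id). A position-based id only catches redelivery of the same record; a producer that publishes the same event twice produces two offsets, so prefer the business key when producer retries are possible.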
  • Document whether logical duplicates (same business event, new transport id) should collapse—usually yes for OrderPaid, no for distinct LineItemAdded rows.
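The preference order above could be sketched as a small pure function. The `Delivery` shape and the `dedup:` / `event:` / `pos:` prefixes are assumptions for illustration; adapt them to whatever your broker client actually exposes.

```typescript
// Hypothetical delivery envelope; field names are illustrative.
type Delivery = {
  eventType: string;
  brokerDedupId?: string; // transport-level deduplication id, when the broker has one
  eventId?: string;       // business idempotency key carried in the payload
  topic?: string;
  partition?: number;
  offset?: string;
};

function deriveMessageId(d: Delivery): string {
  // 1. Prefer the broker's own deduplication id.
  if (d.brokerDedupId) return `dedup:${d.brokerDedupId}`;
  // 2. Fall back to the business key, scoped by event type so that
  //    unrelated events sharing an id do not collapse.
  if (d.eventId) return `event:${d.eventType}:${d.eventId}`;
  // 3. Last resort: the position of the record in the log.
  if (d.topic !== undefined && d.partition !== undefined && d.offset !== undefined) {
    return `pos:${d.topic}:${d.partition}:${d.offset}`;
  }
  throw new Error("no_stable_message_id");
}
```

Prefixing each id with its source keeps ids from different derivation rules from colliding in the same inbox table.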

Inbox schema

A minimal PostgreSQL inbox:

| Column | Purpose |
| --- | --- |
| message_id | Primary key (or unique with consumer_name) |
| consumer_name | Disambiguates multiple handlers on one bus |
| payload | JSON snapshot for replay/debug (optional if you can refetch) |
| status | pending, processing, completed, failed |
| result | JSON outcome or error summary for completed/failed |
| received_at | Observability, retention policies |
| processed_at | Lag metrics |

Use a unique constraint on (consumer_name, message_id) so concurrent deliveries race to one winner.

Processing flow

sequenceDiagram
  participant Broker
  participant Consumer
  participant DB as Database

  Broker->>Consumer: deliver message M
  Consumer->>DB: INSERT inbox (pending) ON CONFLICT DO NOTHING
  alt already completed
    DB-->>Consumer: existing row completed
    Consumer->>Broker: ack (no side effects)
  else first sight
    Consumer->>DB: BEGIN; mark processing; apply domain writes; mark completed; COMMIT
    Consumer->>Broker: ack
  end

The critical invariant: irreversible side effects happen only inside a transaction that also marks the inbox row completed. If the transaction rolls back, the message stays pending or failed and a redelivery can retry safely.
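One way to keep that invariant honest in application code is to write down which status transitions are legal. The table below is one reading of the flow described here, not a canonical definition; in particular, treating completed as terminal is the assumption that makes redelivery a no-op.

```typescript
type InboxStatus = "pending" | "processing" | "completed" | "failed";

// Assumed transition table: completed is terminal, failed can be retried,
// and processing can be reset to pending by a cleanup job.
const allowedTransitions: Record<InboxStatus, InboxStatus[]> = {
  pending: ["processing", "failed"],
  processing: ["completed", "failed", "pending"],
  failed: ["processing"],
  completed: [],
};

function canTransition(from: InboxStatus, to: InboxStatus): boolean {
  return allowedTransitions[from].includes(to);
}
```

A check like this belongs in tests or assertions; the database constraint and the transaction boundary remain the real enforcement.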

INSERT … ON CONFLICT as the front door

On delivery, attempt an insert:

INSERT INTO consumer_inbox (consumer_name, message_id, payload, status, received_at)
VALUES ($1, $2, $3::jsonb, 'pending', now())
ON CONFLICT (consumer_name, message_id) DO NOTHING
RETURNING *;

  • No row returned (conflict): another worker owns or finished the message. SELECT the status. If completed, ack and exit. If processing, either wait/backoff or rely on a stale-processing sweeper. If pending or failed, an earlier attempt did not finish and this worker may take over.
  • Row returned: you are the primary processor; proceed inside a transaction.

This is cheaper than SELECT FOR UPDATE on every message and avoids application-level locks for the common “duplicate delivery after success” case.
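The branching after the insert can be isolated into a pure function, which makes it easy to unit test without a database. This is a sketch: `decideClaim` and the `ClaimAction` names are made up for illustration, and the take-over branch assumes the subsequent status update is guarded so only one worker wins.

```typescript
type InboxStatus = "pending" | "processing" | "completed" | "failed";
type ClaimAction = "process" | "ack" | "retry_later";

// insertedRows: row count reported by INSERT ... ON CONFLICT DO NOTHING.
// existingStatus: status read back when the insert hit a conflict.
function decideClaim(insertedRows: number, existingStatus?: InboxStatus): ClaimAction {
  if (insertedRows === 1) return "process"; // first sight of this message
  switch (existingStatus) {
    case "completed":
      return "ack"; // duplicate after success: no side effects
    case "processing":
      return "retry_later"; // another worker is busy with it
    case "pending":
    case "failed":
      return "process"; // earlier attempt did not finish; take over
    default:
      throw new Error("inbox_row_missing");
  }
}
```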

Pairing with the transactional outbox

Teams often implement outbox on publish and forget inbox on consume. That asymmetry produces systems where events are never lost but handlers are not safe.

A robust pipeline:

  1. Producer service — business write + outbox row in one DB transaction; relay publishes to the broker.
  2. Consumer service — inbox insert + domain mutation in one DB transaction; ack after commit.

End-to-end effectively-once processing still requires idempotent downstream systems (payment gateways, email providers with dedup keys). The inbox guarantees your service does not amplify duplicates into the database.
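For downstream calls, a deterministic idempotency key derived from the inbox identity lets the provider collapse retries. The sketch below uses a fake in-memory provider to show the idea; `FakeEmailProvider` and the key format are assumptions, and real providers each have their own header or field for this.

```typescript
// Same inputs always yield the same key, so a retried call is recognisable.
function downstreamIdempotencyKey(consumerName: string, messageId: string, effect: string): string {
  return `${consumerName}:${messageId}:${effect}`;
}

// Stand-in for a provider that deduplicates on an idempotency key.
class FakeEmailProvider {
  private sent = new Map<string, string>();

  send(idempotencyKey: string, to: string): "sent" | "duplicate" {
    if (this.sent.has(idempotencyKey)) return "duplicate";
    this.sent.set(idempotencyKey, to);
    return "sent";
  }

  get count(): number {
    return this.sent.size;
  }
}

const provider = new FakeEmailProvider();
const key = downstreamIdempotencyKey("payment-worker", "m-1", "receipt-email");
const first = provider.send(key, "customer@example.com");
const second = provider.send(key, "customer@example.com"); // redelivery of the same message
```

Including the effect name in the key keeps two different side effects of one message (a receipt and a ledger webhook, say) from suppressing each other.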

Trade-offs and operational concerns

Retention and table growth

Inbox rows accumulate. Policies:

  • TTL purge for completed rows older than N days (keep aggregates in metrics first).
  • Partition by month if volume is high.
  • Archive to object storage for compliance before delete.

Do not delete failed rows until someone resolves or explicitly discards them—those are your incident artifacts.
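The retention rule can be expressed as a predicate that a purge job evaluates per row (or translates into a WHERE clause). This is a sketch under the assumption that `processedAt` mirrors the processed_at column; the function name is illustrative.

```typescript
type InboxStatus = "pending" | "processing" | "completed" | "failed";
type InboxRow = { status: InboxStatus; processedAt: Date | null };

const DAY_MS = 24 * 60 * 60 * 1000;

// Only completed rows past the retention window are eligible.
// Failed rows are never purged automatically.
function isPurgeable(row: InboxRow, now: Date, retentionDays: number): boolean {
  if (row.status !== "completed" || row.processedAt === null) return false;
  const ageMs = now.getTime() - row.processedAt.getTime();
  return ageMs > retentionDays * DAY_MS;
}
```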

Stuck processing rows

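A worker can die after committing the processing status but before the main transaction commits. Run a reaper job: rows in processing older than T minutes flip to pending or failed with a reason, allowing redelivery. Tune T above your p99 handler duration. The reaper only matters when processing is committed as its own step before the handler runs; when it is set inside the main transaction, a crash rolls the row back to its previous status and other workers never observe processing.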
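The selection logic of such a reaper might look like the following. Note the assumption: the schema in this article has no timestamp for when processing began, so `processingStartedAt` is a hypothetical extra column you would need to add (or approximate with received_at).

```typescript
type InboxStatus = "pending" | "processing" | "completed" | "failed";
type ReaperRow = {
  messageId: string;
  status: InboxStatus;
  processingStartedAt: Date | null; // hypothetical column, not in the article's schema
};

// Returns ids of rows stuck in processing for longer than the threshold.
function findStaleProcessing(rows: ReaperRow[], now: Date, thresholdMinutes: number): string[] {
  const cutoff = now.getTime() - thresholdMinutes * 60000;
  return rows
    .filter(
      (r) =>
        r.status === "processing" &&
        r.processingStartedAt !== null &&
        r.processingStartedAt.getTime() < cutoff
    )
    .map((r) => r.messageId);
}
```

In production the same condition would normally run as a single UPDATE in the database rather than by loading rows into the application.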

Ordering

The inbox deduplicates; it does not order messages. If OrderCreated and OrderCancelled can arrive out of order, you still need version vectors, state machines, or partition keys so per-aggregate serialization holds.
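As one illustration of the state-machine option, the sketch below assumes every event carries a per-order `version` that increases monotonically. That field is an assumption about your event schema, not something the inbox provides.

```typescript
type OrderState = { status: "none" | "created" | "cancelled"; version: number };
type OrderEvent = { type: "OrderCreated" | "OrderCancelled"; version: number };

// Applies an event only if it is newer than the state already stored.
function applyEvent(state: OrderState, event: OrderEvent): OrderState {
  if (event.version <= state.version) return state; // stale or duplicate: ignore
  const status: OrderState["status"] = event.type === "OrderCreated" ? "created" : "cancelled";
  return { status, version: event.version };
}

// OrderCancelled (v2) arrives before OrderCreated (v1).
const outOfOrder: OrderEvent[] = [
  { type: "OrderCancelled", version: 2 },
  { type: "OrderCreated", version: 1 },
];

let orderState: OrderState = { status: "none", version: 0 };
for (const e of outOfOrder) {
  orderState = applyEvent(orderState, e);
}
```

The late OrderCreated is dropped because its version is older than the stored one, so the order stays cancelled. Both messages still get their own inbox rows; ordering is a separate concern layered on top.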

Latency

One extra round-trip to the database on every message is acceptable for most business domains. For firehose telemetry, use batching or transport-level dedup instead.
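If you batch, one option is to claim a whole batch with a single multi-row insert. The helper below only builds the statement text and parameter array; `buildBatchInsert` is an illustrative name, and the table and columns are the consumer_inbox ones used elsewhere in this article.

```typescript
type Incoming = { messageId: string; payload: unknown };

// Builds one INSERT for the whole batch. $1 is the consumer name;
// each message contributes two further placeholders.
function buildBatchInsert(
  consumerName: string,
  batch: Incoming[]
): { text: string; values: string[] } {
  if (batch.length === 0) throw new Error("empty_batch");
  const values: string[] = [consumerName];
  const tuples = batch.map((m, i) => {
    values.push(m.messageId, JSON.stringify(m.payload));
    const idParam = 2 + i * 2;
    return `($1, $${idParam}, $${idParam + 1}::jsonb, 'pending')`;
  });
  const text =
    `INSERT INTO consumer_inbox (consumer_name, message_id, payload, status) ` +
    `VALUES ${tuples.join(", ")} ` +
    `ON CONFLICT (consumer_name, message_id) DO NOTHING ` +
    `RETURNING message_id`;
  return { text, values };
}
```

The ids that come back from RETURNING are the messages this worker saw first; everything else in the batch was a duplicate and needs the usual status check.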

Practical example: Node.js consumer with PostgreSQL

The following is a self-contained sketch of a consumer that processes InventoryReserved events. It uses pg and assumes a table created with:

CREATE TABLE consumer_inbox (
  consumer_name text NOT NULL,
  message_id    text NOT NULL,
  payload       jsonb NOT NULL,
  status        text NOT NULL CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
  result        jsonb,
  received_at   timestamptz NOT NULL DEFAULT now(),
  processed_at  timestamptz,
  PRIMARY KEY (consumer_name, message_id)
);

CREATE INDEX consumer_inbox_status_received
  ON consumer_inbox (status, received_at)
  WHERE status IN ('pending', 'processing');

import { Pool, type PoolClient } from "pg";

const CONSUMER = "inventory-reservation-handler";

type InventoryReserved = {
  messageId: string;
  orderId: string;
  sku: string;
  quantity: number;
};

export function createInventoryConsumer(pool: Pool) {
  return async function handle(raw: unknown): Promise<"ack" | "retry"> {
    const event = parseInventoryReserved(raw);
    const client = await pool.connect();

    try {
      const claim = await claimMessage(client, event);
      if (claim === "already_done") return "ack";
      if (claim === "in_flight_elsewhere") return "retry";

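      await client.query("BEGIN");

      // Guarded claim: under READ COMMITTED this UPDATE waits for any concurrent
      // transaction holding the row, then re-checks the WHERE clause. If another
      // worker completed the message in the meantime, no row matches.
      const claimed = await client.query(
        `UPDATE consumer_inbox
         SET status = 'processing'
         WHERE consumer_name = $1 AND message_id = $2
           AND status IN ('pending', 'failed')`,
        [CONSUMER, event.messageId]
      );
      if (claimed.rowCount !== 1) {
        await client.query("ROLLBACK");
        return "retry"; // the next delivery will see the final status and ack
      }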

      await reserveStock(client, event);

      await client.query(
        `UPDATE consumer_inbox
         SET status = 'completed',
             result = $3::jsonb,
             processed_at = now()
         WHERE consumer_name = $1 AND message_id = $2`,
        [CONSUMER, event.messageId, JSON.stringify({ ok: true })]
      );

      await client.query("COMMIT");
      return "ack";
    } catch (err) {
      await client.query("ROLLBACK");
      await markFailed(pool, event.messageId, err);
      return "retry";
    } finally {
      client.release();
    }
  };
}

async function claimMessage(
  client: PoolClient,
  event: InventoryReserved
): Promise<"new" | "already_done" | "in_flight_elsewhere"> {
  const inserted = await client.query<{ status: string }>(
    `INSERT INTO consumer_inbox (consumer_name, message_id, payload, status)
     VALUES ($1, $2, $3::jsonb, 'pending')
     ON CONFLICT (consumer_name, message_id) DO NOTHING
     RETURNING status`,
    [CONSUMER, event.messageId, JSON.stringify(event)]
  );

  if (inserted.rowCount === 1) return "new";

  const existing = await client.query<{ status: string }>(
    `SELECT status FROM consumer_inbox
     WHERE consumer_name = $1 AND message_id = $2`,
    [CONSUMER, event.messageId]
  );

  const status = existing.rows[0]?.status;
  if (status === "completed") return "already_done";
  if (status === "processing") return "in_flight_elsewhere";
  return "new"; // pending or failed — this worker may take over
}

async function reserveStock(client: PoolClient, event: InventoryReserved) {
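  // Conditional update: reserves only when enough unreserved stock remains.
  // Every placeholder must be referenced, so only sku and quantity are passed.
  const { rowCount } = await client.query(
    `UPDATE inventory
     SET reserved = reserved + $2
     WHERE sku = $1 AND on_hand - reserved >= $2`,
    [event.sku, event.quantity]
  );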
  if (rowCount !== 1) {
    throw new Error(`insufficient_stock:${event.sku}`);
  }
}

async function markFailed(pool: Pool, messageId: string, err: unknown) {
  // Never overwrite a row that another worker already completed.
  await pool.query(
    `UPDATE consumer_inbox
     SET status = 'failed',
         result = $3::jsonb,
         processed_at = now()
     WHERE consumer_name = $1 AND message_id = $2
       AND status <> 'completed'`,
    [CONSUMER, messageId, JSON.stringify({ error: String(err) })]
  );
}

function parseInventoryReserved(raw: unknown): InventoryReserved {
  // Validate with Zod/JSON Schema in production
  const o = raw as InventoryReserved;
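  if (!o?.messageId || !o?.orderId || !o?.sku) throw new Error("invalid_payload");
  // A non-positive quantity would release stock instead of reserving it.
  if (typeof o.quantity !== "number" || !(o.quantity > 0)) throw new Error("invalid_payload");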
  return o;
}

Production hardening adds: structured logging with message_id, metrics on already_done vs new, a sweeper for stale processing, and alignment with your dead-letter policy when failed rows exceed a threshold.

Common mistakes and pitfalls

  1. Acking before commit — If you acknowledge the broker before the database transaction commits, you can lose messages on crash. Ack only after COMMIT (or use transactional consumption features your broker provides, which are rare).

  2. Side effects outside the transaction — Calling Stripe or SendGrid before marking completed reintroduces partial success. Push external calls after commit with an outbox, or use provider idempotency keys and still record intent in the inbox first.

  3. No unique key discipline — Using only orderId when multiple event types exist causes unrelated messages to collapse. Scope ids per event type or include event_type in the key.

  4. Treating inbox as a queue — Workers should not SELECT * FROM inbox WHERE pending as a primary job scheduler unless you add FOR UPDATE SKIP LOCKED and understand throughput limits. The broker remains the queue; the inbox is a dedup journal.

  5. Ignoring failed rows: a failed status without alerting becomes a silent data loss factory. Wire dashboards and runbooks; connect to DLQ redrive only after fixing root cause.

  6. Oversized payloads — Storing multi-megabyte blobs in payload bloats backups. Store a reference (s3://…, aggregate_id) and fetch when needed.

Conclusion

At-least-once delivery is not a bug—it is the price of durable, scalable messaging. The consumer inbox pattern makes that contract explicit: every message is recorded, processed at most once per consumer for side effects that matter, and redeliveries become cheap no-ops instead of duplicate charges.

Together with the transactional outbox, you get a practical end-to-end story: publish without losing events, consume without duplicating work. The implementation fits in a single table and a disciplined transaction boundary—exactly the kind of boring infrastructure that keeps event-driven products trustworthy as traffic and team size grow.

For background on related patterns on this site, see the articles on idempotency keys, the transactional outbox, and dead-letter queues.
