
When you're ingesting a continuous external stream — live transit data, market ticks, IoT telemetry, whatever — the consumer's job and the job of interpreting what was consumed are tempting to bundle together. They shouldn't be. This is the pattern behind the Darwin Kafka ingestion in Headcode, my UK rail data API, though it generalises well beyond rail data: a dumb, fast consumer that lands raw messages in Postgres, and a separate process (not covered here) that interprets them later. This section covers just the consumer.

Wiring: one process, two cooperating components

A single process runs a Kafka consumer and a Postgres writer side by side, connected by a channel:

msgs := make(chan RawMessage, 1000)
saver := &Saver{Pool: pool}

consumer, err := kafka.NewConsumer(&kafka.Config{
    Broker:        cfg.Broker,
    Topic:         cfg.Topic,
    Group:         cfg.ConsumerGroup,
    MessageChan:   msgs,
    ConsumeOffset: kgo.NewOffset().AtEnd(),
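})
if err != nil {
    return fmt.Errorf("create kafka consumer: %w", err)
}

group := errgroup.WithCancelOnError(ctx)
group.Go(consumer.Run)
group.Go(func(ctx context.Context) error { return saver.Run(ctx, msgs) })

// Wait returns the first error from either component, after both have exited.
return group.Wait()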

The consumer decodes Kafka records into a lightweight RawMessage struct and pushes them onto a buffered channel. The saver drains that channel, batches what it finds, and writes it to Postgres. Neither side knows anything about the other's internals — the channel is the entire contract.

(errgroup.WithCancelOnError is a thin wrapper around golang.org/x/sync/errgroup, not a function exported by that package; its Go method takes a func(ctx context.Context) error, as used above. Under the hood it's errgroup.WithContext(ctx), which already cancels the group's context on the first non-nil error. kafka.NewConsumer and kafka.Config are likewise thin wrappers, this time from headcode, a Kafka setup package of mine. headcode isn't the standard library or a third-party client, but it builds on real franz-go (kgo) types underneath, which is why ConsumeOffset takes a kgo.Offset directly.)

ConsumeOffset: AtEnd() is worth calling out: a fresh consumer group starts at the live end of the topic, not the beginning. A real-time feed is a stream, not a bounded dataset — there's no value in replaying historical Kafka retention on first boot. Once the consumer group has committed an offset, restarts resume from there as normal.

Per-partition fan-out

Rather than decoding every record inline in the poll loop, the consumer follows a rebalance-callback model. When the consumer group assigns partitions, a dedicated goroutine spins up for each one:

func (c *Consumer) onPartitionsAssigned(assigned map[string][]int32) {
    for topic, partitions := range assigned {
        for _, partition := range partitions {
            worker := &partitionWorker{records: make(chan []Record, 10), out: c.out}
            if c.workers[topic] == nil {
                c.workers[topic] = make(map[int32]*partitionWorker)
            }
            c.workers[topic][partition] = worker
            go worker.consume()
        }
    }
}

The main poll loop just fetches and routes each batch to the right partition's channel:

fetches.EachPartition(func(p FetchPartition) {
    worker, ok := c.workers[p.Topic][p.Partition]
    if !ok {
        return // defensive: no live worker, e.g. the partition was just revoked
    }
    worker.records <- p.Records
})

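This keeps partitions independent: a slow worker for one partition doesn't hold up the others until its buffered channel (10 batches deep) fills, at which point the blocking send in the poll loop stalls routing for every partition. It also falls out naturally from how Kafka already parallelises a topic, and it preserves per-partition ordering, since each partition's records are handled by exactly one goroutine. When a rebalance takes a partition away, the corresponding goroutine is torn down via a quit channel.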
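A minimal, stdlib-only sketch of the assign/route/revoke lifecycle, assuming a single topic and records simplified to strings. The dispatcher type, the partitionWorker fields, and the method names are hypothetical rather than headcode's actual types. A mutex guards the worker map because rebalance callbacks and the poll loop may run on different goroutines, and route selects on quit so a send to a just-revoked worker can't block forever.

```go
package main

import "sync"

// partitionWorker is a hypothetical, simplified worker: records are plain
// strings, quit signals revocation, and done closes once consume has exited.
type partitionWorker struct {
	records chan []string
	quit    chan struct{}
	done    chan struct{}
}

// consume handles batches in arrival order until quit is closed.
// Batches still buffered at that point are abandoned.
func (w *partitionWorker) consume(partition int32, handle func(int32, string)) {
	defer close(w.done)
	for {
		select {
		case <-w.quit:
			return
		case recs := <-w.records:
			for _, r := range recs {
				handle(partition, r)
			}
		}
	}
}

// dispatcher owns one worker per assigned partition of a single topic.
type dispatcher struct {
	mu      sync.Mutex
	workers map[int32]*partitionWorker
	handle  func(int32, string)
}

func newDispatcher(handle func(int32, string)) *dispatcher {
	return &dispatcher{workers: make(map[int32]*partitionWorker), handle: handle}
}

// assign starts a worker goroutine for each newly assigned partition.
func (d *dispatcher) assign(partitions []int32) {
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, p := range partitions {
		if _, exists := d.workers[p]; exists {
			continue
		}
		w := &partitionWorker{
			records: make(chan []string, 10),
			quit:    make(chan struct{}),
			done:    make(chan struct{}),
		}
		d.workers[p] = w
		go w.consume(p, d.handle)
	}
}

// revoke closes each revoked worker's quit channel and waits for it to exit.
func (d *dispatcher) revoke(partitions []int32) {
	d.mu.Lock()
	var stopping []*partitionWorker
	for _, p := range partitions {
		if w, ok := d.workers[p]; ok {
			close(w.quit)
			delete(d.workers, p)
			stopping = append(stopping, w)
		}
	}
	d.mu.Unlock()
	for _, w := range stopping {
		<-w.done
	}
}

// route hands a fetched batch to its partition's worker, reporting false if
// no worker owns the partition or the worker is shutting down.
func (d *dispatcher) route(partition int32, recs []string) bool {
	d.mu.Lock()
	w, ok := d.workers[partition]
	d.mu.Unlock()
	if !ok {
		return false
	}
	select {
	case w.records <- recs:
		return true
	case <-w.quit:
		return false
	}
}
```

Abandoning whatever is still buffered when quit closes relies on those offsets not having been committed yet, so the partition's next owner fetches them again.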

Unwrapping the envelope

Many real-time feeds don't put a clean JSON object on the wire — messages arrive wrapped in some transport envelope (SOAP, JMS-style headers, a custom framing format) with the useful fields nested a few levels down. Each partition worker unwraps this before anything else happens:

func (w *partitionWorker) consume() {
    for recs := range w.records {
        for _, rec := range recs {
            decoded, err := envelope.Unwrap(rec.Value)
            if err != nil {
                log.Warn("discarding undecodable record", "offset", rec.Offset, "err", err)
                continue
            }

            w.out <- RawMessage{
                ID:             uuid.New(),
                KafkaOffset:    rec.Offset,
                KafkaPartition: rec.Partition,
                MessageKind:    decoded.Kind,
                Payload:        decoded.Payload, // untouched
                ReceivedAt:     time.Now().UTC(),
            }
        }
    }
}

Three things to note: undecodable records are logged and dropped rather than killing the consumer (a malformed message shouldn't take down the feed); the payload itself is kept as raw bytes, not parsed into a domain struct here, since interpreting it is the next stage's problem; and the id is generated here, at construction time, rather than left to a Postgres default. That matters once a batch gets retried: the same RawMessage carries the same id on every retry attempt, so the primary key acts as a second line of defense alongside the unique index below. If id were instead generated by the database (DEFAULT gen_random_uuid()), a retried insert would get a fresh id each time and the primary key would do nothing to stop duplicates. The consumer's only opinions are about envelope metadata: what kind of message is this, where did it sit in the Kafka log.
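envelope.Unwrap isn't shown in the post, and the real framing depends on the feed. As a hedged illustration only, here is what it could look like for a hypothetical JSON envelope with kind and payload fields. json.RawMessage lets the payload pass through as its original bytes without being decoded into any domain type, which is the property the paragraph above cares about.

```go
package main

import (
	"encoding/json"
	"errors"
)

// envelope is a hypothetical JSON transport wrapper; real feeds use their own
// framing (SOAP, JMS-style headers, ...), so these field names are assumptions.
type envelope struct {
	Kind    string          `json:"kind"`
	Payload json.RawMessage `json:"payload"`
}

// Decoded keeps only what the consumer cares about: the kind and the
// untouched payload bytes.
type Decoded struct {
	Kind    string
	Payload []byte
}

// Unwrap strips the envelope without interpreting the payload.
func Unwrap(raw []byte) (Decoded, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return Decoded{}, err
	}
	if env.Kind == "" || len(env.Payload) == 0 {
		return Decoded{}, errors.New("envelope missing kind or payload")
	}
	return Decoded{Kind: env.Kind, Payload: []byte(env.Payload)}, nil
}
```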

Batched, idempotent writes

The saver doesn't insert one row per message. It accumulates a batch and flushes as soon as either batchSize messages have arrived or batchTimeout has elapsed, whichever comes first:

const (
    batchSize    = 500
    batchTimeout = 250 * time.Millisecond
)

That's the standard small-batch-or-timeout pattern for trading a little latency for a lot of write throughput. The more interesting design choice is what happens when a flush fails: it doesn't propagate the error — it retries forever with capped exponential backoff.

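The consumer process runs its goroutines under a cancel-on-error supervisor: if any goroutine returns an error, the whole process shuts down. A transient Postgres blip (a restart, a dropped connection) shouldn't be allowed to kill Kafka consumption as a side effect. So the saver swallows those errors locally and keeps retrying. While it retries, it stops draining msgs; once the 1000-slot buffer fills, the partition workers block on their sends and the poll loop blocks behind them, so consumption pauses rather than dies and resumes when Postgres comes back. The insert itself is written to make that retrying safe: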

INSERT INTO raw_message (id, kafka_offset, kafka_partition, message_kind, payload, received_at)
VALUES (...)
ON CONFLICT (kafka_partition, kafka_offset, received_at) DO NOTHING

Retrying a batch that already landed, in whole or in part (say, the commit went through just before the connection dropped), is a no-op for the rows that are already there. This is the same idempotency principle every Kafka consumer needs (at-least-once delivery, deduplicated by something the producer can't accidentally vary), applied at the storage layer instead of through consumer-group offset tricks.

That guarantee is narrower than it sounds, though. It covers retries of an in-flight batch, where the same RawMessage structs (and so the same received_at values) get reinserted. It doesn't cover a consumer restart. On restart, a partition worker resumes from the last committed offset and re-decodes those records from scratch, calling time.Now().UTC() again for messages it may have already landed under the previous run. Since received_at is part of the conflict target, the new rows don't collide with the old ones; they insert as duplicates of the same (kafka_partition, kafka_offset).

The clean fix would be a unique index on (kafka_partition, kafka_offset) alone, but Postgres requires a partitioned table's unique indexes to include the partition key, which here is received_at. So that index isn't available without restructuring the table away from range-partitioning by received_at. For now this gap is accepted: the landing table can end up with rare restart-triggered duplicate rows for the same Kafka offset, and the downstream interpretation stage is expected to tolerate that the same way it already tolerates at-least-once delivery from Kafka itself. A feed that exposes a stable producer-side message ID would close the gap properly — but only by repartitioning on that ID instead of received_at, since whatever the partition key is still has to be part of the unique index either way.

The landing table is the contract

The raw landing table is deliberately unopinionated:

CREATE TABLE raw_message (
    id              UUID        NOT NULL,
    kafka_offset    BIGINT      NOT NULL,
    kafka_partition INT         NOT NULL,
    message_kind    TEXT        NOT NULL,
    payload         JSONB       NOT NULL,
    received_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (id, received_at)
) PARTITION BY RANGE (received_at);

CREATE UNIQUE INDEX ON raw_message (kafka_partition, kafka_offset, received_at);

Kafka partition and offset, message kind, and a JSONB blob. No domain schema yet: just enough structure to find a message again later (by kind, by time) and to know where it sat in the Kafka log. The table is range-partitioned by received_at, so time-bounded scans only touch the partitions they need and each partition's indexes stay small as the feed accumulates.

A separate cursor table records, for each named downstream consumer, the last Kafka partition and offset it has fully processed. The interpretation stage uses it to know where to resume; it is distinct from Kafka's own consumer-group offsets.
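An in-memory sketch of how such a cursor might be used, under two assumptions the post doesn't state: cursors are keyed by (consumer name, partition), and the interpretation stage reads raw_message in offset order per partition. cursors, advance, and alreadyProcessed are hypothetical names. Because the cursor only moves forward, a restart-duplicated row at an offset that has already been handled is simply skipped.

```go
package main

// cursorKey identifies one downstream consumer's position on one partition.
type cursorKey struct {
	consumer  string
	partition int32
}

// cursors is an in-memory stand-in for the cursor table.
type cursors map[cursorKey]int64

// advance records offset as processed and never moves a cursor backwards;
// in SQL this could be an upsert that keeps GREATEST(old, new).
func (c cursors) advance(consumer string, partition int32, offset int64) {
	k := cursorKey{consumer, partition}
	if cur, ok := c[k]; !ok || offset > cur {
		c[k] = offset
	}
}

// alreadyProcessed reports whether a row at this offset is at or behind the
// cursor, so a duplicate for that offset can be skipped.
func (c cursors) alreadyProcessed(consumer string, partition int32, offset int64) bool {
	cur, ok := c[cursorKey{consumer, partition}]
	return ok && offset <= cur
}
```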

Why split it this way

The architectural bet here is that capture and interpretation should fail independently. A real-time feed's message schema tends to have version quirks, optional fields, and edge cases that take real iteration to get right — exactly the kind of code that should be safe to redeploy, roll back, and fix forward without any risk to the underlying data. By making the consumer's only job "get bytes off Kafka and into a JSONB column, durably, in order," that stage becomes simple enough to basically never need to change. All of the business logic — and all of the future bugs — live downstream, where they can be replayed against the raw table as many times as needed without ever going back to Kafka. For Headcode, that's where the genuinely hard part lives: reconciling CRS, TIPLOC, and STANOX codes into a single identifier, exactly the kind of logic that benefits most from being safe to iterate on without re-touching the Kafka offsets underneath it. The Kafka topic's retention window stops being a hard deadline; the real replay buffer is Postgres, and it doesn't expire.