Skip to main content

processing uk rail data in real-time

·6 mins

Railway systems generate an endless stream of real-time data that needs to be captured, processed, and stored efficiently. When I was approached for a freelance project to build a Kafka consumer in Go using franz-go and integrate it with PostgreSQL, I knew this would be an interesting challenge. The result was a service designed to reliably consume real-time train information and persist it into a database. What started as a straightforward Kafka consumer evolved into a robust system, capable of handling millions of daily messages about train movements, schedules, and disruptions across the rail network. This project demonstrates modern Go service architecture, robust data processing patterns, and integration with industry-standard messaging and storage systems.

Working with real-time railway data presents unique challenges. Trains don’t always run according to schedule, and tracking these deviations requires a system that’s both resilient and highly available. The solution leverages Go’s concurrency model to efficiently process messages whilst maintaining data consistency. Through careful consideration of failure scenarios, I’ve built a system that consumers of this data can rely on to receive accurate, timely information about train movements and delays for integration into their own applications.

In this post, I’ll walk through some of the key architectural decisions, challenges faced, and lessons learned while building this system. Whether you’re working with Kafka, building data processing pipelines, or interested in Go-based microservices, you’ll find practical insights from my experience developing this project.

Project Overview #

The service serves as a critical component in a rail information system, bridging the gap between real-time data streams and persistent storage. Its primary responsibilities are:

  1. Consuming train data from Darwin JSON Kafka Topic
  2. Processing and validating incoming messages
  3. Storing processed data in a PostgreSQL database using table partitioning
  4. Providing health checks and monitoring endpoints

System Architecture #

Message Processing Pipeline #

The system processes railway data through a series of well-defined stages, starting with Kafka consumption and ending with persistent storage. At its core, it uses the franz-go library for reliable Kafka interaction. The consumer supports SASL authentication and configurable consumer groups for secure, scalable message processing.

Message flow is managed through Go channels, providing natural backpressure and buffering. Messages accumulate in the channel before being passed to the database layer. While the system currently processes messages individually, it’s designed to support batch processing if needed. This approach keeps the implementation simple while leaving room for optimization.

func (c *Consumer) processMessages(ctx context.Context, msg kafka.Message) error {
   select {
   case c.messageChan <- msg:
       return nil
   case <-ctx.Done():
       return ctx.Err()
   }
}

Error handling and shutdown procedures ensure system reliability. When errors occur, detailed logs capture message context and error metadata. During shutdown, the system:

  • Stops consuming new messages
  • Processes remaining buffered messages
  • Commits Kafka offsets
  • Closes database connections gracefully

Database Management #

PostgreSQL serves as the persistent storage layer, using table partitioning for efficient data management. The base table structure uses RANGE partitioning on the created_at column:

CREATE TABLE IF NOT EXISTS messages (
    id UUID,
    message JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, created_at)
)
PARTITION BY
    RANGE (created_at);

For a detailed explanation of PostgreSQL table partitioning patterns and benefits, see my article on PostgreSQL Table Partitioning.

The service manages partitions through a dedicated partitioner that runs both on startup and periodically:

type Partitioner struct {
    db *sql.DB
}

func (p *Partitioner) Ensure(ctx context.Context, date time.Time) error {
    tableName := fmt.Sprintf("messages_%04d_%02d_%02d",
        date.Year(), date.Month(), date.Day())

    startDate := time.Date(date.Year(), date.Month(), date.Day(),
        0, 0, 0, 0, time.UTC)
    endDate := startDate.AddDate(0, 0, 1)

    query := fmt.Sprintf(`
        CREATE TABLE IF NOT EXISTS %s PARTITION OF messages
        FOR VALUES FROM ('%s') TO ('%s')`,
        tableName, startDate.Format("2006-01-02 15:04:05"),
        endDate.Format("2006-01-02 15:04:05"))

    _, err := p.db.ExecContext(ctx, query)
    return err
}

func (p *Partitioner) Start(ctx context.Context) {
    // Create partitions for today and tomorrow on startup
    now := time.Now()
    p.Ensure(ctx, now)
    p.Ensure(ctx, now.AddDate(0, 0, 1))

    // Run partition creation every 6 hours
    ticker := time.NewTicker(6 * time.Hour)
    go func() {
        for {
            select {
            case <-ticker.C:
                if err := p.Ensure(ctx, time.Now()); err != nil {
                    log.Printf("failed to ensure partition: %v", err)
                }
            case <-ctx.Done():
                ticker.Stop()
                return
            }
        }
    }()
}

The 6-hour interval provides redundancy in partition creation, preventing gaps if the service experiences downtime. Creating tomorrow’s partition on startup ensures message processing continues seamlessly across midnight UTC.

Testing Strategy #

Integration tests form the backbone of the testing approach. Using Docker containers, tests run against real Kafka and PostgreSQL instances:

func TestMessageProcessing(t *testing.T) {
   kafka := test.NewKafkaContainer(t)
   postgres := test.NewPostgresContainer(t)

   // Test setup and assertions
}

Utility packages handle container lifecycle and test data setup, making it straightforward to write comprehensive tests. While unit tests with mocks supplement coverage, integration tests have proven more valuable for catching real-world issues, particularly around:

  • Transaction handling
  • Error scenarios
  • Message ordering
  • Database interactions

Deployment #

The service runs on a Virtual Private Server (VPS), deployed alongside its PostgreSQL instance. This co-location minimises latency for database operations. Kafka runs as an external service that the system connects to - I have no visibility into its configuration or infrastructure.

The VPS runs Fedora Linux, which uses systemd as its init system and service manager. With systemd being a core part of Fedora, there’s no need to install additional process managers like Supervisor or PM2. This reduces complexity and potential points of failure. systemd provides process supervision, automatic restarts on failure, and centralised logging through journald. Here’s the systemd service configuration:

[Unit]
Description=Railway Service Systemd
ConditionPathExists=/opt/railway/railway-linux-amd64
After=network.target

[Service]
ExecStart=/opt/railway/railway-linux-amd64 -config=/etc/railway/railway.yaml
Restart=always

Before moving to production, I needed to validate that the service worked correctly with the provided VPS architecture and infrastructure. To achieve this, the service underwent a thorough 7-day staging validation period.

This validation phase was crucial for verifying several critical aspects of the system. The service successfully handled server restarts, maintaining data consistency and resuming message processing automatically. Database writes remained reliable throughout the test period.

No manual intervention was needed during this validation phase, demonstrating the robustness of the systemd configuration and overall system design. This gave confidence that the service would operate reliably in the production environment using the provided infrastructure.

Conclusion #

Building reliable data pipelines requires careful attention to detail. This project showcases that through its clean architecture and robust Kafka consumer implementation. The production deployment on Fedora, with each component designed with reliability in mind, keeps the service running smoothly.

The testing approach proved particularly valuable. Integration tests with containerised dependencies caught real-world issues early. Error handling and monitoring ensure problems are visible and quickly resolved. systemd’s built-in features provide robust process management.

What started as a simple Kafka consumer evolved into a production-ready service. It reliably processes railway data day after day, demonstrating how Go’s simplicity and powerful concurrency model make it an excellent choice for data processing systems.