Building an MQTT client in Go #

MQTT (Message Queuing Telemetry Transport) connects millions of IoT devices worldwide, from smart home sensors to industrial monitoring systems. Whilst many Go developers reach for existing libraries, building your own client wrapper teaches you the protocol’s nuances and gives you precise control over connection handling, message routing, and error recovery.

This article walks through building a production-ready MQTT client wrapper using the Eclipse Paho Go library. You’ll see how to handle concurrent message processing, implement topic wildcard matching, and manage the connection lifecycle properly.

The anatomy of MQTT communication #

MQTT operates as a publish-subscribe protocol where clients connect to a central broker. Publishers send messages to named topics, subscribers register interest in topic patterns, and the broker routes messages accordingly.

The protocol’s power lies in its topic hierarchy and wildcard system. Topics use forward slashes as separators (sensors/temperature/living-room), whilst wildcards enable flexible subscriptions: + matches exactly one level (sensors/+/living-room catches all sensor types in the living room), and # matches zero or more levels (sensors/# catches everything under sensors).

Quality of Service (QoS) levels control delivery guarantees: QoS 0 delivers a message at most once with no acknowledgement, QoS 1 ensures at-least-once delivery through acknowledgements (so duplicates are possible), and QoS 2 provides exactly-once delivery via a four-step handshake.

Design decisions and architecture #

The client wrapper needs to solve several problems that basic MQTT libraries don’t address well: concurrent message handling, automatic reconnection with subscription restoration, topic-based message routing, and graceful shutdown coordination.

The core design uses a connection manager from the autopaho library for automatic reconnection, a handler registry that maps topic patterns to functions, and a wait group to coordinate graceful shutdown of message processors.

type Client struct {
    config      ClientConfig
    conn        *autopaho.ConnectionManager
    handlers    map[string]MessageHandler
    wg          *conc.WaitGroup
    mu          sync.Mutex
    handlersMu  sync.RWMutex
    isConnected bool
}

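Two separate mutexes keep unrelated concerns apart: mu serialises connection state changes, whilst handlersMu is a read-write mutex that lets many incoming messages look up handlers concurrently and only takes the exclusive lock when a handler is registered. Keeping them separate also means a slow connection attempt holding mu never blocks message routing.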

The MessageHandler type accepts a context and a payload. The context carries the topic and lets handlers honour timeouts and cancellation:

type MessageHandler func(ctx context.Context, payload []byte) error

Implementing connection management #

Connection handling requires balancing automatic reconnection with application control. The autopaho library provides connection management, but you need to coordinate this with your handler system. Local brokers like Mosquitto running on your Home Assistant instance offer significant advantages: sub-millisecond latency, no internet dependency, and predictable connection behaviour.

func (c *Client) Connect(ctx context.Context) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.isConnected {
        return nil
    }

    if err := c.conn.AwaitConnection(ctx); err != nil {
        return fmt.Errorf("connecting to MQTT broker: %w", err)
    }

    c.isConnected = true
    return nil
}

The AwaitConnection method blocks until the connection is established or the context is cancelled. This approach lets callers implement their own timeout strategies whilst ensuring the client tracks connection state accurately. With a local Mosquitto broker, connections are typically established within milliseconds rather than the seconds an internet-based broker can take.

Authentication configuration happens during client creation. For local setups, you might skip authentication entirely or use simple username/password pairs:

// For a local broker without authentication
brokerURL, err := url.Parse("tcp://homeassistant.local:1883")

// Or, for a local broker with basic authentication
authURL, err := url.Parse("tcp://username:password@homeassistant.local:1883")
// In both cases, check err before using the parsed URL.
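Embedding credentials in the URL only helps if something reads them back out. As a rough sketch, assuming the wrapper extracts the username and password itself and hands them to the connection configuration, the standard library’s net/url package is enough. brokerCredentials is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"net/url"
)

// brokerCredentials extracts the host and the optional username and password
// embedded in a broker URL. Hypothetical helper for illustration.
func brokerCredentials(raw string) (host, user, pass string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", "", fmt.Errorf("parsing broker URL: %w", err)
	}
	if u.User != nil {
		user = u.User.Username()
		// The second return value reports whether a password was present.
		pass, _ = u.User.Password()
	}
	return u.Host, user, pass, nil
}

func main() {
	host, user, pass, err := brokerCredentials("tcp://username:password@homeassistant.local:1883")
	if err != nil {
		panic(err)
	}
	fmt.Println(host, user, pass) // prints homeassistant.local:1883 username password
}
```

A URL without a userinfo part simply yields empty strings, so the same code path covers the unauthenticated case.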

Topic wildcard matching implementation #

MQTT’s wildcard system requires careful implementation to match the specification exactly. The topicMatches function handles three cases: exact matches, single-level wildcards (+), and multi-level wildcards (#).

func topicMatches(pattern, topic string) bool {
    if pattern == topic {
        return true
    }

    if strings.HasSuffix(pattern, "/#") {
        prefix := pattern[:len(pattern)-2]
        return topic == prefix || strings.HasPrefix(topic, prefix+"/")
    }

    patternParts := strings.Split(pattern, "/")
    topicParts := strings.Split(topic, "/")

    // Implementation continues...
}

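The function splits topics into segments and compares them level by level. Special handling for /# patterns ensures they match both the exact prefix and longer paths. Note that this prefix shortcut assumes the part before /# contains no + wildcard; a pattern such as sensors/+/# still needs the level-by-level comparison, and a bare # pattern needs its own case.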

Testing wildcard matching requires comprehensive examples:

  • sensors/+/temperature matches sensors/bedroom/temperature but not sensors/bedroom/living-room/temperature
  • sensors/# matches sensors, sensors/bedroom, and sensors/bedroom/temperature
  • +/temperature matches bedroom/temperature but not sensors/bedroom/temperature
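For reference, here is one way a complete matcher could look. It is a sketch rather than the article’s own implementation: matchTopic is a hypothetical name, and it compares level by level throughout instead of using the prefix shortcut. It also applies the MQTT rule that topics beginning with $ are not matched by patterns that start with a wildcard.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches the MQTT subscription pattern.
// Hypothetical sketch; it assumes the pattern itself is well formed.
func matchTopic(pattern, topic string) bool {
	// Topics starting with '$' (for example $SYS/...) must not be matched
	// by patterns that begin with a wildcard.
	if strings.HasPrefix(topic, "$") &&
		(strings.HasPrefix(pattern, "+") || strings.HasPrefix(pattern, "#")) {
		return false
	}

	patternParts := strings.Split(pattern, "/")
	topicParts := strings.Split(topic, "/")

	for i, p := range patternParts {
		if p == "#" {
			// '#' is only valid as the last level. It matches the parent
			// level and any number of child levels.
			return i == len(patternParts)-1
		}
		if i >= len(topicParts) {
			return false // pattern has more levels than the topic
		}
		if p != "+" && p != topicParts[i] {
			return false // literal level differs
		}
	}
	// No '#' seen, so both must have the same number of levels.
	return len(patternParts) == len(topicParts)
}

func main() {
	fmt.Println(matchTopic("sensors/+/temperature", "sensors/bedroom/temperature")) // prints true
	fmt.Println(matchTopic("sensors/#", "sensors"))                                 // prints true
	fmt.Println(matchTopic("+/temperature", "sensors/bedroom/temperature"))         // prints false
}
```

Splitting both strings on every call is simple but allocates; a client with heavy traffic might pre-split patterns at registration time.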

Concurrent message processing #

Message handlers run concurrently to prevent slow handlers from blocking others. For each incoming message, the client spawns one goroutine per handler whose pattern matches the topic.

func (c *Client) handleMessage(publish *paho.Publish) {
    topic := publish.Topic
    handlers := c.findMatchingHandlers(topic)

    for _, handler := range handlers {
        currentHandler := handler
        c.wg.Go(func() {
            ctx := WithTopic(context.Background(), topic)
            err := currentHandler(ctx, publish.Payload)
            if err != nil {
                // Handle error appropriately
            }
        })
    }
}

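The currentHandler copy guards against a closure capture issue in Go versions before 1.22, where every goroutine could end up running the last handler in the loop. In modules targeting Go 1.22 or later each iteration gets its own loop variable, so the copy is harmless but no longer necessary. The wait group tracks active handlers for graceful shutdown.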

Context enhancement provides handlers with topic information without requiring additional parameters:

func WithTopic(ctx context.Context, topic string) context.Context {
    return context.WithValue(ctx, topicKey, topic)
}

func GetTopic(ctx context.Context) string {
    v, ok := ctx.Value(topicKey).(string)
    if !ok {
        return ""
    }
    return v
}

Graceful shutdown and cleanup #

Proper shutdown coordination ensures that messages already being processed are not abandoned when the application terminates. The disconnect process waits for active handlers to complete before closing the connection.

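func (c *Client) Disconnect(ctx context.Context) error {
    // Cap the total shutdown time, including the wait for handlers.
    ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
    defer cancel()

    // Wait in a separate goroutine so the select below can time out.
    waitDone := make(chan struct{})
    go func() {
        c.wg.Wait()
        close(waitDone)
    }()

    select {
    case <-waitDone:
        // Handlers completed
    case <-ctx.Done():
        // Wrap ctx.Err() so callers can tell a timeout from a cancellation.
        return fmt.Errorf("waiting for handlers: %w", ctx.Err())
    }

    return c.conn.Disconnect(ctx)
}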

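The timeout prevents indefinite blocking if handlers don’t complete promptly. The 20-second cap is an upper bound; callers that need a shorter limit can pass a context with an earlier deadline. Note that when the wait times out, this version returns without closing the connection, so the caller decides whether to force the disconnect.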

Practical usage patterns #

Real applications typically use multiple handlers for different message types. A home automation system might separate temperature, humidity, and motion sensor handlers:

client.AddHandler("sensors/+/temperature", handleTemperature)
client.AddHandler("sensors/+/humidity", handleHumidity)
client.AddHandler("motion/+", handleMotion)
client.AddHandler("alerts/#", handleAlerts)

Handler registration automatically subscribes to topics when the client is connected. This eliminates the common pattern of manual subscription after connection establishment.

Error handling in handlers should be defensive. Network issues, malformed payloads, or database connection problems shouldn’t crash the entire client:

func handleTemperature(ctx context.Context, payload []byte) error {
    topic := GetTopic(ctx)

    var reading TemperatureReading
    if err := json.Unmarshal(payload, &reading); err != nil {
        log.Printf("Invalid temperature data from %s: %v", topic, err)
        return nil // Malformed data won't improve on redelivery, so drop it
    }

    if err := database.Store(reading); err != nil {
        log.Printf("Failed to store temperature reading: %v", err)
        return err // Surface storage errors so the client can log or retry them
    }

    return nil
}

Testing strategies #

Testing MQTT clients requires either a test broker or careful mocking. The Eclipse Mosquitto project provides a public test broker at test.mosquitto.org:1883 for development and testing.

Integration tests should verify handler registration, message routing, and connection recovery:

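func TestHandlerRouting(t *testing.T) {
    ctx := context.Background()
    client, err := NewClient(ctx, testConfig)
    require.NoError(t, err)

    // Handlers run in their own goroutines, so guard the slice.
    var mu sync.Mutex
    var received []string
    client.AddHandler("test/+", func(ctx context.Context, payload []byte) error {
        mu.Lock()
        defer mu.Unlock()
        received = append(received, string(payload))
        return nil
    })

    client.Publish(ctx, "test/sensor1", []byte("data1"))
    client.Publish(ctx, "test/sensor2", []byte("data2"))

    // Delivery is asynchronous: poll until both messages have arrived.
    require.Eventually(t, func() bool {
        mu.Lock()
        defer mu.Unlock()
        return len(received) == 2
    }, 5*time.Second, 50*time.Millisecond)
}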

Unit tests for topic matching don’t require a broker connection and can run quickly:

func TestTopicMatching(t *testing.T) {
    cases := []struct {
        pattern, topic string
        expected       bool
    }{
        {"sensors/+/temperature", "sensors/bedroom/temperature", true},
        {"sensors/+/temperature", "sensors/bedroom/humidity", false},
        {"sensors/#", "sensors/bedroom/temperature", true},
    }

    for _, c := range cases {
        result := topicMatches(c.pattern, c.topic)
        assert.Equal(t, c.expected, result)
    }
}

Performance considerations and optimisations #

Message throughput depends on handler performance and goroutine overhead. For high-volume scenarios, consider message batching or worker pools instead of spawning unlimited goroutines.
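One simple way to cap concurrency is a counting semaphore built from a buffered channel. The sketch below is an illustration under stated assumptions rather than part of the client: limiter and peakConcurrency are hypothetical names, and the sleep only simulates handler work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter runs tasks concurrently, but never more than its capacity at once.
type limiter struct {
	sem chan struct{}
	wg  sync.WaitGroup
}

func newLimiter(max int) *limiter {
	return &limiter{sem: make(chan struct{}, max)}
}

// Go blocks until a slot is free, then runs task in a new goroutine.
func (l *limiter) Go(task func()) {
	l.sem <- struct{}{} // acquire a slot
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.sem }() // release the slot
		task()
	}()
}

// Wait blocks until every started task has finished.
func (l *limiter) Wait() { l.wg.Wait() }

// peakConcurrency pushes n short tasks through a limiter of size max and
// reports the highest number of tasks observed running at the same time.
func peakConcurrency(max, n int) int64 {
	l := newLimiter(max)
	var running, peak atomic.Int64
	for i := 0; i < n; i++ {
		l.Go(func() {
			cur := running.Add(1)
			for { // record a new peak if this is one
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated handler work
			running.Add(-1)
		})
	}
	l.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(3, 20) <= 3) // prints true
}
```

Because Go blocks when all slots are busy, a burst of messages slows the receive path instead of creating unbounded goroutines. Whether that backpressure is acceptable depends on the QoS level and how the broker handles a slow consumer.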

The handler registry uses a simple map with read-write mutex protection. Applications with thousands of topic patterns might benefit from more sophisticated data structures like prefix trees or regular expression compilation.

Connection configuration affects performance and reliability. Shorter keepalive intervals detect network issues faster but increase bandwidth usage. Longer retry delays reduce broker load during outages but slow recovery.

Security and authentication #

Production deployments should use TLS encryption and authentication. MQTT supports username/password authentication, client certificates, and OAuth tokens depending on broker configuration.

URL-based authentication configuration keeps credentials in one place:

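// For username/password authentication
// (over tcp:// the credentials travel unencrypted; prefer tls:// in production)
authURL, err := url.Parse("tcp://username:password@broker.example.com:1883")

// For TLS without authentication
tlsURL, err := url.Parse("tls://broker.example.com:8883")
// In both cases, check err before using the parsed URL.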

Client certificates require additional configuration in the autopaho client setup. OAuth token handling needs custom authentication callbacks.

Extending the client #

The basic client provides a foundation for more sophisticated features. Message persistence can store and replay messages during connection outages. Message transformation can validate, filter, or modify payloads before handler execution.

Rate limiting prevents handlers from overwhelming downstream systems during message bursts. Circuit breakers can disable handlers that repeatedly fail, preventing cascading failures.

Metrics collection helps monitor client health. Track connection uptime, message rates, handler execution times, and error frequencies to understand system behaviour.

Common pitfalls and solutions #

Handler goroutines need defensive error handling. A panic that nothing recovers terminates the whole process, not just the goroutine that raised it, so validate payloads and return errors rather than panicking in handler code. I learned this the hard way when a malformed JSON payload from a failing Xiaomi sensor crashed my entire monitoring service.

Connection state races occur when code checks IsConnected() and then acts on the result. The client’s internal locking makes the check itself safe, but the connection can still drop between the check and the publish that follows, so application code should handle connection errors rather than rely on the check. During Home Assistant restarts, expect temporary connection failures and design handlers to handle them gracefully.

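Overlapping wildcard patterns need care. With a map-based registry that runs every matching handler, a message on energy/evcharger/power triggers both the device-specific handler and the catch-all energy/# handler, and Go’s map iteration order is unspecified, so registration order alone does not establish precedence. In my setup, device-specific handlers like energy/evcharger/power register before the catch-all energy/# pattern; if one handler must take precedence over another, make that rule explicit in the routing logic.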

Reconnection loops can overwhelm brokers if retry delays are too short. The autopaho library includes exponential backoff, but applications should monitor connection attempt frequency. Home Assistant’s built-in Mosquitto add-on handles reconnections well, but external brokers might have stricter connection limits.

Battery-powered Aqara devices introduce their own challenges. These sensors often send irregular updates or enter sleep modes that affect message timing. Design your handlers to accommodate missing data and use reasonable timeouts when waiting for sensor responses.

Integrating with Home Assistant architecture #

My MQTT client sits alongside Home Assistant rather than replacing it, handling custom analytics and backup monitoring that complement the main automation system. Home Assistant manages device discovery, user interfaces, and real-time automation, whilst my Go services handle historical analysis, custom alerting, and data export.

This separation works well because each system excels at different tasks. Home Assistant’s visual interface makes it perfect for configuring automation rules and monitoring device status. Go services excel at data processing, custom calculations, and integration with external systems like energy supplier APIs.

The MQTT broker becomes the coordination layer between systems. Home Assistant publishes state changes and device readings, my Go services process this data and publish derived metrics, and both systems can subscribe to each other’s topics for coordination.

Within my home lab I run Home Assistant in a VM on a thin client PC, and my Go services run in separate VMs. If a system needs maintenance, the others can continue operating independently, with MQTT’s retained messages and persistent sessions limiting data loss during brief outages.

Conclusion #

Building an MQTT client wrapper teaches you the protocol’s subtleties whilst providing exactly the functionality your application needs. The combination of automatic reconnection, concurrent message processing, and graceful shutdown handling creates a robust foundation for IoT and messaging applications.

The complete implementation demonstrates Go’s strengths for network programming: excellent concurrency primitives, clear error handling, and strong typing that prevents common messaging bugs.

Start with the basic client and extend it based on your specific requirements. Monitor its behaviour in production and adjust timeouts, retry logic, and error handling based on real usage patterns.