Using CUE to unify IoT sensor data

I’ve been building a home automation system that processes motion sensor data from various IKEA Zigbee devices. Each sensor type sends a different data structure, and I needed to extract consistent information from all of them without sacrificing type safety.

I spent a long time trying to make Home Assistant work for my setup. The visual automation builder felt limiting once I needed conditional logic beyond simple triggers, and writing automations in YAML became tedious. Anything more complex, such as correlating motion across multiple rooms or weighting events by time, was difficult to express. I ended up fighting the system rather than solving the problem.

My current home automation setup looks like this:

┌─────────────┐  ┌─────────────┐
│   TRÅDFRI   │  │  VALLHORN   │
│   Sensor    │  │   Sensor    │
└──────┬──────┘  └──────┬──────┘
       │                │
       └────────┬───────┘
         Zigbee │
                ▼
       ┌──────────────────┐
       │  zigbee2mqtt     │
       │  (Zigbee bridge) │
       └────────┬─────────┘
                │
                ▼
       ┌──────────────────┐
       │   MQTT Broker    │
       │                  │
       │ Topics:          │
       │  bedroom/motion  │
       │  hallway/motion  │
       │  kitchen/motion  │
       └────┬─────────┬───┘
            │         │
            │         │ JSON messages
    ┌───────┘         └───────┐
    ▼                         ▼
┌──────────────┐      ┌──────────────┐
│    Node-RED  │      │     Home     │
│              │      │   Assistant  │
│  - Visual    │      │              │
│    flows     │◄────►│  - Visual    │
│  - Custom    │      │    automata  │
│    logic     │      │  - YAML      │
└──────────────┘      └──────┬───────┘
                             │
                             │ HTTP API
                             ▼
                    ┌──────────────────┐
                    │ Philips Hue Hub  │
                    │                  │
                    │  ┌────┐  ┌────┐  │
                    │  │Bulb│  │Bulb│  │
                    │  └────┘  └────┘  │
                    │                  │
                    │  ┌──────┐        │
                    │  │Sensor│        │
                    │  └──────┘        │
                    └──────────────────┘

Home Assistant and Node-RED handle some of my automation logic and control devices through hubs such as the Philips Hue hub. This works for many people, but I wanted more control and flexibility, which I knew I could get from a custom Go service. I could build exactly what I wanted in a language I already knew.

I’m building a Go service to replace this entire automation layer—Home Assistant, Node-RED, and the device-specific hubs. The service subscribes directly to MQTT topics where zigbee2mqtt publishes sensor data, processes that data, and sends commands back to control lights and other devices.

The system architecture #

The sensors connect via Zigbee, a low-power wireless protocol common in home automation. I’m running zigbee2mqtt which bridges between Zigbee devices and an MQTT broker. My Go service subscribes to MQTT topics and processes the JSON messages that sensors publish whenever their state changes.

Currently, I’m using two different sensor types: the IKEA TRÅDFRI motion sensor and the IKEA VALLHORN motion sensor.

Eventually, I’ll add others including Philips Hue sensors. Each type has its own unique data structure with different fields. I’d seen this pattern before—small differences that compound into maintenance headaches as the codebase grows.

The zigbee2mqtt documentation shows what each sensor exposes. The TRÅDFRI reports light levels as a boolean illuminance_above_threshold, whilst the VALLHORN gives you actual lux readings in an illuminance field. The VALLHORN tells you precisely how many seconds have passed since motion stopped via no_occupancy_since; the TRÅDFRI doesn’t provide this at all. Both detect motion with an occupancy boolean, but handling these differences in pure Go means type switches, assertions, and duplicated logic across every method that touches sensor data.

From the beginning I knew CUE would let me validate sensor data at the boundary and transform it into consistent structures, meaning the rest of my code doesn’t need to know which sensor type it’s processing. CUE handles the differences between sensor types in both directions, but this article focuses on the inbound transformation (I’ll cover outbound MQTT messages in a follow-up article). This article shows my CUE implementation alongside what a pure Go approach might have looked like, demonstrating where CUE removes the boilerplate.

The pure Go approach and its limitations #

Here’s what processing sensor data might have looked like if I’d built this system in pure Go without CUE:

// Separate structs for each sensor's raw data
type TradfriRawData struct {
    Battery                   int  `json:"battery"`
    Occupancy                 bool `json:"occupancy"`
    IlluminanceAboveThreshold bool `json:"illuminance_above_threshold"`
    Linkquality               int  `json:"linkquality"`
}

type VallhornRawData struct {
    Battery          int   `json:"battery"`
    Occupancy        bool  `json:"occupancy"`
    Illuminance      int   `json:"illuminance"`
    NoOccupancySince *int  `json:"no_occupancy_since,omitempty"` // Optional, pointer to handle null
    Linkquality      int   `json:"linkquality"`
    Voltage          int   `json:"voltage"`
}

type MotionSensorMessage struct {
    SensorID   string `json:"sensor_id"`
    Timestamp  string `json:"timestamp"`
    Floor      string `json:"floor"`
    SensorType string `json:"sensor_type"`
    RawData    any    `json:"raw_data"`
}

// Processing requires routing based on sensor type
func ProcessMQTTMessage(topic string, payload []byte, metadata SensorMetadata) (*MotionSensorMessage, error) {
    var rawData any

    switch metadata.SensorType {
    case "ikea_tradfri_motion":
        var tradfri TradfriRawData
        if err := json.Unmarshal(payload, &tradfri); err != nil {
            return nil, fmt.Errorf("failed to unmarshal tradfri data: %w", err)
        }
        rawData = tradfri

    case "ikea_vallhorn_motion":
        var vallhorn VallhornRawData
        if err := json.Unmarshal(payload, &vallhorn); err != nil {
            return nil, fmt.Errorf("failed to unmarshal vallhorn data: %w", err)
        }
        rawData = vallhorn

    default:
        return nil, fmt.Errorf("unknown sensor type: %s", metadata.SensorType)
    }

    return &MotionSensorMessage{
        SensorID:   metadata.SensorID,
        Timestamp:  time.Now().Format(time.RFC3339),
        Floor:      metadata.Floor,
        SensorType: metadata.SensorType,
        RawData:    rawData,
    }, nil
}

// Every method needs type switches
func (m *MotionSensorMessage) IsMotionDetected() (bool, error) {
    switch data := m.RawData.(type) {
    case TradfriRawData:
        return data.Occupancy, nil
    case VallhornRawData:
        return data.Occupancy, nil
    default:
        return false, fmt.Errorf("unknown sensor data type")
    }
}

// IsLightSufficient returns (hasSensor, sufficient, err).
func (m *MotionSensorMessage) IsLightSufficient() (bool, bool, error) {
    switch data := m.RawData.(type) {
    case TradfriRawData:
        // Field is inverted - true means dark
        return true, !data.IlluminanceAboveThreshold, nil

    case VallhornRawData:
        // Consider light sufficient if illuminance is above 50 lux
        return true, data.Illuminance > 50, nil

    default:
        return false, false, fmt.Errorf("unknown sensor data type")
    }
}

// GetSecondsSinceMotion returns (seconds, ok, err); ok is false when no timing is available.
func (m *MotionSensorMessage) GetSecondsSinceMotion() (int, bool, error) {
    vallhorn, ok := m.RawData.(VallhornRawData)
    if !ok {
        return 0, false, nil // Not supported on this sensor
    }

    if vallhorn.Occupancy {
        return 0, false, nil // Motion detected, field not present
    }

    if vallhorn.NoOccupancySince == nil {
        return 0, false, nil // Field is null
    }

    return *vallhorn.NoOccupancySince, true, nil
}

Typed structs don’t solve the fundamental problem. Every method that extracts data requires a type switch to handle the union. Optional fields like NoOccupancySince need pointers to distinguish “no value” from “has a value”, and a plain *int still can’t tell an absent field from an explicit null, because both decode to nil. The VALLHORN sensor provides precise timing about how long a room has been empty, but extracting this value requires checking the sensor type, verifying occupancy is false, checking the pointer isn’t nil, and dereferencing it.
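
The pointer ambiguity is easy to see with encoding/json alone. The following is a minimal sketch, not code from the service: vallhornTiming and describe are hypothetical names, and the struct is cut down to the one optional field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// vallhornTiming is a hypothetical, cut-down struct holding only the
// optional timing field from the VALLHORN payload.
type vallhornTiming struct {
	NoOccupancySince *int `json:"no_occupancy_since,omitempty"`
}

// describe decodes a payload and reports what the pointer can tell us.
func describe(payload string) string {
	var v vallhornTiming
	if err := json.Unmarshal([]byte(payload), &v); err != nil {
		return "invalid"
	}
	if v.NoOccupancySince == nil {
		// An absent field and an explicit null both leave the pointer nil.
		return "no value"
	}
	return fmt.Sprintf("%d seconds", *v.NoOccupancySince)
}

func main() {
	payloads := []string{
		`{}`,
		`{"no_occupancy_since": null}`,
		`{"no_occupancy_since": 42}`,
	}
	for _, p := range payloads {
		fmt.Printf("%s -> %s\n", p, describe(p))
	}
}
```

Telling the first two cases apart in Go would need something extra, such as a json.RawMessage field or a custom UnmarshalJSON method.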

Add a third sensor type and you extend every type switch with another case. When IKEA releases a firmware update that changes a field, you hunt through multiple functions to fix it. I knew I’d be adding more sensor types and dealing with firmware variations, so I chose CUE from the start rather than fighting this complexity.

You could build similar behaviour in pure Go with interfaces and careful abstraction. CUE handles this declaratively. You define schemas with constraints and transformations, and CUE validates the incoming data whilst extracting normalised structures.
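
For reference, the interface-based alternative might look something like this. It is only a sketch under assumptions: the Sensor interface and its method names are hypothetical, and the structs are trimmed to the fields the methods need. The inversion of the TRÅDFRI flag and the 50 lux threshold follow the mappings used elsewhere in this article.

```go
package main

import "fmt"

// Sensor is a hypothetical interface that hides per-sensor differences.
type Sensor interface {
	MotionDetected() bool
	// LightSufficient returns ok=false when the sensor has no light reading.
	LightSufficient() (sufficient, ok bool)
	// SecondsSinceMotion returns ok=false when no timing is available.
	SecondsSinceMotion() (seconds int, ok bool)
}

type Tradfri struct {
	Occupancy                 bool
	IlluminanceAboveThreshold bool
}

func (t Tradfri) MotionDetected() bool { return t.Occupancy }

// LightSufficient mirrors the article's mapping, which treats the flag as inverted.
func (t Tradfri) LightSufficient() (bool, bool) { return !t.IlluminanceAboveThreshold, true }

// SecondsSinceMotion is not supported by this sensor.
func (t Tradfri) SecondsSinceMotion() (int, bool) { return 0, false }

type Vallhorn struct {
	Occupancy        bool
	Illuminance      int
	NoOccupancySince *int
}

func (v Vallhorn) MotionDetected() bool { return v.Occupancy }

// LightSufficient applies the article's 50 lux threshold.
func (v Vallhorn) LightSufficient() (bool, bool) { return v.Illuminance > 50, true }

func (v Vallhorn) SecondsSinceMotion() (int, bool) {
	if v.Occupancy || v.NoOccupancySince == nil {
		return 0, false
	}
	return *v.NoOccupancySince, true
}

func main() {
	idle := 720
	sensors := []Sensor{
		Tradfri{Occupancy: true},
		Vallhorn{Illuminance: 80, NoOccupancySince: &idle},
	}
	for _, s := range sensors {
		sufficient, _ := s.LightSufficient()
		seconds, timed := s.SecondsSinceMotion()
		fmt.Println(s.MotionDetected(), sufficient, seconds, timed)
	}
}
```

This removes the type switches from calling code, but every new sensor still means a new struct with three hand-written methods, and range validation has to live somewhere else.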

What CUE provides #

CUE is a configuration language that handles everything from defining schemas and generating configuration files to validating data and computing transformations. It’s used for API definitions, build configurations, policy validation, and more. In this system, I use it to validate sensor data at the boundary and transform it into normalised structures that my Go code can work with consistently.

Instead of validating battery levels with scattered if statements through your Go code, the schema catches invalid data at the boundary:

battery: int & >=0 & <=100
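
For comparison, the hand-written Go that this one-line constraint stands in for might look roughly like the sketch below. checkRange and validateReadings are hypothetical names, and the 0 to 255 linkquality range is taken from the sensor schemas shown later in this article.

```go
package main

import "fmt"

// checkRange is a hypothetical helper for one bounded integer field.
func checkRange(field string, value, lo, hi int) error {
	if value < lo || value > hi {
		return fmt.Errorf("%s: %d is outside [%d, %d]", field, value, lo, hi)
	}
	return nil
}

// validateReadings repeats by hand what the schema states declaratively.
// It returns the first violation it finds.
func validateReadings(battery, linkquality int) error {
	if err := checkRange("battery", battery, 0, 100); err != nil {
		return err
	}
	if err := checkRange("linkquality", linkquality, 0, 255); err != nil {
		return err
	}
	return nil
}

func main() {
	fmt.Println(validateReadings(87, 120))
	fmt.Println(validateReadings(101, 120))
}
```

Each new bounded field adds another call here, and nothing stops a second code path from skipping the check altogether.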

CUE definitions (prefixed with #) work like type definitions in Go. You can reference them and compose them with other schemas, but they don’t produce output by themselves. This lets you build a union type for all supported sensors:

#MotionSensor: #IkeaTradfriMotionSensorStatus | #IkeaVallhornMotionSensorStatus

The language handles both validation and transformation. This means you can turn the TRÅDFRI’s boolean light sensor and the VALLHORN’s lux reading into a single light_sufficient field without writing Go type switches.

Building the schemas #

I started with a base schema that all sensors share:

#MotionSensorBase: {
    sensor_id!: string
    timestamp?: string
    floor!: "basement" | "ground" | "upstairs" | "attic"
    sensor_type!: string
    raw_data!: _
}

The ! marker flags a field as required, whilst ? flags it as optional.

The raw_data field uses CUE’s top type _, which accepts any value. This allows each sensor type to define its own structure for raw_data.

Each sensor gets its own schema that composes the base with sensor-specific constraints:

#IkeaTradfriMotionSensorStatus: #MotionSensorBase & {
    sensor_type: "ikea_tradfri_motion"
    raw_data: {
        battery: int & >=0 & <=100
        occupancy: bool
        illuminance_above_threshold: bool
        linkquality: int & >=0 & <=255
    }
}

The & operator composes schemas. Each sensor gets its own constraints that match IKEA’s hardware specifications.

The VALLHORN schema includes additional fields that the older TRÅDFRI doesn’t support:

#IkeaVallhornMotionSensorStatus: #MotionSensorBase & {
    sensor_type: "ikea_vallhorn_motion"
    raw_data: {
        battery: int & >=0 & <=100
        occupancy: bool
        illuminance: int & >=0 & <=65535
        linkquality: int & >=0 & <=255

        // Only present when occupancy is false
        no_occupancy_since?: null | int & >=0

        voltage: int
    }
}

The no_occupancy_since field is optional (marked with ?) and can be either null or a non-negative integer. When motion stops, the sensor starts counting seconds and reports them.

Extracting normalised data #

The extraction schemas contain the conditional logic that would otherwise live in Go type switches. Here’s motion detection:

#ExtractMotionFromSensor: {
    sensor: #MotionSensor

    motion_detected: bool
    if sensor.sensor_type == "ikea_tradfri_motion" ||
       sensor.sensor_type == "ikea_vallhorn_motion" {
        motion_detected: sensor.raw_data.occupancy
    }

    // Optional field for precise timing on supported sensors
    no_occupancy_seconds?: int

    if sensor.sensor_type == "ikea_vallhorn_motion" {
        // Only present when occupancy is false and sensor supports it
        if sensor.raw_data.occupancy == false &&
           sensor.raw_data.no_occupancy_since != _|_ &&
           sensor.raw_data.no_occupancy_since != null {
            no_occupancy_seconds: sensor.raw_data.no_occupancy_since
        }
    }
}

In Go, you’d write a method on each sensor struct to extract this data. Add a new sensor and you update every extraction method. With CUE, you add one conditional block to the schema and the extraction works everywhere you use it.

The no_occupancy_seconds field shows how CUE handles sensor-specific features. The VALLHORN provides this timing information, the TRÅDFRI doesn’t. Making the field optional means sensors without this capability simply don’t include it. Automation logic can check if it exists and use it for precise timing (“turn off lights if no motion for 10 minutes”), or fall back to basic occupancy detection when it’s not available.
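
On the Go side, that check-then-fall-back decision could be sketched as below. Everything here is an assumption for illustration: MotionEvent is a simplified mirror of the extraction output that uses a plain pointer for the optional field, and Decision, decide and the three outcomes are hypothetical names.

```go
package main

import "fmt"

// MotionEvent is a simplified, hypothetical mirror of the extraction output.
type MotionEvent struct {
	MotionDetected     bool
	NoOccupancySeconds *int // nil when the sensor reports no timing
}

// Decision is a hypothetical outcome for the lighting logic.
type Decision string

const (
	KeepOn        Decision = "keep-on"
	TurnOff       Decision = "turn-off"
	FallbackTimer Decision = "start-fallback-timer"
)

const offAfterSeconds = 600 // 10 minutes

// decide prefers the sensor's own timing and falls back to an
// application-side timer when the field is missing.
func decide(ev MotionEvent) Decision {
	switch {
	case ev.MotionDetected:
		return KeepOn
	case ev.NoOccupancySeconds != nil:
		if *ev.NoOccupancySeconds > offAfterSeconds {
			return TurnOff
		}
		return KeepOn
	default:
		// No timing from the sensor: the application has to track it.
		return FallbackTimer
	}
}

func main() {
	idle := 900
	fmt.Println(decide(MotionEvent{MotionDetected: true}))
	fmt.Println(decide(MotionEvent{NoOccupancySeconds: &idle}))
	fmt.Println(decide(MotionEvent{}))
}
```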

Light detection demonstrates how CUE normalises different measurement approaches:

#ExtractAmbientLightFromSensor: {
    sensor: #MotionSensor

    light_sufficient?: bool
    has_ambient_sensor: bool

    if sensor.sensor_type == "ikea_tradfri_motion" {
        if sensor.raw_data.illuminance_above_threshold != _|_ {
            has_ambient_sensor: true
            // This field is inverted - true means dark
            light_sufficient: !sensor.raw_data.illuminance_above_threshold
        }
    }

    if sensor.sensor_type == "ikea_vallhorn_motion" {
        if sensor.raw_data.illuminance != _|_ {
            has_ambient_sensor: true
            // 50 lux is sufficient for movement without lights
            light_sufficient: sensor.raw_data.illuminance > 50
        }
    }
}

The TRÅDFRI’s boolean gets inverted (it indicates darkness rather than light). The VALLHORN’s lux reading gets compared to a threshold. Application code doesn’t need to know about these differences. It just checks light_sufficient.

How the Go code works with CUE #

With CUE handling validation and transformation, the Go code becomes generic. I wrap optional fields in an Option type with IsPresent() and MustGet() methods, so the code can safely handle values that may or may not exist.
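
The Option type itself isn’t shown in this article, so the following is only a guess at its shape: a minimal generic sketch that assumes values arrive through encoding/json. Only the IsPresent() and MustGet() names come from the text; Some, decodeEvent and the field layout are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Option is a hypothetical wrapper for a value that may be missing.
type Option[T any] struct {
	value   T
	present bool
}

// Some wraps a value that is known to exist.
func Some[T any](v T) Option[T] { return Option[T]{value: v, present: true} }

// IsPresent reports whether a value was set.
func (o Option[T]) IsPresent() bool { return o.present }

// MustGet returns the value and panics if none was set.
func (o Option[T]) MustGet() T {
	if !o.present {
		panic("Option: MustGet called on an absent value")
	}
	return o.value
}

// UnmarshalJSON marks the option present only for non-null values.
// Absent fields never reach this method, so they keep the zero Option.
func (o *Option[T]) UnmarshalJSON(data []byte) error {
	if string(data) == "null" {
		*o = Option[T]{}
		return nil
	}
	if err := json.Unmarshal(data, &o.value); err != nil {
		return err
	}
	o.present = true
	return nil
}

// MotionEvent mirrors the fields of the motion extraction schema.
type MotionEvent struct {
	MotionDetected     bool        `json:"motion_detected"`
	NoOccupancySeconds Option[int] `json:"no_occupancy_seconds"`
}

// decodeEvent is a hypothetical helper used to exercise the type.
func decodeEvent(payload string) (MotionEvent, error) {
	var ev MotionEvent
	err := json.Unmarshal([]byte(payload), &ev)
	return ev, err
}

func main() {
	ev, err := decodeEvent(`{"motion_detected": false, "no_occupancy_seconds": 720}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	if ev.NoOccupancySeconds.IsPresent() {
		fmt.Println("idle for", ev.NoOccupancySeconds.MustGet(), "seconds")
	}
}
```

The real type may well differ; the point is only that a zero-value Option reads as “absent”, which is what lets the processor treat both sensors the same way.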

Here’s the actual implementation:

// Generic processor that works for ALL sensor types
type MotionSensorProcessor struct {
    cueCtx                    *cue.Context
    extractMotionSchema       cue.Value  // #ExtractMotionFromSensor
    extractAmbientLightSchema cue.Value  // #ExtractAmbientLightFromSensor
}

// This function works for Vallhorn, Tradfri, or any future sensor
func (p *MotionSensorProcessor) ProcessSensorData(sensorData MotionSensor, statusSchema cue.Value) error {
    // Encode to CUE and validate against the sensor-specific schema
    sensorDataAsCUE := p.cueCtx.Encode(sensorData)
    if err := statusSchema.Unify(sensorDataAsCUE).Validate(); err != nil {
        return fmt.Errorf("schema validation failed: %w", err)
    }

    // Extract motion event using CUE's extraction schema
    extractMotion := map[string]any{"sensor": sensorData}
    extractMotionAsCUE := p.cueCtx.Encode(extractMotion)
    unifiedMotion := p.extractMotionSchema.Unify(extractMotionAsCUE)

    // Decode back to a clean Go struct
    var motionEvent MotionEvent
    if err := unifiedMotion.Decode(&motionEvent); err != nil {
        return fmt.Errorf("failed to decode motion event: %w", err)
    }

    // Identical code for all sensor types
    if motionEvent.MotionDetected {
        handleMotionDetected(sensorData.Floor)
    }

    // Works for Vallhorn (which has the field) and gracefully handles
    // Tradfri (which doesn't) via the Option type
    if motionEvent.NoOccupancySeconds.IsPresent() {
        seconds := motionEvent.NoOccupancySeconds.MustGet()
        if seconds > 600 {
            handleNoMotionTimeout(sensorData.Floor, seconds)
        }
    }

    // Extract ambient light reading using CUE
    extractAmbientLight := map[string]any{"sensor": sensorData}
    extractAmbientLightAsCUE := p.cueCtx.Encode(extractAmbientLight)
    unifiedAmbientLight := p.extractAmbientLightSchema.Unify(extractAmbientLightAsCUE)

    var lightReading AmbientLightReading
    if err := unifiedAmbientLight.Decode(&lightReading); err != nil {
        return fmt.Errorf("failed to decode ambient light: %w", err)
    }

    // Handles both Tradfri's boolean and Vallhorn's lux reading
    // CUE normalised them both into a single light_sufficient field
    if lightReading.HasAmbientSensor && lightReading.LightSufficient.IsPresent() {
        if !lightReading.LightSufficient.MustGet() {
            handleDarkness(sensorData.Floor)
        }
    }

    return nil
}

// Vallhorn-specific handler - ONLY handles unmarshalling
type VallhornHandler struct {
    processor    *MotionSensorProcessor
    statusSchema cue.Value  // #IkeaVallhornMotionSensorStatus
}

func (h *VallhornHandler) HandleMQTT(topic string, payload []byte) error {
    sensorID, floor, err := parseSensorTopic(topic)
    if err != nil {
        return fmt.Errorf("failed to parse topic: %w", err)
    }

    // THIS IS THE ONLY VALLHORN-SPECIFIC CODE
    var rawData VallhornRawData
    if err := json.Unmarshal(payload, &rawData); err != nil {
        return fmt.Errorf("failed to unmarshal: %w", err)
    }

    // Build the generic sensor data structure
    sensorData := MotionSensor{
        SensorID:   sensorID,
        Timestamp:  time.Now().Format(time.RFC3339),
        Floor:      floor,
        SensorType: "ikea_vallhorn_motion",
        RawData:    rawData,
    }

    // Everything from here is generic and reusable
    return h.processor.ProcessSensorData(sensorData, h.statusSchema)
}

// Tradfri-specific handler - ONLY handles unmarshalling
type TradfriHandler struct {
    processor    *MotionSensorProcessor
    statusSchema cue.Value  // #IkeaTradfriMotionSensorStatus
}

func (h *TradfriHandler) HandleMQTT(topic string, payload []byte) error {
    sensorID, floor, err := parseSensorTopic(topic)
    if err != nil {
        return fmt.Errorf("failed to parse topic: %w", err)
    }

    // THIS IS THE ONLY TRADFRI-SPECIFIC CODE
    var rawData TradfriRawData
    if err := json.Unmarshal(payload, &rawData); err != nil {
        return fmt.Errorf("failed to unmarshal: %w", err)
    }

    // Build the generic sensor data structure
    sensorData := MotionSensor{
        SensorID:   sensorID,
        Timestamp:  time.Now().Format(time.RFC3339),
        Floor:      floor,
        SensorType: "ikea_tradfri_motion",
        RawData:    rawData,
    }

    // Same generic processor works for Tradfri too
    return h.processor.ProcessSensorData(sensorData, h.statusSchema)
}

Each handler’s sensor-specific code is about five lines: just unmarshalling JSON into the correct struct type. Everything else is generic processing. The same ProcessSensorData function validates, extracts motion events, extracts ambient light readings, and executes business logic without knowing which specific sensor it’s processing.
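
Both handlers call parseSensorTopic, which isn’t shown. A possible sketch follows, with a loud caveat: the topic layout <floor>/<sensor_id>/motion is purely an assumption made for this example, and the real topics may be structured differently. The floor names come from the base schema.

```go
package main

import (
	"fmt"
	"strings"
)

// validFloors lists the floor values allowed by the base schema.
var validFloors = map[string]bool{
	"basement": true,
	"ground":   true,
	"upstairs": true,
	"attic":    true,
}

// parseSensorTopic splits a topic of the ASSUMED form
// "<floor>/<sensor_id>/motion" into its parts.
func parseSensorTopic(topic string) (sensorID, floor string, err error) {
	parts := strings.Split(topic, "/")
	if len(parts) != 3 || parts[2] != "motion" {
		return "", "", fmt.Errorf("unexpected topic %q, want <floor>/<sensor_id>/motion", topic)
	}
	floor, sensorID = parts[0], parts[1]
	if !validFloors[floor] {
		return "", "", fmt.Errorf("unknown floor %q in topic %q", floor, topic)
	}
	if sensorID == "" {
		return "", "", fmt.Errorf("empty sensor id in topic %q", topic)
	}
	return sensorID, floor, nil
}

func main() {
	id, floor, err := parseSensorTopic("ground/hallway-1/motion")
	fmt.Println(id, floor, err)
}
```

Rejecting unknown floors here is optional, since the CUE schema would catch them anyway, but failing early gives an error message that names the topic.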

The extraction schemas in CUE contain all the conditional logic. The Go code just encodes, unifies, and decodes. No type switches. No conditional chains.

What you gain #

When an MQTT message arrives, validation against the CUE schema happens at the boundary. Invalid data gets rejected with specific error messages about what’s wrong rather than causing runtime panics deep in the application.

The schema files document the exact structure and valid ranges for each sensor. Adding a new sensor type means defining one schema and making the necessary changes to the extraction schemas for motion and ambient light. The existing generic processing code continues to work.

That said, CUE validates structure, but tracking application state (like whether a room has been unoccupied for 10 minutes) still requires Go code. CUE tells you what the current state is, not what to do with it. Time-series analysis, rate limiting, and state machines live in your application layer.
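
As an illustration of the kind of state that stays in Go, here is a minimal sketch of a per-floor occupancy tracker. OccupancyTracker, its methods and the at helper are hypothetical names, time is passed in explicitly to keep the logic testable, and the type is not safe for concurrent use, so callbacks running on several goroutines would need a mutex around it.

```go
package main

import (
	"fmt"
	"time"
)

const defaultTimeout = 10 * time.Minute

// OccupancyTracker is a hypothetical holder for the last motion time per floor.
type OccupancyTracker struct {
	timeout    time.Duration
	lastMotion map[string]time.Time
}

func NewOccupancyTracker(timeout time.Duration) *OccupancyTracker {
	return &OccupancyTracker{timeout: timeout, lastMotion: make(map[string]time.Time)}
}

// Observe records a normalised motion reading taken at time now.
func (t *OccupancyTracker) Observe(floor string, motion bool, now time.Time) {
	if motion {
		t.lastMotion[floor] = now
	}
}

// Unoccupied reports whether the floor has gone at least the timeout
// without motion. Floors that were never seen report false.
func (t *OccupancyTracker) Unoccupied(floor string, now time.Time) bool {
	last, seen := t.lastMotion[floor]
	return seen && now.Sub(last) >= t.timeout
}

// at returns a fixed reference time plus the given number of minutes.
func at(minutes int) time.Time {
	base := time.Date(2024, time.January, 1, 12, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(minutes) * time.Minute)
}

func main() {
	tracker := NewOccupancyTracker(defaultTimeout)
	tracker.Observe("ground", true, at(0))
	fmt.Println(tracker.Unoccupied("ground", at(5)))
	fmt.Println(tracker.Unoccupied("ground", at(10)))
}
```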

There’s a learning curve if you’re coming from Protobuf, JSON Schema or Go structs. CUE’s syntax looks familiar but behaves differently, especially around definitions versus concrete values. The documentation helps, but expect some initial friction.

Why CUE instead of alternatives #

JSON Schema can validate incoming data but can’t transform it. You’d still need Go type switches to extract normalised structures from different sensor types.

Protocol Buffers require a compilation step and don’t handle optional fields as elegantly. The no_occupancy_since field that’s sometimes present, sometimes null, and sometimes absent would need Protobuf’s wrapper types (like google.protobuf.Int32Value) or custom handling logic.

CUE does both validation and transformation in one tool. The same schemas that validate incoming MQTT messages also define the extraction logic that normalises the data. You maintain one set of schemas that handles both concerns.

What’s next #

I built this system with CUE from the start, and the flexibility it provides compared to a pure Go solution has worked well for this use case. The schemas sit at the boundary where MQTT messages enter the system. Every sensor type gets validated, and the rest of the codebase works with clean, normalised structures. The system currently handles four motion sensors across four floors, controlling lights on two floors.

Adding new sensor types means defining a schema and updating the extraction logic. When I add Philips Hue sensors, I’ll extend the #MotionSensor union type and make the necessary changes to the extraction schemas for motion and ambient light detection. The generic processing code won’t need to change.

This article covered inbound sensor data transformation. In the coming weeks, I’ll be writing more about this system, starting with how CUE handles the outbound side: generating device-specific MQTT commands for different bulb types. There’s a lot more to explore around building automation rules, managing state, and handling edge cases in a distributed sensor network.

Thanks #

Thanks to Paul, Roger and Daniel from the CUE team for their help and support with CUE and this article.