
Historian Bus Worker

Definition

A historian worker is a consumer component that subscribes to one or more canonical semantic MQTT buses and writes normalized measurements into the PostgreSQL telemetry historian.

Its role is the inverse of an adapter:

  • adapters translate vendor-specific inputs into canonical bus topics
  • historian workers translate canonical bus topics into historian API calls

The worker is a consumer of the semantic contract, not an owner of it.


Purpose

The worker exists to decouple historian persistence from:

  • vendor topic structures
  • protocol-specific payload formats
  • source-specific device identities
  • source-side timing quirks

This keeps the historian integration generic enough to ingest any bus that follows the shared MQTT contract.


Architectural Position

The historian worker operates at the egress side of the semantic bus.

Pipeline:

Device / External System
↓
Protocol Adapter
↓
Canonical MQTT Bus
↓
Historian Worker
↓
telemetry.ingest_measurement(...)
↓
Historian tables

The worker consumes canonical MQTT only.

It MUST NOT subscribe to raw vendor topics such as zigbee2mqtt/... for normal historian ingestion.


Shared Inputs

The worker consumes data defined by:

  • mqtt_contract.md
  • bus-specific contracts such as home_bus.md and energy_bus.md
  • tdb_ingestion/mqtt_ingestion_api.md
  • tdb_ingestion/counter_ingestion_api.md when cumulative counter metrics are present on the bus

The worker should treat the semantic bus as the source of truth for topic grammar and stream meaning.

The database API is the source of truth for persistence behavior.

Boundary decisions are recorded in consolidated_spec.md §10.


Responsibilities

The historian worker is responsible for:

  1. Topic subscription

Subscribe to canonical topics for the target buses.

  2. Topic parsing

Extract metric_name, device_id, stream, and bus-specific dimensions from the topic.

  3. Payload decoding

Decode scalar payloads and Profile B envelopes according to mqtt_contract.md.

  4. Meta caching

Consume retained meta topics and keep a local cache keyed by topic stem.

  5. Historian policy enforcement

Respect meta.historian.enabled and meta.historian.mode when deciding what should be persisted.

  6. Type selection

Select the numeric or boolean PostgreSQL overload that matches the value type of each measurement-style metric.

  7. Timestamp resolution

Use observed_at from the envelope when present; otherwise, fall back to ingestion time.

  8. Per-stream ordering

Preserve source order for the same (metric_name, device_id) path when sending measurements to PostgreSQL.

  9. Error handling

Separate permanent message errors from transient infrastructure failures.

  10. Observability

Emit worker health, counters, and persistence failures on operational topics.


Explicit Non-Responsibilities

The historian worker must NOT:

  • parse vendor-specific topics as part of the normal pipeline
  • redefine semantic topic contracts
  • invent new device identities outside the bus contract
  • perform automation logic
  • aggregate unrelated sensors into synthetic measurements
  • reimplement historian segment logic already handled in PostgreSQL
  • repair malformed semantic messages silently

If semantic messages are invalid, the worker should surface them operationally and skip persistence.


Subscription Model

Recommended subscriptions per site:

  • +/home/+/+/+/value
  • +/home/+/+/+/last
  • +/energy/+/+/+/value
  • +/energy/+/+/+/last
  • +/home/+/+/+/meta
  • +/energy/+/+/+/meta

Optional observability subscriptions:

  • +/home/+/+/+/availability
  • +/energy/+/+/+/availability
  • +/sys/adapter/+/availability
  • +/sys/adapter/+/error

Rules:

  • meta SHOULD be subscribed and cached before relying on enrichment
  • set topics MUST be ignored by the historian worker
  • last topics MUST be ignored for normal time-series ingestion
  • availability SHOULD NOT be persisted as telemetry samples unless a separate policy explicitly requires it
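
The subscription rules above reduce to a dispatch on the final topic segment. The sketch below assumes the stream is always the last segment, as in the home and energy grammars; the Action enum and classify function are hypothetical names, not part of any contract.

```python
from enum import Enum


class Action(Enum):
    INGEST = "ingest"          # value stream: candidate for persistence
    CACHE_META = "cache_meta"  # retained meta: cached, never a measurement
    OBSERVE = "observe"        # availability / error: operational only
    IGNORE = "ignore"          # set, last, and anything unrecognized


def classify(topic: str) -> Action:
    """Pick a handling path from the final topic segment (the stream)."""
    stream = topic.rsplit("/", 1)[-1]
    if stream == "value":
        return Action.INGEST
    if stream == "meta":
        return Action.CACHE_META
    if stream in ("availability", "error"):
        return Action.OBSERVE
    # set MUST be ignored; last is ignored for normal time-series ingestion.
    return Action.IGNORE
```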

Topic-to-Historian Mapping

Home Bus

Topic:

<site>/home/<location>/<capability>/<device_id>/<stream>

Mapped fields:

  • metric_name = <capability>
  • device_id = <location>.<device_id>

Energy Bus

Topic:

<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>

Mapped fields:

  • metric_name = <metric>
  • device_id = <entity_type>.<entity_id>

The worker MUST NOT require database-side topic parsing.
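
A minimal sketch of worker-side topic parsing for the two grammars above. ParsedTopic and parse_topic are hypothetical names; returning None stands for "does not match a supported bus grammar", which the worker treats as a permanent message error. The stem field is the topic without its stream segment, which doubles as the meta cache key.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ParsedTopic:
    site: str
    bus: str
    metric_name: str
    device_id: str
    stream: str
    stem: str  # topic without the final stream segment


def parse_topic(topic: str) -> Optional[ParsedTopic]:
    """Map a canonical home/energy topic to historian fields, or None."""
    parts = topic.split("/")
    if len(parts) != 6 or any(p == "" for p in parts):
        return None
    site, bus, a, b, c, stream = parts
    if bus == "home":
        # <site>/home/<location>/<capability>/<device_id>/<stream>
        metric_name, device_id = b, f"{a}.{c}"
    elif bus == "energy":
        # <site>/energy/<entity_type>/<entity_id>/<metric>/<stream>
        metric_name, device_id = c, f"{a}.{b}"
    else:
        return None
    return ParsedTopic(site, bus, metric_name, device_id, stream, "/".join(parts[:-1]))
```

For example, vad/energy/inverter/inv1/active_power/value (a hypothetical entity) maps to metric_name active_power and device_id inverter.inv1.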


Payload Handling

Scalar Payload

If the stream uses Profile A:

  • parse the scalar payload as number, boolean, or enum string
  • resolve unit from cached retained meta
  • set observed_at to ingestion time unless source time is available out of band

Envelope Payload

If the stream uses Profile B:

  • read value
  • use observed_at when present
  • use unit from payload, falling back to cached meta.unit
  • use quality when present for logs or side metrics
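
The two profiles can be decoded as sketched below. This assumes, pending mqtt_contract.md, that meta.payload_profile uses the identifiers "A" and "B", that Profile A payloads are UTF-8 JSON literals (21.5, true, "heat") or bare enum words, and that observed_at is an ISO 8601 string. Sample and decode are hypothetical names.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Union

Scalar = Union[bool, int, float, str]


@dataclass
class Sample:
    value: object
    observed_at: datetime
    unit: Optional[str]
    quality: Optional[str]


def _parse_scalar(text: str) -> Scalar:
    """Profile A: a JSON literal, or a bare enum word (assumption)."""
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError:
        return text.strip()
    if isinstance(decoded, (bool, int, float, str)):
        return decoded
    raise ValueError(f"not a scalar payload: {text!r}")


def _parse_time(ts: str) -> datetime:
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
    if ts.endswith("Z"):
        ts = ts[:-1] + "+00:00"
    return datetime.fromisoformat(ts)


def decode(raw: bytes, profile: str, meta: Optional[dict] = None,
           received_at: Optional[datetime] = None) -> Sample:
    meta = meta or {}
    received_at = received_at or datetime.now(timezone.utc)  # ingestion time
    text = raw.decode("utf-8")
    if profile == "A":
        # No source time in band, so observed_at is the ingestion time.
        return Sample(_parse_scalar(text), received_at, meta.get("unit"), None)
    if profile == "B":
        env = json.loads(text)
        if not isinstance(env, dict) or "value" not in env:
            raise ValueError("envelope has no value field")
        ts = env.get("observed_at")
        observed_at = _parse_time(ts) if ts else received_at
        unit = env.get("unit") or meta.get("unit")  # payload first, then meta
        return Sample(env["value"], observed_at, unit, env.get("quality"))
    raise ValueError(f"unknown payload profile: {profile!r}")
```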

Type Compatibility Rule

The current PostgreSQL historian API directly supports:

  • numeric values
  • boolean values

String enum states are allowed on the semantic bus, but the worker SHOULD skip them unless there is an explicit encoding policy.

Examples:

  • open / closed may be mapped to boolean if a metric policy expects that
  • heat / cool / off MUST NOT be guessed into arbitrary numeric values
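
Type selection can be sketched as a small function that returns which overload to call, or None to skip. BOOLEAN_ENUM_POLICY and the contact metric are hypothetical stand-ins for an explicit encoding policy; in Python the bool check must come before the numeric check because bool is a subclass of int.

```python
from typing import Optional, Tuple, Union

# Hypothetical explicit encoding policy: metric_name -> enum string -> boolean.
BOOLEAN_ENUM_POLICY = {
    "contact": {"open": True, "closed": False},
}


def select_overload(metric_name: str,
                    value: object) -> Optional[Tuple[str, Union[bool, int, float]]]:
    """Return ("numeric" | "boolean", value) for the historian call, or None."""
    if isinstance(value, bool):  # must precede the int check
        return ("boolean", value)
    if isinstance(value, (int, float)):
        return ("numeric", value)
    if isinstance(value, str):
        mapping = BOOLEAN_ENUM_POLICY.get(metric_name, {})
        if value in mapping:
            return ("boolean", mapping[value])
    # Unmapped enums (heat / cool / off), objects, lists: skip, never guess.
    return None
```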

Counter Metric Boundary

Counter-style cumulative metrics such as energy_total, import_energy_total, export_energy_total, rx_bytes_total, or tx_packets_total are valid semantic bus values, but they are not covered by the current measurement ingestion API.

Rules:

  • the worker MUST NOT force cumulative counters through telemetry.ingest_measurement(...) just because the payload is numeric
  • the worker SHOULD route them to the separate counter ingestion path defined in tdb_ingestion/counter_ingestion_api.md
  • if the counter path is temporarily unavailable, the worker SHOULD skip them explicitly and expose that through operational stats or DLQ
  • measurement-style metrics such as active_power, voltage, current, temperature, and soc continue through the existing measurement API
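
The routing rule above can be sketched as follows. This assumes counters are recognized through an explicit allow-list taken from the bus contracts (here, the example names above) rather than inferred from a naming pattern; Route and route are hypothetical names, and the caller is expected to count SKIP_COUNTER results in stats or send them to the DLQ.

```python
from enum import Enum


class Route(Enum):
    MEASUREMENT = "measurement"    # telemetry.ingest_measurement(...)
    COUNTER = "counter"            # path from counter_ingestion_api.md
    SKIP_COUNTER = "skip_counter"  # counter path unavailable: skip visibly


# Assumption: an explicit allow-list, not a guess based on a "_total" suffix.
COUNTER_METRICS = frozenset({
    "energy_total", "import_energy_total", "export_energy_total",
    "rx_bytes_total", "tx_packets_total",
})


def route(metric_name: str, counter_path_available: bool) -> Route:
    if metric_name not in COUNTER_METRICS:
        return Route.MEASUREMENT
    if counter_path_available:
        return Route.COUNTER
    # Never fall back to the measurement API just because the value is numeric.
    return Route.SKIP_COUNTER
```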

Stream Policy

Default persistence policy:

  • ingest value by default
  • ignore last
  • use meta.historian.mode to interpret whether a value stream represents sample, state, or event semantics
  • ignore set
  • never treat meta itself as a measurement

Additional rules:

  • if meta.historian.enabled=false, the worker MUST skip persistence
  • if meta is missing, the worker MAY continue with degraded defaults
  • missing meta MUST NOT block ingestion of otherwise valid numeric or boolean value samples
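
A sketch of the stream policy as a single decision function. It assumes the cached meta stores historian.enabled and historian.mode as a nested historian object, and that "sample" is an acceptable degraded default mode; both are assumptions, as is the name persistence_decision. An unsupported mode raises instead of being silently repaired, so it can be surfaced operationally.

```python
from typing import Optional

VALID_MODES = ("sample", "state", "event")
DEFAULT_MODE = "sample"  # hypothetical degraded default when meta is missing


def persistence_decision(stream: str, meta: Optional[dict]) -> Optional[str]:
    """Return the historian mode to persist with, or None to skip the message."""
    if stream != "value":
        return None  # last, set, and meta are never persisted as samples
    if meta is None:
        return DEFAULT_MODE  # missing meta MUST NOT block valid samples
    historian = meta.get("historian") or {}
    if historian.get("enabled") is False:
        return None  # meta.historian.enabled=false: MUST skip
    mode = historian.get("mode", DEFAULT_MODE)
    if mode not in VALID_MODES:
        raise ValueError(f"unsupported historian.mode: {mode!r}")
    return mode
```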

Ordering and Delivery

Ordering is critical because PostgreSQL enforces append-only writes per (metric_name, device_id).

Rules:

  • the worker MUST serialize writes for the same (metric_name, device_id)
  • retries MUST preserve order
  • the worker SHOULD partition concurrency by (metric_name, device_id) or an equivalent stable shard
  • out-of-order database errors SHOULD be treated as permanent message errors unless the worker can prove replay ordering was preserved

Operational implication:

  • broad parallelism is acceptable across distinct devices or metrics
  • unordered fan-out for the same logical stream is not acceptable
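
One way to get per-stream ordering with cross-stream parallelism is a fixed set of FIFO queues, each drained by a single consumer, with (metric_name, device_id) hashed to a stable shard. The asyncio sketch below is illustrative only; ShardedWriter, shard_for, and the injected write coroutine are hypothetical, and crc32 is used because Python's built-in str hash varies between processes.

```python
import asyncio
import zlib
from typing import Awaitable, Callable, List, Tuple

Key = Tuple[str, str]  # (metric_name, device_id)
Write = Callable[[Key, object], Awaitable[None]]


def shard_for(key: Key, shard_count: int) -> int:
    """Stable shard index for a logical stream."""
    metric_name, device_id = key
    return zlib.crc32(f"{metric_name}\x00{device_id}".encode("utf-8")) % shard_count


class ShardedWriter:
    """One queue and one consumer per shard: same key, same queue, same order."""

    def __init__(self, write: Write, shard_count: int = 4) -> None:
        # Must be constructed inside a running event loop (create_task needs one).
        self._write = write
        self.failures: List[Tuple[Key, Exception]] = []
        self._queues: List[asyncio.Queue] = [asyncio.Queue() for _ in range(shard_count)]
        self._tasks = [asyncio.create_task(self._run(q)) for q in self._queues]

    async def submit(self, key: Key, sample: object) -> None:
        await self._queues[shard_for(key, len(self._queues))].put((key, sample))

    async def _run(self, queue: asyncio.Queue) -> None:
        while True:
            key, sample = await queue.get()
            try:
                # Any retry must finish inside this await, before the next item.
                await self._write(key, sample)
            except Exception as exc:  # keep the shard alive; record the failure
                self.failures.append((key, exc))
            finally:
                queue.task_done()

    async def drain(self) -> None:
        """Wait for every queued write, then stop the consumers."""
        for queue in self._queues:
            await queue.join()
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
```

Distinct streams may still share a shard, which costs some parallelism but never breaks ordering.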

Meta Cache

The worker should maintain a retained metadata cache.

Recommended cache key:

  • topic stem without the final stream segment

Example:

  • topic: vad/home/bedroom/temperature/bedroom-sensor/meta
  • cache key: vad/home/bedroom/temperature/bedroom-sensor

Recommended cached fields:

  • payload_profile
  • data_type
  • unit
  • historian.enabled
  • historian.mode
  • schema_ref
  • adapter_id
  • source
  • source_ref

Rules:

  • cached meta SHOULD be replaced atomically when newer retained data arrives
  • worker startup SHOULD tolerate receiving live value traffic before the corresponding retained meta
  • if meta is deleted, the cache entry SHOULD be removed as well
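
A minimal meta cache sketch, assuming meta payloads are JSON objects with historian as a nested object. It relies on the MQTT convention that publishing an empty retained payload clears the retained message, which is treated here as "meta deleted". MetaCache and CACHED_FIELDS are hypothetical names; atomic replacement is achieved by building the new entry fully before swapping a single dictionary reference.

```python
import json
import threading
from types import MappingProxyType
from typing import Mapping, Optional

# Fields the worker keeps; anything else in the meta document is dropped.
CACHED_FIELDS = ("payload_profile", "data_type", "unit", "historian",
                 "schema_ref", "adapter_id", "source", "source_ref")


class MetaCache:
    def __init__(self) -> None:
        self._entries: dict = {}
        self._lock = threading.Lock()

    def on_meta(self, topic: str, payload: bytes) -> None:
        stem = topic.rsplit("/", 1)[0]  # drop the trailing "/meta"
        if not payload:
            # Empty retained payload: the retained meta was cleared.
            with self._lock:
                self._entries.pop(stem, None)
            return
        doc = json.loads(payload)
        if not isinstance(doc, dict):
            raise ValueError(f"meta for {stem} is not a JSON object")
        # Build the whole entry first, then swap one reference.
        entry = MappingProxyType({k: doc[k] for k in CACHED_FIELDS if k in doc})
        with self._lock:
            self._entries[stem] = entry

    def get(self, stem: str) -> Optional[Mapping]:
        # None is normal at startup: value traffic may precede retained meta.
        with self._lock:
            return self._entries.get(stem)
```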

Database Interaction

The worker writes measurement-style samples through telemetry.ingest_measurement(...) as defined in tdb_ingestion/mqtt_ingestion_api.md.

Rules:

  • use the numeric overload for numeric values
  • use the boolean overload for boolean values
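  • cast NULL explicitly to the overload's value type if unknown values are ever supported by bus policy, because an untyped NULL cannot choose between the numeric and boolean overloads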
  • log the returned action for observability
  • do not duplicate historian logic already implemented in PostgreSQL
  • do not send counter-style cumulative totals through this API; those follow the separate contract in tdb_ingestion/counter_ingestion_api.md

The worker should assume the database is responsible for:

  • deduplication
  • gap detection
  • segment splitting
  • policy boundary handling

Operational Topics

Shared operational namespace rules are defined in sys_bus.md.

The worker should expose its own operational topics under:

  • <site>/sys/historian/<worker_id>/availability
  • <site>/sys/historian/<worker_id>/stats
  • <site>/sys/historian/<worker_id>/error
  • <site>/sys/historian/<worker_id>/dlq

Recommended uses:

  • availability: retained online/offline state
  • stats: low-rate counters such as ingested messages, skipped samples, and retry counts
  • error: structured infrastructure or persistence failures
  • dlq: semantic messages rejected by the worker

This keeps historian worker observability separate from adapter observability.
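
The operational topic layout can be sketched as small helpers. The topic shape comes from the list above; the DLQ and stats field names are hypothetical placeholders, since sys_bus.md owns the real operational payload rules.

```python
import json
from collections import Counter
from datetime import datetime, timezone

OPS_KINDS = ("availability", "stats", "error", "dlq")


def ops_topic(site: str, worker_id: str, kind: str) -> str:
    if kind not in OPS_KINDS:
        raise ValueError(f"unknown operational topic: {kind!r}")
    return f"{site}/sys/historian/{worker_id}/{kind}"


def dlq_record(topic: str, payload: bytes, reason: str) -> str:
    """JSON body for the dlq topic (field names are hypothetical)."""
    return json.dumps({
        "topic": topic,
        "payload": payload.decode("utf-8", errors="replace"),
        "reason": reason,
        "rejected_at": datetime.now(timezone.utc).isoformat(),
    })


def stats_body(counters: Counter) -> str:
    """Low-rate counters published periodically as one JSON object."""
    return json.dumps(dict(counters), sort_keys=True)
```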


Failure Handling

Permanent message failures include:

  • topic does not match a supported bus grammar
  • payload type cannot be mapped to the target historian type
  • unknown metric in PostgreSQL
  • out-of-order measurement
  • value violates metric policy

Transient failures include:

  • PostgreSQL unavailable
  • network interruption between worker and database
  • temporary broker disconnect

Rules:

  • permanent message failures SHOULD be logged and dead-lettered
  • transient failures SHOULD be retried without reordering per stream
  • ambiguous retries that later return out-of-order errors SHOULD be logged as likely duplicates
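
The failure rules can be sketched as a retry loop that blocks the current stream until the write either succeeds or fails permanently; retrying in place rather than re-queueing is what keeps per-stream order. The exception classes and write_with_retry are hypothetical, a real worker would map its database driver's errors onto them, and the loop is synchronous for brevity.

```python
import time
from typing import Callable


class PermanentError(Exception):
    """Message-level failure: log and dead-letter, never retry."""


class OutOfOrderError(PermanentError):
    """The database rejected a sample older than the stream's last write."""


class TransientError(Exception):
    """Infrastructure failure (database, network, broker): retry in place."""


def write_with_retry(
    write: Callable[[], None],
    on_permanent: Callable[[PermanentError, bool], None],
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Return True on success; on_permanent gets (error, likely_duplicate)."""
    delay = 0.5
    retried = False
    while True:
        try:
            write()
            return True
        except OutOfOrderError as exc:
            # After an ambiguous retry the first attempt may already have
            # committed, so the rejected sample is most likely a duplicate.
            on_permanent(exc, retried)
            return False
        except PermanentError as exc:
            on_permanent(exc, False)
            return False
        except TransientError:
            retried = True
            sleep(delay)
            delay = min(delay * 2, max_delay)  # capped exponential backoff
```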

Deployment Model

A historian worker may be deployed in one of the following ways:

  • one worker for all buses
  • one worker per bus
  • one worker per site and bus

Recommended starting model:

  • one worker per site consuming both home and energy

This keeps the first implementation simple while preserving the option to split workers later for throughput or failure-domain reasons.


Relationship with Adapters

Adapters and historian workers are symmetrical integration roles.

Adapter direction:

  • external protocol -> canonical bus

Historian worker direction:

  • canonical bus -> historian API

Both should be:

  • contract-driven
  • deterministic
  • observable
  • easy to replay in tests

Neither should contain business automation logic.


Design Principles

  1. Consume canonical topics only

The worker should depend on semantic contracts, not vendor payloads.

  2. Keep persistence generic

The same worker model should work for home, energy, and future buses.

  3. Preserve ordering

Correct historian writes matter more than maximum ingest parallelism.

  4. Fail visibly

Bad messages and persistence problems must be observable on operational topics.

  5. Reuse shared contract rules

The worker should not invent alternate payload or metadata semantics.