A historian worker is a consumer component that subscribes to one or more canonical semantic MQTT buses and writes normalized measurements into the PostgreSQL telemetry historian.
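Its role is the inverse of an adapter: an adapter translates device or external-system data into canonical MQTT, while the historian worker consumes canonical MQTT and persists it into the historian.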
The worker is a consumer of the semantic contract, not an owner of it.
The worker exists to decouple historian persistence from:
This keeps the historian integration generic enough to ingest any bus that follows the shared MQTT contract.
The historian worker operates at the egress side of the semantic bus.
Pipeline:
Device / External System
↓
Protocol Adapter
↓
Canonical MQTT Bus
↓
Historian Worker
↓
telemetry.ingest_measurement(...)
↓
Historian tables
The worker consumes canonical MQTT only.
It MUST NOT subscribe to raw vendor topics such as zigbee2mqtt/... for normal historian ingestion.
The worker consumes data defined by:
- mqtt_contract.md
- home_bus.md and energy_bus.md
- tdb_ingestion/mqtt_ingestion_api.md
- tdb_ingestion/counter_ingestion_api.md, when cumulative counter metrics are present on the bus

The worker should treat the semantic bus as the source of truth for topic grammar and stream meaning.
The database API is the source of truth for persistence behavior.
Boundary decisions are recorded in consolidated_spec.md §10.
The historian worker is responsible for:
- Subscribe to canonical topics for the target buses.
- Extract metric_name, device_id, stream, and bus-specific dimensions from the topic.
- Decode scalar payloads and Profile B envelopes according to mqtt_contract.md.
- Consume retained meta topics and keep a local cache keyed by topic stem.
- Respect meta.historian.enabled and meta.historian.mode when deciding what should be persisted.
- Choose the numeric or boolean PostgreSQL overload correctly for measurement-style metrics.
- Use observed_at from the envelope when present, otherwise fall back to ingestion time.
- Preserve source order for the same (metric_name, device_id) path when sending measurements to PostgreSQL.
- Separate permanent message errors from transient infrastructure failures.
- Emit worker health, counters, and persistence failures on operational topics.
The historian worker must NOT:
If semantic messages are invalid, the worker should surface them operationally and skip persistence.
Recommended subscriptions per site:
- +/home/+/+/+/value
- +/home/+/+/+/last
- +/energy/+/+/+/value
- +/energy/+/+/+/last
- +/home/+/+/+/meta
- +/energy/+/+/+/meta

Optional observability subscriptions:
- +/home/+/+/+/availability
- +/energy/+/+/+/availability
- +/sys/adapter/+/availability
- +/sys/adapter/+/error

Rules:
- meta SHOULD be subscribed and cached before relying on enrichment
- set topics MUST be ignored by the historian worker
- last topics MUST be ignored for normal time-series ingestion
- availability SHOULD NOT be persisted as telemetry samples unless a separate policy explicitly requires it

Home bus topic:
<site>/home/<location>/<capability>/<device_id>/<stream>
Mapped fields:
- metric_name = <capability>
- device_id = <location>.<device_id>

Energy bus topic:
<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>
Mapped fields:
- metric_name = <metric>
- device_id = <entity_type>.<entity_id>

The worker MUST NOT require database-side topic parsing.
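The mapping is fixed by the topic grammar, so the worker needs only a few lines of code to apply it. The Python sketch below is illustrative only. TopicFields and parse_topic are hypothetical names. The sketch covers metric_name and device_id but not any other bus-specific dimensions. It also returns the topic stem, which is the key used for the meta cache.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TopicFields:
    site: str
    bus: str
    metric_name: str
    device_id: str
    stream: str
    stem: str  # topic without the trailing <stream>; also the meta cache key


def parse_topic(topic: str) -> Optional[TopicFields]:
    """Map a canonical home or energy topic to historian fields.

    Returns None for anything that does not follow either grammar, so the
    caller can surface it operationally instead of persisting it."""
    parts = topic.split("/")
    if len(parts) != 6 or "" in parts:
        return None
    site, bus, first, second, third, stream = parts
    if bus == "home":
        # <site>/home/<location>/<capability>/<device_id>/<stream>
        location, capability, device = first, second, third
        metric_name, device_id = capability, f"{location}.{device}"
    elif bus == "energy":
        # <site>/energy/<entity_type>/<entity_id>/<metric>/<stream>
        entity_type, entity_id, metric = first, second, third
        metric_name, device_id = metric, f"{entity_type}.{entity_id}"
    else:
        return None
    return TopicFields(site, bus, metric_name, device_id, stream, "/".join(parts[:5]))
```

Take the hypothetical energy topic vad/energy/inverter/inv1/active_power/value. For it, the sketch yields metric_name = active_power and device_id = inverter.inv1. The database only ever receives these two already-mapped fields.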
If the stream uses Profile A:
- treat the scalar payload as the value
- take unit from the cached retained meta
- set observed_at to ingestion time, unless source time is available out of band

If the stream uses Profile B:
- read value from the envelope
- read observed_at when present
- read unit from the payload, falling back to cached meta.unit
- use quality, when present, for logs or side metrics

The current PostgreSQL historian API directly supports:
- numeric values
- boolean values
String enum states are allowed on the semantic bus, but the worker SHOULD skip them unless there is an explicit encoding policy.
Examples:
- open / closed may be mapped to boolean if a metric policy expects that
- heat / cool / off MUST NOT be guessed into arbitrary numeric values

Counter-style cumulative metrics such as energy_total, import_energy_total, export_energy_total, rx_bytes_total, or tx_packets_total are valid semantic bus values, but they are not covered by the current measurement ingestion API.
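A small sketch can combine four of the rules above: Profile A and Profile B decoding, the numeric/boolean type selection, skipping of string enums, and the counter exclusion. It rests on several assumptions that this document does not fix, so check them against mqtt_contract.md:

- payloads are JSON-encoded
- observed_at is an ISO 8601 string
- a payload_profile meta field holds "A" or "B"

COUNTER_METRICS, Sample, decode, and ingestion_path are hypothetical names. The block only decides which API a sample belongs to; it does not call the database.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional, Tuple, Union

# Hypothetical registry built from the counter examples named above; a real
# deployment would define this set by explicit policy, not by payload type.
COUNTER_METRICS = frozenset({
    "energy_total", "import_energy_total", "export_energy_total",
    "rx_bytes_total", "tx_packets_total",
})


@dataclass
class Sample:
    value: Union[float, bool]
    kind: str            # "numeric" or "boolean": selects the overload
    unit: Optional[str]
    observed_at: datetime
    quality: Any = None  # kept for logs or side metrics only


def classify(raw: Any) -> Optional[Tuple[str, Union[float, bool]]]:
    """Map a decoded value to a supported type, or None to skip it."""
    # Check bool first: in Python, bool is a subclass of int.
    if isinstance(raw, bool):
        return "boolean", raw
    if isinstance(raw, (int, float)):
        return "numeric", float(raw)
    # String enum states ("heat", "cool", "off", ...) are skipped rather than
    # guessed into numbers; an explicit encoding policy would hook in here.
    return None


def parse_time(raw: str) -> datetime:
    # Assumes ISO 8601 strings; "Z" is normalized for Python < 3.11.
    return datetime.fromisoformat(raw.replace("Z", "+00:00"))


def decode(payload: bytes, meta: Optional[dict] = None,
           now: Optional[datetime] = None) -> Optional[Sample]:
    """Decode a value payload. Returns None when the sample should be skipped.

    Raises ValueError (including json.JSONDecodeError) for malformed payloads,
    which a caller would treat as a permanent message error."""
    meta = meta or {}
    now = now or datetime.now(timezone.utc)  # ingestion time fallback
    data = json.loads(payload)
    # Assumption: without meta, a JSON object is treated as a Profile B envelope.
    profile = meta.get("payload_profile") or ("B" if isinstance(data, dict) else "A")
    if profile == "B":
        if not isinstance(data, dict):
            raise ValueError("Profile B payload is not an envelope object")
        raw, quality = data.get("value"), data.get("quality")
        unit = data.get("unit") or meta.get("unit")
        raw_time = data.get("observed_at")
        observed_at = parse_time(raw_time) if raw_time else now
    else:
        # Profile A: the payload is the bare scalar; unit comes from cached
        # meta and observed_at falls back to ingestion time.
        raw, quality, unit, observed_at = data, None, meta.get("unit"), now
    typed = classify(raw)
    if typed is None:
        return None
    kind, value = typed
    return Sample(value, kind, unit, observed_at, quality)


def ingestion_path(metric_name: str) -> str:
    """Counters go to the counter API even though their payload is numeric."""
    return "counter" if metric_name in COUNTER_METRICS else "measurement"
```

The order of checks in classify matters in Python. Without the bool check first, True would be accepted as the number 1 and routed to the numeric overload.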
Rules:
- do not send counter metrics to telemetry.ingest_measurement(...) just because the payload is numeric
- persist counter metrics according to tdb_ingestion/counter_ingestion_api.md
- measurement-style metrics such as active_power, voltage, current, temperature, and soc continue through the existing measurement API

Default persistence policy:
- persist value by default
- ignore last for time-series ingestion
- use meta.historian.mode to interpret whether a value stream represents sample, state, or event semantics
- ignore set
- do not persist meta itself as a measurement

Additional rules:
- if meta.historian.enabled=false, the worker MUST skip persistence
- if meta is missing, the worker MAY continue with degraded defaults
- missing or late meta MUST NOT block ingestion of otherwise valid numeric or boolean value samples

Ordering is critical because PostgreSQL enforces append-only writes per (metric_name, device_id).
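The persistence policy above and the ordering requirement can be sketched as a decision function followed by a stable sharding step. This sketch makes several assumptions. meta is taken to be the parsed retained meta document, with a nested historian object. meta.historian.mode is not interpreted. The per-shard queue.Queue objects are hypothetical stand-ins for whatever serial consumers a real worker uses.

```python
import hashlib
import queue
from typing import Any, List, Optional


def should_persist(stream: str, meta: Optional[dict]) -> bool:
    """Default persistence policy for one message on a canonical topic."""
    if stream != "value":
        # last, set, meta and availability are never persisted as samples.
        return False
    historian = (meta or {}).get("historian") or {}
    # Missing or late meta falls back to degraded defaults (persist); only an
    # explicit enabled=false turns persistence off.
    return historian.get("enabled", True) is not False


def shard_for(metric_name: str, device_id: str, shard_count: int) -> int:
    """Stable shard index for a (metric_name, device_id) path.

    Uses SHA-256 rather than built-in hash(), whose str hashing is randomized
    per process and would move paths between shards across restarts."""
    key = f"{metric_name}\x00{device_id}".encode("utf-8")
    return int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % shard_count


def make_shards(shard_count: int) -> List[queue.Queue]:
    # Each queue is meant to be drained by exactly one consumer, in FIFO order.
    return [queue.Queue() for _ in range(shard_count)]


def dispatch(shards: List[queue.Queue], metric_name: str,
             device_id: str, sample: Any) -> None:
    """Send a sample to the single queue that owns its path."""
    shards[shard_for(metric_name, device_id, len(shards))].put(
        (metric_name, device_id, sample))
```

Any stable function of (metric_name, device_id) would work here. What matters is that a given path never lands on two consumers at once, so its writes reach PostgreSQL in source order.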
Rules:
- preserve source order for each (metric_name, device_id)
- partition concurrent work by (metric_name, device_id) or an equivalent stable shard

Operational implication:
The worker should maintain a retained metadata cache.
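Below is a minimal sketch of such a cache, following the recommended key, fields, and rules for it. It assumes the retained meta payload is a JSON object and that historian.enabled and historian.mode live under a nested historian key. It also relies on the MQTT convention that an empty retained payload deletes the retained message. MetaCache and topic_stem are hypothetical names.

```python
import json
import threading
from typing import Dict, Optional

# Top-level keys copied from meta; "historian" carries enabled and mode.
CACHED_FIELDS = ("payload_profile", "data_type", "unit", "historian",
                 "schema_ref", "adapter_id", "source", "source_ref")


def topic_stem(topic: str) -> str:
    """Cache key: the topic without its trailing stream segment."""
    return topic.rsplit("/", 1)[0]


class MetaCache:
    def __init__(self) -> None:
        self._entries: Dict[str, dict] = {}
        self._lock = threading.Lock()

    def on_meta(self, topic: str, payload: bytes) -> None:
        key = topic_stem(topic)
        if not payload:
            # An empty retained payload deletes the retained meta, so the
            # cache entry is removed as well.
            with self._lock:
                self._entries.pop(key, None)
            return
        doc = json.loads(payload)
        if not isinstance(doc, dict):
            raise ValueError("meta payload is not a JSON object")
        # Build the complete new entry first, then swap it in under the lock,
        # so readers never see a partially updated entry.
        entry = {k: doc[k] for k in CACHED_FIELDS if k in doc}
        with self._lock:
            self._entries[key] = entry

    def get(self, value_topic: str) -> Optional[dict]:
        """Return cached meta for a value topic, or None if none has arrived."""
        with self._lock:
            return self._entries.get(topic_stem(value_topic))
```

A None result from get is the normal case for value traffic that arrives before its retained meta. The caller then continues with degraded defaults instead of waiting.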
Recommended cache key: the topic stem, i.e. the topic without its trailing <stream> segment.
Example:
- meta topic: vad/home/bedroom/temperature/bedroom-sensor/meta
- cache key: vad/home/bedroom/temperature/bedroom-sensor

Recommended cached fields:
- payload_profile
- data_type
- unit
- historian.enabled
- historian.mode
- schema_ref
- adapter_id
- source
- source_ref

Rules:
- meta SHOULD be replaced atomically when newer retained data arrives
- expect value traffic to arrive before the corresponding retained meta
- if retained meta is deleted, the cache entry SHOULD be removed as well

The worker writes measurement-style samples through telemetry.ingest_measurement(...) as defined in tdb_ingestion/mqtt_ingestion_api.md.
Rules:
- handle NULL explicitly if unknown values are ever supported by bus policy
- record the action value for observability
- send counter metrics through tdb_ingestion/counter_ingestion_api.md instead

The worker should assume the database is responsible for:
Shared operational namespace rules are defined in sys_bus.md.
The worker should expose its own operational topics under:
- <site>/sys/historian/<worker_id>/availability
- <site>/sys/historian/<worker_id>/stats
- <site>/sys/historian/<worker_id>/error
- <site>/sys/historian/<worker_id>/dlq

Recommended uses:
- availability: retained online/offline state
- stats: low-rate counters such as ingested messages, skipped samples, and retry counts
- error: structured infrastructure or persistence failures
- dlq: semantic messages rejected by the worker

This keeps historian worker observability separate from adapter observability.
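A small reporter can sketch both the operational topic layout and the split between rejected messages and infrastructure failures. This sketch makes several assumptions:

- publish is a hypothetical stand-in for an MQTT client's publish call
- the JSON payload shapes and the online/offline strings are assumptions; sys_bus.md defines the real conventions
- PermanentMessageError is a hypothetical exception, and any other exception is treated as transient and left for the caller to retry

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Dict

OPS_KINDS = ("availability", "stats", "error", "dlq")


def ops_topic(site: str, worker_id: str, kind: str) -> str:
    """Build <site>/sys/historian/<worker_id>/<kind>."""
    if kind not in OPS_KINDS:
        raise ValueError(f"unknown operational topic kind: {kind}")
    return f"{site}/sys/historian/{worker_id}/{kind}"


class PermanentMessageError(Exception):
    """Raised for a semantic message the worker rejects (routed to dlq)."""


@dataclass
class OpsReporter:
    site: str
    worker_id: str
    # Hypothetical stand-in for an MQTT client: publish(topic, payload, retain).
    publish: Callable[[str, bytes, bool], None]
    counters: Dict[str, int] = field(default_factory=dict)

    def _send(self, kind: str, body: object, retain: bool = False) -> None:
        payload = body if isinstance(body, bytes) else json.dumps(body, sort_keys=True).encode()
        self.publish(ops_topic(self.site, self.worker_id, kind), payload, retain)

    def count(self, name: str) -> None:
        self.counters[name] = self.counters.get(name, 0) + 1

    def online(self) -> None:
        self._send("availability", b"online", retain=True)

    def reject(self, topic: str, payload: bytes, reason: str) -> None:
        self.count("rejected")
        self._send("dlq", {"topic": topic, "reason": reason,
                           "payload": payload.decode("utf-8", "replace")})

    def infra_error(self, reason: str) -> None:
        self.count("errors")
        self._send("error", {"reason": reason})

    def flush_stats(self) -> None:
        self._send("stats", dict(self.counters))


def handle(reporter: OpsReporter, topic: str, payload: bytes,
           process: Callable[[str, bytes], None]) -> None:
    """Run one message through `process`, routing permanent failures to dlq.

    Other exceptions (treated here as transient) propagate so the caller can
    retry instead of dead-lettering the message."""
    try:
        process(topic, payload)
    except PermanentMessageError as exc:
        reporter.reject(topic, payload, str(exc))
    else:
        reporter.count("ingested")
```

The retained offline state would normally be registered as the client's MQTT last will, for example with paho-mqtt's Client.will_set. The broker then publishes it if the worker disconnects unexpectedly.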
Permanent message failures include:
Transient failures include:
Rules:
A historian worker may be deployed in one of the following ways:
Recommended starting model:
- a single worker that consumes both home and energy

This keeps the first implementation simple while preserving the option to split workers later for throughput or failure-domain reasons.
Adapters and historian workers are symmetrical integration roles.
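Adapter direction: device or external system → protocol adapter → canonical MQTT bus
Historian worker direction: canonical MQTT bus → historian worker → PostgreSQL historian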
Both should be:
Neither should contain business automation logic.
The worker should depend on semantic contracts, not vendor payloads.
The same worker model should work for home, energy, and future buses.
Correct historian writes matter more than maximum ingest parallelism.
Bad messages and persistence problems must be observable on operational topics.
The worker should not invent alternate payload or metadata semantics.