# Historian Bus Worker

## Definition

A historian worker is a consumer component that subscribes to one or more canonical semantic MQTT buses and writes normalized measurements into the PostgreSQL telemetry historian.

Its role is the inverse of an adapter:

- adapters translate vendor-specific inputs into canonical bus topics
- historian workers translate canonical bus topics into historian API calls

The worker is a consumer of the semantic contract, not an owner of it.

---

## Purpose

The worker exists to decouple historian persistence from:

- vendor topic structures
- protocol-specific payload formats
- source-specific device identities
- source-side timing quirks

This keeps the historian integration generic enough to ingest any bus that follows the shared MQTT contract.

---

## Architectural Position

The historian worker operates at the egress side of the semantic bus.

Pipeline:

Device / External System → Protocol Adapter → Canonical MQTT Bus → Historian Worker → `telemetry.ingest_measurement(...)` → Historian tables

The worker consumes canonical MQTT only.

It MUST NOT subscribe to raw vendor topics such as `zigbee2mqtt/...` for normal historian ingestion.

---

## Shared Inputs

The worker consumes data defined by:

- `mqtt_contract.md`
- bus-specific contracts such as `home_bus.md` and `energy_bus.md`
- `tdb_ingestion/mqtt_ingestion_api.md`
- `tdb_ingestion/counter_ingestion_api.md` when cumulative counter metrics are present on the bus

The worker should treat the semantic bus as the source of truth for topic grammar and stream meaning.

The database API is the source of truth for persistence behavior.

Boundary decisions are recorded in `consolidated_spec.md` §10.

---

## Responsibilities

The historian worker is responsible for:

1. Topic subscription

Subscribe to canonical topics for the target buses.

2. Topic parsing

Extract `metric_name`, `device_id`, `stream`, and bus-specific dimensions from the topic.

3. Payload decoding

Decode scalar payloads and Profile B envelopes according to `mqtt_contract.md`.

4. Meta caching

Consume retained `meta` topics and keep a local cache keyed by topic stem.

5. Historian policy enforcement

Respect `meta.historian.enabled` and `meta.historian.mode` when deciding what should be persisted.

6. Type selection

Choose the numeric or boolean PostgreSQL overload correctly for measurement-style metrics.

7. Timestamp resolution

Use `observed_at` from the envelope when present, otherwise fall back to ingestion time.

8. Per-stream ordering

Preserve source order for the same `(metric_name, device_id)` path when sending measurements to PostgreSQL.

9. Error handling

Separate permanent message errors from transient infrastructure failures.

10. Observability

Emit worker health, counters, and persistence failures on operational topics.

---

## Explicit Non-Responsibilities

The historian worker must NOT:

- parse vendor-specific topics as part of the normal pipeline
- redefine semantic topic contracts
- invent new device identities outside the bus contract
- perform automation logic
- aggregate unrelated sensors into synthetic measurements
- reimplement historian segment logic already handled in PostgreSQL
- repair malformed semantic messages silently

If semantic messages are invalid, the worker should surface them operationally and skip persistence.

---

## Subscription Model

Recommended subscriptions (the leading `+` matches any site; a worker dedicated to one site can replace it with that site identifier):

- `+/home/+/+/+/value`
- `+/home/+/+/+/last`
- `+/energy/+/+/+/value`
- `+/energy/+/+/+/last`
- `+/home/+/+/+/meta`
- `+/energy/+/+/+/meta`

Optional observability subscriptions:

- `+/home/+/+/+/availability`
- `+/energy/+/+/+/availability`
- `+/sys/adapter/+/availability`
- `+/sys/adapter/+/error`

Rules:

- `meta` SHOULD be subscribed and cached before relying on enrichment
- `set` topics MUST be ignored by the historian worker
- `last` topics MUST be ignored for normal time-series ingestion
- `availability` SHOULD NOT be persisted as telemetry samples unless a separate policy explicitly requires it
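The rules above can be applied as a per-message routing step. This is a minimal sketch, assuming the recommended filters are used verbatim; the function names `topic_matches` and `route` and the labels `"ingest"`, `"cache_meta"` and `"ignore"` are hypothetical, and the optional `availability` and `sys/adapter` subscriptions are left out.

```python
# Hypothetical sketch: classify an incoming topic against the recommended
# subscriptions. The returned labels are illustrative only.
SUBSCRIPTIONS = [
    "+/home/+/+/+/value",
    "+/home/+/+/+/last",
    "+/energy/+/+/+/value",
    "+/energy/+/+/+/last",
    "+/home/+/+/+/meta",
    "+/energy/+/+/+/meta",
]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Minimal MQTT matcher for filters that only use the single-level '+' wildcard."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    if len(filter_levels) != len(topic_levels):
        return False
    return all(f == "+" or f == t for f, t in zip(filter_levels, topic_levels))


def route(topic: str) -> str:
    if not any(topic_matches(f, topic) for f in SUBSCRIPTIONS):
        # Vendor topics (zigbee2mqtt/...) and 'set' topics never match.
        return "ignore"
    stream = topic.rsplit("/", 1)[-1]
    if stream == "value":
        return "ingest"
    if stream == "meta":
        return "cache_meta"
    # 'last' is received but not used for time-series ingestion.
    return "ignore"
```

The broker already filters by subscription, but re-checking in the message handler keeps the `set`/`last` rules explicit and testable. If the worker uses paho-mqtt, its `topic_matches_sub(sub, topic)` helper can replace the hand-written matcher.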

---

## Topic-to-Historian Mapping

### Home Bus

Topic:

`<site>/home/<location>/<capability>/<device_id>/<stream>`

Mapped fields:

- `metric_name = <capability>`
- `device_id = <location>.<device_id>`

### Energy Bus

Topic:

`<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>`

Mapped fields:

- `metric_name = <metric>`
- `device_id = <entity_type>.<entity_id>`

Topic parsing is the worker's job: the worker MUST derive these fields itself and MUST NOT rely on database-side topic parsing.
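Both mappings can be expressed as one worker-side parser. This is a sketch, assuming the six-level grammars shown above; `HistorianKey` and `parse_topic` are hypothetical names, and returning `None` stands in for "topic does not match a supported bus grammar".

```python
from typing import NamedTuple, Optional


class HistorianKey(NamedTuple):
    site: str
    metric_name: str
    device_id: str
    stream: str


def parse_topic(topic: str) -> Optional[HistorianKey]:
    """Map a canonical home or energy topic to historian fields.

    Returns None when the topic does not match a supported bus grammar,
    which the worker should treat as a permanent message error.
    """
    parts = topic.split("/")
    if len(parts) != 6 or any(p == "" for p in parts):
        return None
    site, bus, a, b, c, stream = parts
    if bus == "home":
        # <site>/home/<location>/<capability>/<device_id>/<stream>
        location, capability, device = a, b, c
        return HistorianKey(site, capability, f"{location}.{device}", stream)
    if bus == "energy":
        # <site>/energy/<entity_type>/<entity_id>/<metric>/<stream>
        entity_type, entity_id, metric = a, b, c
        return HistorianKey(site, metric, f"{entity_type}.{entity_id}", stream)
    return None
```

For example, `vad/home/bedroom/temperature/bedroom-sensor/value` yields `metric_name="temperature"` and `device_id="bedroom.bedroom-sensor"`.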

---

## Payload Handling

### Scalar Payload

If the stream uses Profile A:

- parse the scalar payload as number, boolean, or enum string
- resolve `unit` from cached retained `meta`
- set `observed_at` to ingestion time unless source time is available out of band

### Envelope Payload

If the stream uses Profile B:

- read `value`
- use `observed_at` when present
- use `unit` from payload, falling back to cached `meta.unit`
- use `quality` when present for logs or side metrics
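The two profiles can be decoded side by side as sketched below. The exact encodings are defined in `mqtt_contract.md`; this sketch assumes that Profile A booleans are the literals `true`/`false`, that Profile B is a JSON object, that `observed_at` is an ISO 8601 string, and that the profile is identified as `"A"` or `"B"` (for example from cached `meta.payload_profile`). `Decoded`, `parse_scalar`, `parse_time` and `decode` are hypothetical names.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class Decoded:
    value: Any              # float, bool or enum string
    observed_at: datetime
    unit: Optional[str]
    quality: Optional[Any]


def parse_scalar(text: str) -> Any:
    # Assumption: booleans are the literals true/false, anything float() accepts
    # is numeric, and every other string is treated as an enum state.
    if text == "true":
        return True
    if text == "false":
        return False
    try:
        return float(text)
    except ValueError:
        return text


def parse_time(text: str) -> datetime:
    # Accept a trailing 'Z' on older Pythons; treat naive timestamps as UTC (assumption).
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    ts = datetime.fromisoformat(text)
    return ts if ts.tzinfo else ts.replace(tzinfo=timezone.utc)


def decode(payload: bytes, profile: str, meta_unit: Optional[str],
           received_at: datetime) -> Decoded:
    text = payload.decode("utf-8").strip()
    if profile == "A":
        # Scalar: unit from cached meta, time from ingestion.
        return Decoded(parse_scalar(text), received_at, meta_unit, None)
    envelope = json.loads(text)  # Profile B: JSON object (assumed shape)
    if not isinstance(envelope, dict) or "value" not in envelope:
        raise ValueError("Profile B envelope without 'value'")
    observed = envelope.get("observed_at")
    return Decoded(
        value=envelope["value"],
        observed_at=parse_time(observed) if observed else received_at,
        unit=envelope.get("unit") or meta_unit,
        quality=envelope.get("quality"),
    )
```

A decoding failure (for example an envelope without `value`) is a permanent message error, so the caller would dead-letter it rather than retry.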

### Type Compatibility Rule

The current PostgreSQL historian API directly supports:

- numeric values
- boolean values

String enum states are allowed on the semantic bus, but the worker SHOULD skip them unless there is an explicit encoding policy.

Examples:

- `open` / `closed` may be mapped to boolean if a metric policy expects that
- `heat` / `cool` / `off` MUST NOT be guessed into arbitrary numeric values
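Overload selection can be sketched as a small function that returns the target type or `None` for "skip". The `ENUM_TO_BOOL` table and the `contact` metric in it are hypothetical stand-ins for an explicit per-metric encoding policy. One Python-specific pitfall is worth calling out: `bool` is a subclass of `int`, so the boolean check must come first or `True` would be routed to the numeric overload.

```python
from typing import Dict, Optional, Tuple, Union

# Hypothetical per-metric enum policy: only metrics listed here may map
# string states to booleans. Nothing is guessed for other metrics.
ENUM_TO_BOOL: Dict[str, Dict[str, bool]] = {
    "contact": {"open": True, "closed": False},
}


def select_overload(metric_name: str,
                    value: Union[float, int, bool, str]
                    ) -> Optional[Tuple[str, Union[float, bool]]]:
    """Return ("boolean", v) or ("numeric", v), or None when the sample must be skipped."""
    # bool must be checked before int/float: in Python, bool is a subclass of int.
    if isinstance(value, bool):
        return ("boolean", value)
    if isinstance(value, (int, float)):
        return ("numeric", float(value))
    if isinstance(value, str):
        mapping = ENUM_TO_BOOL.get(metric_name)
        if mapping is not None and value in mapping:
            return ("boolean", mapping[value])
    return None  # e.g. heat/cool/off with no explicit policy
```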

### Counter Metric Boundary

Counter-style cumulative metrics such as `energy_total`, `import_energy_total`, `export_energy_total`, `rx_bytes_total`, or `tx_packets_total` are valid semantic bus values, but they are not covered by the current measurement ingestion API.

Rules:

- the worker MUST NOT force cumulative counters through `telemetry.ingest_measurement(...)` just because the payload is numeric
- the worker SHOULD route them to the separate counter ingestion path defined in `tdb_ingestion/counter_ingestion_api.md`
- if the counter path is temporarily unavailable, the worker SHOULD skip those samples explicitly and report the skips through its operational `stats` or `dlq` topics
- measurement-style metrics such as `active_power`, `voltage`, `current`, `temperature`, and `soc` continue through the existing measurement API
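The routing decision can be sketched as below. `COUNTER_METRICS`, `skipped` and `ingestion_path` are hypothetical; the sketch assumes the worker knows the counter metrics from configuration or the bus contract rather than guessing from a `_total` suffix, and that skips are counted for later publication on the stats topic.

```python
from collections import Counter

# Hypothetical registry: the set of cumulative counters would normally come
# from configuration or the bus contract rather than a name-suffix guess.
COUNTER_METRICS = frozenset({
    "energy_total", "import_energy_total", "export_energy_total",
    "rx_bytes_total", "tx_packets_total",
})

skipped = Counter()  # surfaced later through the worker's stats topic


def ingestion_path(metric_name: str, counter_path_available: bool) -> str:
    if metric_name not in COUNTER_METRICS:
        return "measurement"        # telemetry.ingest_measurement(...)
    if counter_path_available:
        return "counter"            # counter ingestion API
    skipped["counter_path_unavailable"] += 1
    return "skip"                   # explicit skip, never the measurement API
```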

---

## Stream Policy

Default persistence policy:

- ingest `value` by default
- ignore `last`
- use `meta.historian.mode` to interpret whether a `value` stream represents `sample`, `state`, or `event` semantics
- ignore `set`
- never treat `meta` itself as a measurement

Additional rules:

- if `meta.historian.enabled=false`, the worker MUST skip persistence
- if `meta` is missing, the worker MAY continue with degraded defaults (for example, Profile A samples then have no cached `unit`)
- missing `meta` MUST NOT block ingestion of otherwise valid numeric or boolean `value` samples
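The stream policy reduces to a small decision function. This sketch assumes cached `meta` is a parsed JSON object with a nested `historian` object (so `meta.historian.enabled` becomes `meta["historian"]["enabled"]`), and it assumes `sample` as the degraded default mode when `meta` or `historian.mode` is absent; both are assumptions, not contract rules. `should_persist` and `historian_mode` are hypothetical names.

```python
from typing import Any, Mapping, Optional


def should_persist(stream: str, meta: Optional[Mapping[str, Any]]) -> bool:
    if stream != "value":
        return False  # last, set, meta and availability are not samples
    if meta is None:
        return True   # missing meta must not block a valid value sample
    historian = meta.get("historian") or {}
    # Only an explicit enabled=false disables persistence.
    return historian.get("enabled", True) is not False


def historian_mode(meta: Optional[Mapping[str, Any]]) -> str:
    # Assumption: 'sample' is the degraded default when meta or mode is absent.
    if meta is None:
        return "sample"
    return (meta.get("historian") or {}).get("mode", "sample")
```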

---

## Ordering and Delivery

Ordering is critical because PostgreSQL enforces append-only writes per `(metric_name, device_id)`.

Rules:

- the worker MUST serialize writes for the same `(metric_name, device_id)`
- retries MUST preserve order
- the worker SHOULD partition concurrency by `(metric_name, device_id)` or an equivalent stable shard
- out-of-order database errors SHOULD be treated as permanent message errors unless the worker can prove replay ordering was preserved

Operational implication:

- broad parallelism is acceptable across distinct devices or metrics
- unordered fan-out for the same logical stream is not acceptable
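One way to get "parallel across streams, serial within a stream" is a fixed set of shards, each with a single FIFO queue and a single consumer. This is a minimal asyncio sketch, assuming an async `write(key, value)` callable that performs its own retries; `ShardedWriter` and `shard_for` are hypothetical. `zlib.crc32` is used instead of `hash()` because Python salts string hashes per process, which would move streams between shards across restarts.

```python
import asyncio
import zlib
from typing import Awaitable, Callable, List, Tuple

Key = Tuple[str, str]  # (metric_name, device_id)


def shard_for(key: Key, shards: int) -> int:
    # crc32 is stable across restarts, unlike Python's per-process salted hash().
    return zlib.crc32(f"{key[0]}\x00{key[1]}".encode("utf-8")) % shards


class ShardedWriter:
    """One FIFO queue and one consumer task per shard.

    Every message for a given key goes to the same shard, so writes for that
    key happen strictly in submission order, while other shards run in parallel.
    Must be constructed inside a running event loop.
    """

    def __init__(self, shards: int,
                 write: Callable[[Key, object], Awaitable[None]]) -> None:
        self._write = write
        self.failures: List[Tuple[Key, Exception]] = []
        self._queues: List[asyncio.Queue] = [asyncio.Queue() for _ in range(shards)]
        self._tasks = [asyncio.create_task(self._consume(q)) for q in self._queues]

    async def _consume(self, queue: asyncio.Queue) -> None:
        while True:
            key, value = await queue.get()
            try:
                # Retries for transient errors belong inside write(), so the
                # next message on this shard waits until this one is settled.
                await self._write(key, value)
            except Exception as exc:
                self.failures.append((key, exc))
            finally:
                queue.task_done()

    def submit(self, key: Key, value: object) -> None:
        self._queues[shard_for(key, len(self._queues))].put_nowait((key, value))

    async def close(self) -> None:
        for queue in self._queues:
            await queue.join()
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
```

Unrelated streams that hash to the same shard also wait on each other; the shard count trades that head-of-line blocking against the number of concurrent database sessions.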

---

## Meta Cache

The worker should maintain a local cache of retained `meta` messages.

Recommended cache key:

- topic stem without the final stream segment

Example:

- topic: `vad/home/bedroom/temperature/bedroom-sensor/meta`
- cache key: `vad/home/bedroom/temperature/bedroom-sensor`

Recommended cached fields:

- `payload_profile`
- `data_type`
- `unit`
- `historian.enabled`
- `historian.mode`
- `schema_ref`
- `adapter_id`
- `source`
- `source_ref`

Rules:

- cached `meta` SHOULD be replaced atomically when newer retained data arrives
- worker startup SHOULD tolerate receiving live `value` traffic before the corresponding retained `meta`
- if `meta` is deleted (in MQTT, by publishing an empty retained payload to the `meta` topic), the cache entry SHOULD be removed as well
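A cache following these rules can be sketched as below, assuming `meta` payloads are JSON objects and that MQTT callbacks may run on a different thread than the writers (as with paho-mqtt's network loop), hence the lock. `MetaCache` and its methods are hypothetical names. Each new payload is parsed completely before it replaces the old entry in one assignment, so readers never observe a half-updated entry.

```python
import json
import threading
from typing import Any, Dict, Optional


class MetaCache:
    """Retained-meta cache keyed by topic stem (topic minus the stream segment)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._entries: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def stem(topic: str) -> str:
        return topic.rsplit("/", 1)[0]

    def on_meta(self, topic: str, payload: bytes) -> None:
        key = self.stem(topic)
        if not payload:
            # An empty retained payload deletes the retained message in MQTT.
            with self._lock:
                self._entries.pop(key, None)
            return
        parsed = json.loads(payload)  # fully parse before touching the cache
        with self._lock:
            self._entries[key] = parsed  # whole-entry swap, never a partial merge

    def lookup(self, value_topic: str) -> Optional[Dict[str, Any]]:
        # None is a normal result: value traffic may arrive before retained meta.
        with self._lock:
            return self._entries.get(self.stem(value_topic))
```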

---

## Database Interaction

The worker writes measurement-style samples through `telemetry.ingest_measurement(...)` as defined in `tdb_ingestion/mqtt_ingestion_api.md`.

Rules:

- use the numeric overload for numeric values
- use the boolean overload for boolean values
- cast `NULL` explicitly (for example `NULL::boolean`) if bus policy ever supports unknown values, because an untyped `NULL` may not resolve to a single overload
- log the returned `action` for observability
- do not duplicate historian logic already implemented in PostgreSQL
- do not send counter-style cumulative totals through this API; those follow the separate contract in `tdb_ingestion/counter_ingestion_api.md`

The worker should assume the database is responsible for:

- deduplication
- gap detection
- segment splitting
- policy boundary handling

---

## Operational Topics

Shared operational namespace rules are defined in `sys_bus.md`.

The worker should expose its own operational topics under:

- `<site>/sys/historian/<worker_id>/availability`
- `<site>/sys/historian/<worker_id>/stats`
- `<site>/sys/historian/<worker_id>/error`
- `<site>/sys/historian/<worker_id>/dlq`

Recommended uses:

- `availability`: retained online/offline state
- `stats`: low-rate counters such as ingested messages, skipped samples, and retry counts
- `error`: structured infrastructure or persistence failures
- `dlq`: semantic messages rejected by the worker

This keeps historian worker observability separate from adapter observability.
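Topic construction and a low-rate stats payload can be sketched as follows. `ops_topics`, `Stats`, the counter names and the flat JSON payload shape are hypothetical; `sys_bus.md` may define a different payload format.

```python
import json
from collections import Counter
from typing import Dict


def ops_topics(site: str, worker_id: str) -> Dict[str, str]:
    base = f"{site}/sys/historian/{worker_id}"
    return {name: f"{base}/{name}" for name in ("availability", "stats", "error", "dlq")}


class Stats:
    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def incr(self, name: str, n: int = 1) -> None:
        self.counts[name] += n

    def snapshot(self) -> bytes:
        # Hypothetical payload shape: a flat JSON object of counter values.
        return json.dumps(dict(sorted(self.counts.items()))).encode("utf-8")
```

With paho-mqtt, the retained offline state is usually registered as the last-will message via `Client.will_set(topic, payload, retain=True)` before connecting, so the broker publishes it if the worker disappears, and the worker then publishes its retained online state after connecting.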

---

## Failure Handling

Permanent message failures include:

- topic does not match a supported bus grammar
- payload type cannot be mapped to the target historian type
- unknown metric in PostgreSQL
- out-of-order measurement
- value violates metric policy

Transient failures include:

- PostgreSQL unavailable
- network interruption between worker and database
- temporary broker disconnect

Rules:

- permanent message failures SHOULD be logged and dead-lettered
- transient failures SHOULD be retried without reordering per stream
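- ambiguous retries (retries issued after an attempt whose outcome is unknown, such as a connection drop before the commit was acknowledged) that later return out-of-order errors SHOULD be logged as likely duplicates, since the earlier attempt probably committed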
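The classification and retry rules can be combined in a single per-message handler. This sketch assumes a synchronous `write()` that returns the database `action` string and raises one of three hypothetical exception types; it conservatively treats every transient failure as ambiguous. The caller must not start the next message of the same stream until this returns, which is what keeps retries from reordering a stream.

```python
import time
from typing import Callable


class PermanentError(Exception):
    """Bad message: unsupported topic, unmappable type, unknown metric, policy violation."""


class OutOfOrderError(PermanentError):
    """Database rejected the sample as out of order for its stream."""


class TransientError(Exception):
    """Infrastructure problem: database, network or broker temporarily unavailable."""


def process_in_order(write: Callable[[], str],
                     sleep: Callable[[float], None] = time.sleep,
                     max_backoff: float = 30.0) -> str:
    """Settle one message; returns the database action, "dead_letter" or "likely_duplicate"."""
    delay = 0.5
    retried = False
    while True:
        try:
            return write()
        except OutOfOrderError:
            # After an attempt with an unknown outcome, the earlier write may
            # already have committed, so the rejection is probably a duplicate.
            return "likely_duplicate" if retried else "dead_letter"
        except PermanentError:
            return "dead_letter"
        except TransientError:
            retried = True
            sleep(delay)  # the next message of this stream waits meanwhile
            delay = min(delay * 2, max_backoff)
```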

---

## Deployment Model

A historian worker may be deployed in one of the following ways:

- one worker for all buses
- one worker per bus
- one worker per site and bus

Recommended starting model:

- one worker per site consuming both `home` and `energy`

This keeps the first implementation simple while preserving the option to split workers later for throughput or failure-domain reasons.

---

## Relationship with Adapters

Adapters and historian workers are symmetrical integration roles.

Adapter direction:

- external protocol -> canonical bus

Historian worker direction:

- canonical bus -> historian API

Both should be:

- contract-driven
- deterministic
- observable
- easy to replay in tests

Neither should contain business automation logic.

---

## Design Principles

1. Consume canonical topics only

The worker should depend on semantic contracts, not vendor payloads.

2. Keep persistence generic

The same worker model should work for `home`, `energy`, and future buses.

3. Preserve ordering

Correct historian writes matter more than maximum ingest parallelism.

4. Fail visibly

Bad messages and persistence problems must be observable on operational topics.

5. Reuse shared contract rules

The worker should not invent alternate payload or metadata semantics.
