mqtt_bus / historian_worker.md
Newer Older
429 lines | 11.458kb
Bogdan Timofte authored 2 weeks ago
1
# Historian Bus Worker
2

            
3
## Definition
4

            
5
A historian worker is a consumer component that subscribes to one or more canonical semantic MQTT buses and writes normalized measurements into the PostgreSQL telemetry historian.
6

            
7
Its role is the inverse of an adapter:
8

            
9
- adapters translate vendor-specific inputs into canonical bus topics
10
- historian workers translate canonical bus topics into historian API calls
11

            
12
The worker is a consumer of the semantic contract, not an owner of it.
13

            
14
---
15

            
16
## Purpose
17

            
18
The worker exists to decouple historian persistence from:
19

            
20
- vendor topic structures
21
- protocol-specific payload formats
22
- source-specific device identities
23
- source-side timing quirks
24

            
25
This keeps the historian integration generic enough to ingest any bus that follows the shared MQTT contract.
26

            
27
---
28

            
29
## Architectural Position
30

            
31
The historian worker operates at the egress side of the semantic bus.
32

            
33
Pipeline:
34

            
35
Device / External System
36
    ↓
37
Protocol Adapter
38
    ↓
39
Canonical MQTT Bus
40
    ↓
41
Historian Worker
42
    ↓
43
`telemetry.ingest_measurement(...)`
44
    ↓
45
Historian tables
46

            
47
The worker consumes canonical MQTT only.
48

            
49
It MUST NOT subscribe to raw vendor topics such as `zigbee2mqtt/...` for normal historian ingestion.
50

            
51
---
52

            
53
## Shared Inputs
54

            
55
The worker consumes data defined by:
56

            
57
- `mqtt_contract.md`
58
- bus-specific contracts such as `home_bus.md` and `energy_bus.md`
59
- `tdb_ingestion/mqtt_ingestion_api.md`
60
- `tdb_ingestion/counter_ingestion_api.md` when cumulative counter metrics are present on the bus
61

            
62
The worker should treat the semantic bus as the source of truth for topic grammar and stream meaning.
63

            
64
The database API is the source of truth for persistence behavior.
65

            
66
Boundary decisions are recorded in `consolidated_spec.md` §10.
67

            
68
---
69

            
70
## Responsibilities
71

            
72
The historian worker is responsible for:
73

            
74
1. Topic subscription
75

            
76
Subscribe to canonical topics for the target buses.
77

            
78
2. Topic parsing
79

            
80
Extract `metric_name`, `device_id`, `stream`, and bus-specific dimensions from the topic.
81

            
82
3. Payload decoding
83

            
84
Decode scalar payloads and Profile B envelopes according to `mqtt_contract.md`.
85

            
86
4. Meta caching
87

            
88
Consume retained `meta` topics and keep a local cache keyed by topic stem.
89

            
90
5. Historian policy enforcement
91

            
92
Respect `meta.historian.enabled` and `meta.historian.mode` when deciding what should be persisted.
93

            
94
6. Type selection
95

            
96
Choose the numeric or boolean PostgreSQL overload correctly for measurement-style metrics.
97

            
98
7. Timestamp resolution
99

            
100
Use `observed_at` from the envelope when present, otherwise fall back to ingestion time.
101

            
102
8. Per-stream ordering
103

            
104
Preserve source order for the same `(metric_name, device_id)` path when sending measurements to PostgreSQL.
105

            
106
9. Error handling
107

            
108
Separate permanent message errors from transient infrastructure failures.
109

            
110
10. Observability
111

            
112
Emit worker health, counters, and persistence failures on operational topics.
113

            
114
---
115

            
116
## Explicit Non-Responsibilities
117

            
118
The historian worker must NOT:
119

            
120
- parse vendor-specific topics as part of the normal pipeline
121
- redefine semantic topic contracts
122
- invent new device identities outside the bus contract
123
- perform automation logic
124
- aggregate unrelated sensors into synthetic measurements
125
- reimplement historian segment logic already handled in PostgreSQL
126
- repair malformed semantic messages silently
127

            
128
If semantic messages are invalid, the worker should surface them operationally and skip persistence.
129

            
130
---
131

            
132
## Subscription Model
133

            
134
Recommended subscriptions per site:
135

            
136
- `+/home/+/+/+/value`
137
- `+/home/+/+/+/last`
138
- `+/energy/+/+/+/value`
139
- `+/energy/+/+/+/last`
140
- `+/home/+/+/+/meta`
141
- `+/energy/+/+/+/meta`
142

            
143
Optional observability subscriptions:
144

            
145
- `+/home/+/+/+/availability`
146
- `+/energy/+/+/+/availability`
147
- `+/sys/adapter/+/availability`
148
- `+/sys/adapter/+/error`
149

            
150
Rules:
151

            
152
- `meta` SHOULD be subscribed and cached before relying on enrichment
153
- `set` topics MUST be ignored by the historian worker
154
- `last` topics MUST be ignored for normal time-series ingestion
155
- `availability` SHOULD NOT be persisted as telemetry samples unless a separate policy explicitly requires it
156

            
157
---
158

            
159
## Topic-to-Historian Mapping
160

            
161
### Home Bus
162

            
163
Topic:
164

            
165
`<site>/home/<location>/<capability>/<device_id>/<stream>`
166

            
167
Mapped fields:
168

            
169
- `metric_name = <capability>`
170
- `device_id = <location>.<device_id>`
171

            
172
### Energy Bus
173

            
174
Topic:
175

            
176
`<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>`
177

            
178
Mapped fields:
179

            
180
- `metric_name = <metric>`
181
- `device_id = <entity_type>.<entity_id>`
182

            
183
The worker MUST NOT require database-side topic parsing.
184

            
185
---
186

            
187
## Payload Handling
188

            
189
### Scalar Payload
190

            
191
If the stream uses Profile A:
192

            
193
- parse the scalar payload as number, boolean, or enum string
194
- resolve `unit` from cached retained `meta`
195
- set `observed_at` to ingestion time unless source time is available out of band
196

            
197
### Envelope Payload
198

            
199
If the stream uses Profile B:
200

            
201
- read `value`
202
- use `observed_at` when present
203
- use `unit` from payload, falling back to cached `meta.unit`
204
- use `quality` when present for logs or side metrics
205

            
206
### Type Compatibility Rule
207

            
208
The current PostgreSQL historian API directly supports:
209

            
210
- numeric values
211
- boolean values
212

            
213
String enum states are allowed on the semantic bus, but the worker SHOULD skip them unless there is an explicit encoding policy.
214

            
215
Examples:
216

            
217
- `open` / `closed` may be mapped to boolean if a metric policy expects that
218
- `heat` / `cool` / `off` MUST NOT be guessed into arbitrary numeric values
219

            
220
### Counter Metric Boundary
221

            
222
Counter-style cumulative metrics such as `energy_total`, `import_energy_total`, `export_energy_total`, `rx_bytes_total`, or `tx_packets_total` are valid semantic bus values, but they are not covered by the current measurement ingestion API.
223

            
224
Rules:
225

            
226
- the worker MUST NOT force cumulative counters through `telemetry.ingest_measurement(...)` just because the payload is numeric
227
- the worker SHOULD route them to the separate counter ingestion path defined in `tdb_ingestion/counter_ingestion_api.md`
228
- if the counter path is temporarily unavailable, the worker SHOULD skip them explicitly and expose that through operational stats or DLQ
229
- measurement-style metrics such as `active_power`, `voltage`, `current`, `temperature`, and `soc` continue through the existing measurement API
230

            
231
---
232

            
233
## Stream Policy
234

            
235
Default persistence policy:
236

            
237
- ingest `value` by default
238
- ignore `last`
239
- use `meta.historian.mode` to interpret whether a `value` stream represents `sample`, `state`, or `event` semantics
240
- ignore `set`
241
- never treat `meta` itself as a measurement
242

            
243
Additional rules:
244

            
245
- if `meta.historian.enabled=false`, the worker MUST skip persistence
246
- if `meta` is missing, the worker MAY continue with degraded defaults
247
- missing `meta` MUST NOT block ingestion of otherwise valid numeric or boolean `value` samples
248

            
249
---
250

            
251
## Ordering and Delivery
252

            
253
Ordering is critical because PostgreSQL enforces append-only writes per `(metric_name, device_id)`.
254

            
255
Rules:
256

            
257
- the worker MUST serialize writes for the same `(metric_name, device_id)`
258
- retries MUST preserve order
259
- the worker SHOULD partition concurrency by `(metric_name, device_id)` or an equivalent stable shard
260
- out-of-order database errors SHOULD be treated as permanent message errors unless the worker can prove replay ordering was preserved
261

            
262
Operational implication:
263

            
264
- broad parallelism is acceptable across distinct devices or metrics
265
- unordered fan-out for the same logical stream is not acceptable
266

            
267
---
268

            
269
## Meta Cache
270

            
271
The worker should maintain a retained metadata cache.
272

            
273
Recommended cache key:
274

            
275
- topic stem without the final stream segment
276

            
277
Example:
278

            
279
- topic: `vad/home/bedroom/temperature/bedroom-sensor/meta`
280
- cache key: `vad/home/bedroom/temperature/bedroom-sensor`
281

            
282
Recommended cached fields:
283

            
284
- `payload_profile`
285
- `data_type`
286
- `unit`
287
- `historian.enabled`
288
- `historian.mode`
289
- `schema_ref`
290
- `adapter_id`
291
- `source`
292
- `source_ref`
293

            
294
Rules:
295

            
296
- cached `meta` SHOULD be replaced atomically when newer retained data arrives
297
- worker startup SHOULD tolerate receiving live `value` traffic before the corresponding retained `meta`
298
- if `meta` is deleted, the cache entry SHOULD be removed as well
299

            
300
---
301

            
302
## Database Interaction
303

            
304
The worker writes measurement-style samples through `telemetry.ingest_measurement(...)` as defined in `tdb_ingestion/mqtt_ingestion_api.md`.
305

            
306
Rules:
307

            
308
- use the numeric overload for numeric values
309
- use the boolean overload for boolean values
310
- type `NULL` explicitly if unknown values are ever supported by bus policy
311
- log the returned `action` for observability
312
- do not duplicate historian logic already implemented in PostgreSQL
313
- do not send counter-style cumulative totals through this API; those follow the separate contract in `tdb_ingestion/counter_ingestion_api.md`
314

            
315
The worker should assume the database is responsible for:
316

            
317
- deduplication
318
- gap detection
319
- segment splitting
320
- policy boundary handling
321

            
322
---
323

            
324
## Operational Topics
325

            
326
Shared operational namespace rules are defined in `sys_bus.md`.
327

            
328
The worker should expose its own operational topics under:
329

            
330
- `<site>/sys/historian/<worker_id>/availability`
331
- `<site>/sys/historian/<worker_id>/stats`
332
- `<site>/sys/historian/<worker_id>/error`
333
- `<site>/sys/historian/<worker_id>/dlq`
334

            
335
Recommended uses:
336

            
337
- `availability`: retained online/offline state
338
- `stats`: low-rate counters such as ingested messages, skipped samples, and retry counts
339
- `error`: structured infrastructure or persistence failures
340
- `dlq`: semantic messages rejected by the worker
341

            
342
This keeps historian worker observability separate from adapter observability.
343

            
344
---
345

            
346
## Failure Handling
347

            
348
Permanent message failures include:
349

            
350
- topic does not match a supported bus grammar
351
- payload type cannot be mapped to the target historian type
352
- unknown metric in PostgreSQL
353
- out-of-order measurement
354
- value violates metric policy
355

            
356
Transient failures include:
357

            
358
- PostgreSQL unavailable
359
- network interruption between worker and database
360
- temporary broker disconnect
361

            
362
Rules:
363

            
364
- permanent message failures SHOULD be logged and dead-lettered
365
- transient failures SHOULD be retried without reordering per stream
366
- ambiguous retries that later return out-of-order errors SHOULD be logged as likely duplicates
367

            
368
---
369

            
370
## Deployment Model
371

            
372
A historian worker may be deployed in one of the following ways:
373

            
374
- one worker for all buses
375
- one worker per bus
376
- one worker per site and bus
377

            
378
Recommended starting model:
379

            
380
- one worker per site consuming both `home` and `energy`
381

            
382
This keeps the first implementation simple while preserving the option to split workers later for throughput or failure-domain reasons.
383

            
384
---
385

            
386
## Relationship with Adapters
387

            
388
Adapters and historian workers are symmetrical integration roles.
389

            
390
Adapter direction:
391

            
392
- external protocol -> canonical bus
393

            
394
Historian worker direction:
395

            
396
- canonical bus -> historian API
397

            
398
Both should be:
399

            
400
- contract-driven
401
- deterministic
402
- observable
403
- easy to replay in tests
404

            
405
Neither should contain business automation logic.
406

            
407
---
408

            
409
## Design Principles
410

            
411
1. Consume canonical topics only
412

            
413
The worker should depend on semantic contracts, not vendor payloads.
414

            
415
2. Keep persistence generic
416

            
417
The same worker model should work for `home`, `energy`, and future buses.
418

            
419
3. Preserve ordering
420

            
421
Correct historian writes matter more than maximum ingest parallelism.
422

            
423
4. Fail visibly
424

            
425
Bad messages and persistence problems must be observable on operational topics.
426

            
427
5. Reuse shared contract rules
428

            
429
The worker should not invent alternate payload or metadata semantics.