Bogdan Timofte authored 2 weeks ago

# Historian Bus Worker

## Definition

A historian worker is a consumer component that subscribes to one or more canonical semantic MQTT buses and writes normalized measurements into the PostgreSQL telemetry historian.

Its role is the inverse of an adapter:

- adapters translate vendor-specific inputs into canonical bus topics
- historian workers translate canonical bus topics into historian API calls

The worker is a consumer of the semantic contract, not an owner of it.

---

## Purpose

The worker exists to decouple historian persistence from:

- vendor topic structures
- protocol-specific payload formats
- source-specific device identities
- source-side timing quirks

This keeps the historian integration generic enough to ingest any bus that follows the shared MQTT contract.

---

## Architectural Position

The historian worker operates at the egress side of the semantic bus.

Pipeline:

Device / External System
↓
Protocol Adapter
↓
Canonical MQTT Bus
↓
Historian Worker
↓
`telemetry.ingest_measurement(...)`
↓
Historian tables

The worker consumes canonical MQTT only.

It MUST NOT subscribe to raw vendor topics such as `zigbee2mqtt/...` for normal historian ingestion.

---

## Shared Inputs

The worker consumes data defined by:

- `mqtt_contract.md`
- bus-specific contracts such as `home_bus.md` and `energy_bus.md`
- `tdb_ingestion/mqtt_ingestion_api.md`
- `tdb_ingestion/counter_ingestion_api.md` when cumulative counter metrics are present on the bus

The worker should treat the semantic bus as the source of truth for topic grammar and stream meaning.

The database API is the source of truth for persistence behavior.

Boundary decisions are recorded in `consolidated_spec.md` §10.

---

## Responsibilities

The historian worker is responsible for:

1. Topic subscription

   Subscribe to canonical topics for the target buses.

2. Topic parsing

   Extract `metric_name`, `device_id`, `stream`, and bus-specific dimensions from the topic.

3. Payload decoding

   Decode scalar payloads and Profile B envelopes according to `mqtt_contract.md`.

4. Meta caching

   Consume retained `meta` topics and keep a local cache keyed by topic stem.

5. Historian policy enforcement

   Respect `meta.historian.enabled` and `meta.historian.mode` when deciding what should be persisted.

6. Type selection

   Choose the numeric or boolean PostgreSQL overload correctly for measurement-style metrics.

7. Timestamp resolution

   Use `observed_at` from the envelope when present, otherwise fall back to ingestion time.

8. Per-stream ordering

   Preserve source order for the same `(metric_name, device_id)` path when sending measurements to PostgreSQL.

9. Error handling

   Separate permanent message errors from transient infrastructure failures.

10. Observability

    Emit worker health, counters, and persistence failures on operational topics.

---

## Explicit Non-Responsibilities

The historian worker must NOT:

- parse vendor-specific topics as part of the normal pipeline
- redefine semantic topic contracts
- invent new device identities outside the bus contract
- perform automation logic
- aggregate unrelated sensors into synthetic measurements
- reimplement historian segment logic already handled in PostgreSQL
- repair malformed semantic messages silently

If semantic messages are invalid, the worker should surface them operationally and skip persistence.

---

## Subscription Model

Recommended subscriptions per site:

- `+/home/+/+/+/value`
- `+/home/+/+/+/last`
- `+/energy/+/+/+/value`
- `+/energy/+/+/+/last`
- `+/home/+/+/+/meta`
- `+/energy/+/+/+/meta`

Optional observability subscriptions:

- `+/home/+/+/+/availability`
- `+/energy/+/+/+/availability`
- `+/sys/adapter/+/availability`
- `+/sys/adapter/+/error`

Rules:

- `meta` SHOULD be subscribed and cached before relying on enrichment
- `set` topics MUST be ignored by the historian worker
- `last` topics MUST be ignored for normal time-series ingestion
- `availability` SHOULD NOT be persisted as telemetry samples unless a separate policy explicitly requires it
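
The subscription lists and rules above could be expressed roughly as follows. This is a minimal sketch, not the worker's actual code: `build_subscriptions` and `classify_stream` are hypothetical helper names, and the action labels (`ingest`, `cache`, `observe`, `ignore`) are illustrative only.

```python
# Buses and stream names are taken from the subscription lists in this section.
DATA_BUSES = ("home", "energy")


def build_subscriptions(buses=DATA_BUSES, observability=False):
    """Return MQTT topic filters for one worker (hypothetical helper)."""
    filters = []
    for stream in ("value", "last", "meta"):
        filters += [f"+/{bus}/+/+/+/{stream}" for bus in buses]
    if observability:
        filters += [f"+/{bus}/+/+/+/availability" for bus in buses]
        filters += ["+/sys/adapter/+/availability", "+/sys/adapter/+/error"]
    return filters


def classify_stream(stream):
    """Map the final topic segment to what the worker does with it."""
    if stream == "value":
        return "ingest"
    if stream == "meta":
        return "cache"
    if stream == "availability":
        return "observe"  # not persisted as telemetry by default
    return "ignore"  # covers `set`, `last`, and unknown streams
```

Keeping the stream decision in one small function makes it easy to test that `set` and `last` never reach the persistence path.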

---

## Topic-to-Historian Mapping

### Home Bus

Topic:

`<site>/home/<location>/<capability>/<device_id>/<stream>`

Mapped fields:

- `metric_name = <capability>`
- `device_id = <location>.<device_id>`

### Energy Bus

Topic:

`<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>`

Mapped fields:

- `metric_name = <metric>`
- `device_id = <entity_type>.<entity_id>`

The worker MUST NOT require database-side topic parsing.
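
As an illustration of the two mappings above, here is a minimal parsing sketch. `ParsedTopic` and `parse_topic` are hypothetical names introduced for this example; returning `None` for unsupported topics is an assumption about how a worker might signal a grammar mismatch.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ParsedTopic:
    site: str
    bus: str
    metric_name: str
    device_id: str
    stream: str


def parse_topic(topic: str) -> Optional[ParsedTopic]:
    """Map a canonical topic to historian fields, or None if unsupported."""
    parts = topic.split("/")
    if len(parts) != 6 or any(part == "" for part in parts):
        return None
    site, bus, a, b, c, stream = parts
    if bus == "home":
        # <site>/home/<location>/<capability>/<device_id>/<stream>
        return ParsedTopic(site, bus, metric_name=b, device_id=f"{a}.{c}", stream=stream)
    if bus == "energy":
        # <site>/energy/<entity_type>/<entity_id>/<metric>/<stream>
        return ParsedTopic(site, bus, metric_name=c, device_id=f"{a}.{b}", stream=stream)
    return None
```

All parsing happens in the worker, so the database only ever receives `metric_name` and `device_id` values that are already resolved.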

---

## Payload Handling

### Scalar Payload

If the stream uses Profile A:

- parse the scalar payload as number, boolean, or enum string
- resolve `unit` from cached retained `meta`
- set `observed_at` to ingestion time unless source time is available out of band

### Envelope Payload

If the stream uses Profile B:

- read `value`
- use `observed_at` when present
- use `unit` from payload, falling back to cached `meta.unit`
- use `quality` when present for logs or side metrics
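
The two decoding paths above can be sketched as below. The wire details are assumptions, since the authoritative encoding lives in `mqtt_contract.md`: this sketch assumes UTF-8 scalars, the literals `true` and `false` for booleans, a JSON object for Profile B, and the profile passed in as `"A"` or `"B"`. `decode_scalar` and `decode_payload` are hypothetical names.

```python
import json
import math
from datetime import datetime, timezone


def decode_scalar(raw: bytes):
    """Parse a Profile A payload as boolean, number, or enum string."""
    text = raw.decode("utf-8").strip()
    if text in ("true", "false"):  # assumed boolean literals
        return text == "true"
    try:
        number = float(text)
    except ValueError:
        return text  # treated as an enum string
    # Non-finite values such as "nan" are kept as strings, not numbers.
    return number if math.isfinite(number) else text


def decode_payload(raw: bytes, profile: str, meta, now: datetime):
    """Return value, observed_at, unit, and quality for one message."""
    meta = meta or {}
    if profile == "B":
        envelope = json.loads(raw)
        return {
            "value": envelope["value"],
            "observed_at": envelope.get("observed_at") or now.isoformat(),
            "unit": envelope.get("unit", meta.get("unit")),
            "quality": envelope.get("quality"),
        }
    return {
        "value": decode_scalar(raw),
        "observed_at": now.isoformat(),  # ingestion time for Profile A
        "unit": meta.get("unit"),
        "quality": None,
    }
```

Passing `now` in explicitly, rather than reading the clock inside the function, keeps the decoder deterministic and easy to replay in tests.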

### Type Compatibility Rule

The current PostgreSQL historian API directly supports:

- numeric values
- boolean values

String enum states are allowed on the semantic bus, but the worker SHOULD skip them unless there is an explicit encoding policy.

Examples:

- `open` / `closed` may be mapped to boolean if a metric policy expects that
- `heat` / `cool` / `off` MUST NOT be guessed into arbitrary numeric values
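
A small sketch of this rule, under the assumption that an encoding policy is supplied as a plain mapping from enum string to boolean. The `enum_policy` parameter and the function name `select_overload` are hypothetical and not part of any contract.

```python
def select_overload(value, enum_policy=None):
    """Return ("boolean" | "numeric", value), or None when the sample is skipped."""
    # bool must be checked first because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return ("boolean", value)
    if isinstance(value, (int, float)):
        return ("numeric", float(value))
    if isinstance(value, str) and enum_policy and value in enum_policy:
        return ("boolean", enum_policy[value])
    return None  # enum strings without an explicit policy are never guessed
```

For example, `select_overload("open", {"open": True, "closed": False})` selects the boolean overload, while `select_overload("heat")` returns `None` so the sample is skipped.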

### Counter Metric Boundary

Counter-style cumulative metrics such as `energy_total`, `import_energy_total`, `export_energy_total`, `rx_bytes_total`, or `tx_packets_total` are valid semantic bus values, but they are not covered by the current measurement ingestion API.

Rules:

- the worker MUST NOT force cumulative counters through `telemetry.ingest_measurement(...)` just because the payload is numeric
- the worker SHOULD route them to the separate counter ingestion path defined in `tdb_ingestion/counter_ingestion_api.md`
- if the counter path is temporarily unavailable, the worker SHOULD skip them explicitly and expose that through operational stats or DLQ
- measurement-style metrics such as `active_power`, `voltage`, `current`, `temperature`, and `soc` continue through the existing measurement API
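
The routing decision can be sketched as follows. How counters are identified is not defined in this document. The `_total` suffix check below is only an assumption inferred from the example names above, and a real worker may need an explicit metric registry or a `meta` field instead. `route_metric` is a hypothetical name.

```python
COUNTER_SUFFIX = "_total"  # assumption: inferred from the example metric names only


def route_metric(metric_name: str, counter_path_available: bool = True) -> str:
    """Return "measurement", "counter", or "skip" for a numeric sample."""
    if metric_name.endswith(COUNTER_SUFFIX):
        # Counters never fall back to the measurement API.
        return "counter" if counter_path_available else "skip"
    return "measurement"
```

A `skip` result should still be counted in operational stats or sent to the DLQ so that dropped counters stay visible.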

---

## Stream Policy

Default persistence policy:

- ingest `value` by default
- ignore `last`
- use `meta.historian.mode` to interpret whether a `value` stream represents `sample`, `state`, or `event` semantics
- ignore `set`
- never treat `meta` itself as a measurement

Additional rules:

- if `meta.historian.enabled=false`, the worker MUST skip persistence
- if `meta` is missing, the worker MAY continue with degraded defaults
- missing `meta` MUST NOT block ingestion of otherwise valid numeric or boolean `value` samples
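
These rules reduce to a small decision function. The sketch below assumes cached `meta` is a dictionary with an optional nested `historian` object. `persistence_decision` is a hypothetical name, and returning `None` as the mode when it is unknown is an illustrative choice rather than a documented default.

```python
def persistence_decision(stream: str, meta):
    """Return (persist, mode) for one message."""
    if stream != "value":
        return (False, None)  # `last`, `set`, and `meta` are never measurements
    historian = (meta or {}).get("historian") or {}
    if historian.get("enabled") is False:
        return (False, None)
    # Missing meta does not block ingestion; the mode is simply unknown.
    return (True, historian.get("mode"))
```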

---

## Ordering and Delivery

Ordering is critical because PostgreSQL enforces append-only writes per `(metric_name, device_id)`.

Rules:

- the worker MUST serialize writes for the same `(metric_name, device_id)`
- retries MUST preserve order
- the worker SHOULD partition concurrency by `(metric_name, device_id)` or an equivalent stable shard
- out-of-order database errors SHOULD be treated as permanent message errors unless the worker can prove replay ordering was preserved

Operational implication:

- broad parallelism is acceptable across distinct devices or metrics
- unordered fan-out for the same logical stream is not acceptable
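
One way to get a stable shard is to hash the stream key with a deterministic function and let a single sequential writer own each shard. The sketch below uses `zlib.crc32` instead of Python's built-in `hash()`, which is randomized per process for strings. `shard_for` and `dispatch` are hypothetical names, and the in-memory lists stand in for whatever queues a real worker would use.

```python
import zlib
from collections import defaultdict


def shard_for(metric_name: str, device_id: str, shard_count: int) -> int:
    """Map a (metric_name, device_id) stream to a stable shard index."""
    key = f"{metric_name}\x00{device_id}".encode("utf-8")
    return zlib.crc32(key) % shard_count


def dispatch(messages, shard_count: int):
    """Group (metric_name, device_id, value) tuples into per-shard FIFO lists."""
    shards = defaultdict(list)
    for metric_name, device_id, value in messages:
        index = shard_for(metric_name, device_id, shard_count)
        shards[index].append((metric_name, device_id, value))
    return shards
```

Because every message of a stream lands in the same shard in arrival order, parallelism only ever applies across shards, never within a stream.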

---

## Meta Cache

The worker should maintain a retained metadata cache.

Recommended cache key:

- topic stem without the final stream segment

Example:

- topic: `vad/home/bedroom/temperature/bedroom-sensor/meta`
- cache key: `vad/home/bedroom/temperature/bedroom-sensor`

Recommended cached fields:

- `payload_profile`
- `data_type`
- `unit`
- `historian.enabled`
- `historian.mode`
- `schema_ref`
- `adapter_id`
- `source`
- `source_ref`

Rules:

- cached `meta` SHOULD be replaced atomically when newer retained data arrives
- worker startup SHOULD tolerate receiving live `value` traffic before the corresponding retained `meta`
- if `meta` is deleted, the cache entry SHOULD be removed as well
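
A minimal cache sketch following these rules is shown below. It assumes `meta` payloads are JSON objects and treats a zero-length retained payload as deletion, which is how MQTT clears a retained message. `MetaCache` and its methods are hypothetical names.

```python
import json
import threading


class MetaCache:
    """Retained `meta` cache keyed by topic stem (illustrative sketch)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}

    @staticmethod
    def key_for(topic: str) -> str:
        # Drop the final stream segment: .../bedroom-sensor/meta -> .../bedroom-sensor
        return topic.rsplit("/", 1)[0]

    def on_meta_message(self, topic: str, payload: bytes) -> None:
        key = self.key_for(topic)
        if not payload:  # empty retained payload means the meta was deleted
            with self._lock:
                self._entries.pop(key, None)
            return
        entry = json.loads(payload)  # parse first so a bad payload never replaces a good entry
        with self._lock:
            self._entries[key] = entry  # whole-entry swap, never a partial update

    def get(self, topic: str):
        """Return cached meta for any topic of the same stem, or None."""
        with self._lock:
            return self._entries.get(self.key_for(topic))
```

A `get` that returns `None` covers the startup case where `value` traffic arrives before the retained `meta`.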

---

## Database Interaction

The worker writes measurement-style samples through `telemetry.ingest_measurement(...)` as defined in `tdb_ingestion/mqtt_ingestion_api.md`.

Rules:

- use the numeric overload for numeric values
- use the boolean overload for boolean values
- type `NULL` explicitly if unknown values are ever supported by bus policy
- log the returned `action` for observability
- do not duplicate historian logic already implemented in PostgreSQL
- do not send counter-style cumulative totals through this API; those follow the separate contract in `tdb_ingestion/counter_ingestion_api.md`

The worker should assume the database is responsible for:

- deduplication
- gap detection
- segment splitting
- policy boundary handling
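
The overload and explicit `NULL` typing rules can be illustrated by building the call with an explicit cast on the value parameter. Everything about the signature here is an assumption: the argument order, the `double precision` and `timestamptz` types, and the `%s` placeholder style are placeholders for whatever `tdb_ingestion/mqtt_ingestion_api.md` actually defines. `build_ingest_call` is a hypothetical helper.

```python
# Assumed SQL types for each overload; check the ingestion API document.
_CASTS = {"numeric": "double precision", "boolean": "boolean"}


def build_ingest_call(metric_name, device_id, kind, value, observed_at):
    """Return (sql, params) for one sample; `kind` is "numeric" or "boolean"."""
    cast = _CASTS[kind]  # raises KeyError for unsupported kinds such as enum strings
    sql = (
        "SELECT * FROM telemetry.ingest_measurement("
        f"%s, %s, %s::{cast}, %s::timestamptz)"
    )
    # The cast keeps overload resolution unambiguous even when value is None.
    return sql, (metric_name, device_id, value, observed_at)
```

The cast name comes from a fixed lookup table and never from message content, so only the bound parameters carry untrusted data.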

---

## Operational Topics

Shared operational namespace rules are defined in `sys_bus.md`.

The worker should expose its own operational topics under:

- `<site>/sys/historian/<worker_id>/availability`
- `<site>/sys/historian/<worker_id>/stats`
- `<site>/sys/historian/<worker_id>/error`
- `<site>/sys/historian/<worker_id>/dlq`

Recommended uses:

- `availability`: retained online/offline state
- `stats`: low-rate counters such as ingested messages, skipped samples, and retry counts
- `error`: structured infrastructure or persistence failures
- `dlq`: semantic messages rejected by the worker

This keeps historian worker observability separate from adapter observability.
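
For illustration, the four topic names can be derived from the site and worker identifiers as below. `operational_topics` is a hypothetical helper, and the example identifiers are made up.

```python
OPERATIONAL_STREAMS = ("availability", "stats", "error", "dlq")


def operational_topics(site: str, worker_id: str) -> dict:
    """Return the worker's operational topic for each stream name."""
    base = f"{site}/sys/historian/{worker_id}"
    return {name: f"{base}/{name}" for name in OPERATIONAL_STREAMS}
```

For example, `operational_topics("vad", "worker-1")["dlq"]` yields `vad/sys/historian/worker-1/dlq`.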

---

## Failure Handling

Permanent message failures include:

- topic does not match a supported bus grammar
- payload type cannot be mapped to the target historian type
- unknown metric in PostgreSQL
- out-of-order measurement
- value violates metric policy

Transient failures include:

- PostgreSQL unavailable
- network interruption between worker and database
- temporary broker disconnect

Rules:

- permanent message failures SHOULD be logged and dead-lettered
- transient failures SHOULD be retried without reordering per stream
- ambiguous retries that later return out-of-order errors SHOULD be logged as likely duplicates
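
The split between permanent and transient failures might look like the sketch below. The exception classes, the exponential backoff, and the attempt limit are all assumptions, since this document does not prescribe a retry schedule. Retrying in place on the stream's own writer is what keeps later messages of that stream from overtaking the failed one.

```python
import time


class PermanentMessageError(Exception):
    """The message itself is bad; retrying cannot help."""


class TransientError(Exception):
    """Infrastructure is unavailable; the same message may succeed later."""


def process_with_retry(message, write, dead_letter,
                       max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Write one message, retrying transient failures in place."""
    for attempt in range(1, max_attempts + 1):
        try:
            write(message)
            return "ingested"
        except PermanentMessageError as exc:
            dead_letter(message, str(exc))  # surface it, then move on
            return "dead_lettered"
        except TransientError:
            if attempt == max_attempts:
                raise  # let the caller decide; do not skip ahead silently
            sleep(base_delay * 2 ** (attempt - 1))
```

The injectable `sleep` parameter exists so the retry path can be exercised in tests without real delays.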

---

## Deployment Model

A historian worker may be deployed in one of the following ways:

- one worker for all buses
- one worker per bus
- one worker per site and bus

Recommended starting model:

- one worker per site consuming both `home` and `energy`

This keeps the first implementation simple while preserving the option to split workers later for throughput or failure-domain reasons.

---

## Relationship with Adapters

Adapters and historian workers are symmetrical integration roles.

Adapter direction:

- external protocol -> canonical bus

Historian worker direction:

- canonical bus -> historian API

Both should be:

- contract-driven
- deterministic
- observable
- easy to replay in tests

Neither should contain business automation logic.

---

## Design Principles

1. Consume canonical topics only

   The worker should depend on semantic contracts, not vendor payloads.

2. Keep persistence generic

   The same worker model should work for `home`, `energy`, and future buses.

3. Preserve ordering

   Correct historian writes matter more than maximum ingest parallelism.

4. Fail visibly

   Bad messages and persistence problems must be observable on operational topics.

5. Reuse shared contract rules

   The worker should not invent alternate payload or metadata semantics.