mqtt_bus / consolidated_spec.md
Newer Older
693 lines | 23.52kb
Bogdan Timofte authored 2 weeks ago
1
# Consolidated Bus & Data Collection Specification
2

            
3
Data: 2026-03-20
4

            
5
Acest document consolidează specificațiile existente într-o referință unică pentru busuri, colectare de date și conformitate cu cerințele historian-ului. Nu înlocuiește documentele sursă, ci le leagă și stabilește deciziile rămase deschise.
6

            
7
Documentele sursă rămân canonice. Acest document este o hartă de navigare și un registru de decizii.
8

            
9
---
10

            
11
## 1. Arhitectura Pipeline-ului
12

            
13
```
14
Device / External System
15
    ↓
16
Protocol Adapter (Node-RED)
17
    ↓
18
Canonical MQTT Bus (home | energy)
19
    ↓
20
Historian Worker
21
    ↓
22
telemetry.ingest_measurement(...)   ← measurement path (implementat)
23
telemetry.ingest_counter(...)       ← counter path (contract stabilizat, neimplementat)
24
    ↓
25
PostgreSQL Historian
26
```
27

            
28
Documente relevante:
29
- `README.md` — arhitectura generală
30
- `addapters.md` — responsabilitățile adapterelor
31
- `historian_worker.md` — responsabilitățile worker-ului
32

            
33
---
34

            
35
## 2. Contractul MQTT Partajat
36

            
37
Definit în `mqtt_contract.md`. Toate busurile moștenesc aceste reguli.
38

            
39
### 2.1 Namespace
40

            
41
- Busuri semantice: `<site>/<bus>/...`
42
- Namespace operațional: `<site>/sys/...`
43
- `<site>` = site-id stabil, kebab-case (ex: `vad`)
44
- Busuri v1: `home`, `energy`
45

            
46
### 2.2 Stream-uri Canonice
47

            
48
| Stream | Scop | Retain | QoS |
49
|---|---|---|---|
50
| `value` | sample live hot-path | false | 1 |
51
| `last` | ultimul sample cu timestamp, pentru cold-start | true | 1 |
52
| `set` | comandă/request | false | 1 |
53
| `meta` | metadata statică sau slow-changing | true | 1 |
54
| `availability` | online/offline | true (+ LWT) | 1 |
55

            
56
Reguli:
57
- adaptoarele emit date live doar pe `value`
58
- adaptoarele deduplică `value` când valoarea semantică nu s-a schimbat
59
- adaptoarele actualizează `last` (retained) la fiecare schimbare de timestamp
60
- `state` și `event` sunt legacy, nu se introduc de adaptoare noi
61
- `set` nu se reține niciodată
62
- historian-ul ingestează doar `value`, ignoră `last`, `set`, `meta`, `availability`
63

            
64
### 2.3 Payload Profiles
65

            
66
**Profile A — Scalar (default pentru hot paths):**
67

            
68
```
69
23.6
70
41
71
true
72
on
73
```
74

            
75
- metadata pe `meta` separat (retained)
76
- `observed_at` = ingestion time (acceptabil pentru Phase 1)
77

            
78
**Profile B — Envelope JSON (opțional):**
79

            
80
```json
81
{
82
  "value": 23.6,
83
  "unit": "C",
84
  "observed_at": "2026-03-08T10:15:12Z",
85
  "quality": "good"
86
}
87
```
88

            
89
- câmpuri opționale: `published_at`, `source_seq`, `annotations`
90
- utilizat când fidelitatea timestamp-ului contează sau când quality trebuie propagat per-sample
91

            
92
### 2.4 Meta Contract
93

            
94
Fiecare topic retained `meta` descrie familia sibling de `value`/`last`.
95

            
96
Formă minimă recomandată:
97

            
98
```json
99
{
100
  "schema_ref": "mqbus.home.v1",
101
  "payload_profile": "scalar",
102
  "data_type": "number",
103
  "unit": "C",
104
  "adapter_id": "z2m-main",
105
  "source": "zigbee2mqtt",
106
  "source_ref": "0x00158d0008aa1111",
107
  "source_topic": "zigbee2mqtt/SENSOR/vad/bedroom/bedroom-sensor",
108
  "precision": 0.1,
109
  "historian": {
110
    "enabled": true,
111
    "mode": "sample"
112
  }
113
}
114
```
115

            
116
Câmpuri historian:
117
- `historian.enabled`: boolean
118
- `historian.mode`: `sample` | `state` | `event` | `ignore`
119
- `historian.retention_class`: opțional (`short`, `default`, `long`)
120
- `historian.sample_period_hint_s`: opțional
121

            
122
### 2.5 Time Semantics
123

            
124
Trei timestamp-uri distincte:
125

            
126
| Timestamp | Semnificație |
127
|---|---|
128
| `observed_at` | când sursa a observat/măsurat valoarea |
129
| `published_at` | când adapterul a publicat mesajul |
130
| `ingested_at` | când worker-ul a procesat mesajul |
131

            
132
Reguli:
133
- dacă sursa are timestamp, adapterul îl păstrează ca `observed_at`
134
- dacă nu, adapterul omite `observed_at`; worker-ul folosește `ingested_at`
135
- adaptoarele nu fabrică timestamp-uri
136
- dacă adapterul estimează timpul, folosește Profile B cu `quality=estimated`
137

            
138
### 2.6 Quality Model
139

            
140
Valori: `good`, `estimated`, `degraded`, `stale`, `invalid`
141

            
142
- `invalid` nu se emite pe bus semantic; merge pe `sys/.../error`
143
- `quality` se omite doar dacă `good` este implicit
144

            
145
---
146

            
147
## 3. Home Bus
148

            
149
Definit în `home_bus.md`. Bus room-centric pentru senzori, control, automatizare.
150

            
151
### 3.1 Topic Grammar
152

            
153
```
154
<site>/home/<location>/<capability>/<device_id>/<stream>
155
```
156

            
157
Exemple:
158
- `vad/home/bedroom/temperature/bedroom-sensor/value`
159
- `vad/home/living-room/motion/radar-south/value`
160
- `vad/home/kitchen/light/ceiling-switch/set`
161

            
162
### 3.2 Capability Catalog
163

            
164
**Environmental:** `temperature`, `humidity`, `pressure`, `illuminance`, `co2`, `voc`, `pm25`, `pm10`
165

            
166
**Presence/Safety:** `motion`, `presence`, `contact`, `water_leak`, `smoke`, `gas`, `tamper`
167

            
168
**Control:** `light`, `power`, `lock`, `cover_position`, `thermostat_mode`, `target_temperature`, `fan_mode`, `button`
169

            
170
**Device health:** `battery`, `battery_low`
171

            
172
Reguli:
173
- `power` pe `home` doar pentru control semantics (on/off)
174
- metrici electrice (`active_power`, `energy_total`) pe `energy`
175
- `linkquality` nu pe `home` (merge pe `sys` sau viitor `network`)
176
- `presence` = stare derivată de nivel mai înalt; detecția brută mmWave cu `fading_time=0` se publică ca `motion`
177

            
178
### 3.3 Historian Mapping
179

            
180
```
181
metric_name = <capability>
182
device_id   = <location>.<device_id>
183
```
184

            
185
Exemplu:
186
- Topic: `vad/home/bedroom/temperature/bedroom-sensor/value`
187
- `metric_name = temperature`
188
- `device_id = bedroom.bedroom-sensor`
189
- `value = 23.4` (scalar)
190
- `observed_at = ingested_at` (Profile A fallback)
191

            
192
---
193

            
194
## 4. Energy Bus
195

            
196
Definit în `energy_bus.md`. Bus pentru topologia electrică: producție, stocare, grid, load.
197

            
198
### 4.1 Topic Grammar
199

            
200
```
201
<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>
202
```
203

            
204
Entity types: `source`, `storage`, `grid`, `load`, `transfer`
205

            
206
Exemple:
207
- `vad/energy/source/pv-roof-1/active_power/value`
208
- `vad/energy/storage/battery-main/soc/value`
209
- `vad/energy/grid/main-meter/import_power/value`
210
- `vad/energy/load/living-room-tv/active_power/value`
211

            
212
### 4.2 Metric Classes
213

            
214
**Measurement-style** (compatibile cu `ingest_measurement()` acum):
215

            
216
`active_power`, `voltage`, `current`, `frequency`, `soc`, `charge_power`, `discharge_power`
217

            
218
**Counter-style** (pe bus, dar NU prin measurement API — urmează `ingest_counter()`):
219

            
220
`energy_total`, `import_energy_total`, `export_energy_total`
221

            
222
### 4.3 Units Canonice
223

            
224
| Metric | Unit |
225
|---|---|
226
| power | `W` |
227
| energy | `Wh` sau `kWh` |
228
| voltage | `V` |
229
| current | `A` |
230
| frequency | `Hz` |
231
| state of charge | `%` |
232

            
233
### 4.4 Historian Mapping
234

            
235
```
236
metric_name = <metric>
237
device_id   = <entity_type>.<entity_id>
238
```
239

            
240
Exemplu:
241
- Topic: `vad/energy/storage/battery-main/soc/value`
242
- `metric_name = soc`
243
- `device_id = storage.battery-main`
244

            
245
### 4.5 Ownership Rule
246

            
247
- Dacă valoarea e pentru control utilizator într-o cameră → `home`
248
- Dacă valoarea e pentru contabilitate electrică → `energy`
249
- Un device fizic poate proiecta pe ambele busuri (smart plug: `power` pe `home`, `active_power` pe `energy`)
250

            
251
---
252

            
253
## 5. Namespace Operațional (sys)
254

            
255
Definit în `sys_bus.md`. Nu este un bus semantic.
256

            
257
### 5.1 Topic Grammar
258

            
259
```
260
<site>/sys/<producer_kind>/<instance_id>/<stream>
261
```
262

            
263
Producer kinds v1: `adapter`, `historian`
264

            
265
### 5.2 Stream-uri Operaționale
266

            
267
| Stream | Scop | Retain |
268
|---|---|---|
269
| `availability` | liveness-ul componentei | true + LWT |
270
| `stats` | contori operaționali, snapshot periodic | true |
271
| `error` | erori structurate pentru operator | false |
272
| `dlq` | mesaje dead-letter cu context | false |
273

            
274
Exemple:
275
- `vad/sys/adapter/z2m-main/availability` → `online`
276
- `vad/sys/historian/main/error` → `{"code":"unknown_metric",...}`
277

            
278
---
279

            
280
## 6. Adaptoare
281

            
282
Definit în `addapters.md` și `adapter_implementation_examples.md`.
283

            
284
### 6.1 Responsabilități
285

            
286
1. Traducere topic-uri din format vendor în bus canonic
287
2. Normalizare payload (JSON → scalar, conversie unități)
288
3. Păstrare timestamp sursă (dacă există)
289
4. Fan-out: 1 mesaj inbound → N publicațiuni canonice
290
5. Proiecție multi-bus (smart plug → `home` + `energy`)
291
6. Publicare `meta` retained înaintea traficului live
292
7. Publicare `availability` retained + LWT
293
8. Routing erori pe `sys/.../error` și `sys/.../dlq`
294

            
295
### 6.2 NON-Responsabilități
296

            
297
- Nu implementează logica HomeKit
298
- Nu implementează reguli de automatizare
299
- Nu agregă senzori
300
- Nu stochează date istorice
301
- Nu repară mesaje malformed (le expune pe `sys`)
302

            
303
### 6.3 Publish Boundary
304

            
305
La publicare, mesajul conține DOAR:
306

            
307
```javascript
308
{ topic: normalizedTopic, payload: normalizedValue }
309
```
310

            
311
Structurile interne de normalizare se șterg înainte de publish.
312

            
313
### 6.4 Z2M Topic Convention
314

            
315
Friendly name: `<device_type>/<site>/<location>/<device_id>`
316

            
317
Exemplu:
318
- Z2M: `zigbee2mqtt/ZG-204ZV/vad/balcon/south`
319
- Canonical: `vad/home/balcon/illuminance/south/value`
320

            
321
Permite traducere deterministă fără lookup tables.
322

            
323
### 6.5 Z2M Field Mapping
324

            
325
| Z2M field | Bus | Canonical capability/metric |
326
|---|---|---|
327
| `temperature` | home | `temperature` |
328
| `humidity` | home | `humidity` |
329
| `pressure` | home | `pressure` |
330
| `illuminance` | home | `illuminance` |
331
| `contact` | home | `contact` |
332
| `occupancy` (PIR) | home | `motion` |
333
| `presence` (mmWave, `fading_time=0`) | home | `motion` |
334
| `battery` | home | `battery` |
335
| `state` (plug/switch) | home | `power` |
336
| `action` (remote) | home | `button` |
337
| `power` (smart plug) | energy | `active_power` |
338
| `voltage` (smart plug) | energy | `voltage` |
339
| `current` (smart plug) | energy | `current` |
340
| `energy` (smart plug) | energy | `energy_total` |
341

            
342
### 6.6 Mapping Registry
343

            
344
Adapoarele folosesc configurație declarativă. Câmpuri recomandate:
345

            
346
`source_system`, `source_topic_match`, `source_field`, `target_bus`, `target_location` / `target_entity_id`, `target_capability` / `target_metric`, `target_device_id`, `stream`, `payload_profile`, `unit`, `historian_enabled`, `historian_mode`
347

            
348
### 6.7 Auto-Provisioning
349

            
350
Adapoarele derivă dimensiunile semantice din structura topic-ului Z2M. Override-uri se aplică doar pentru excepții. Default-uri: `bus=home`, `location=unknown`, `stream=value`.
351

            
352
---
353

            
354
## 7. Historian Worker
355

            
356
Definit în `historian_worker.md`.
357

            
358
### 7.1 Subscription Model
359

            
360
```
361
+/home/+/+/+/value       ← ingestie primară
362
+/energy/+/+/+/value      ← ingestie primară
363
+/home/+/+/+/meta         ← cache local
364
+/energy/+/+/+/meta       ← cache local
365
+/home/+/+/+/last         ← ignorat pentru ingestie
366
+/energy/+/+/+/last       ← ignorat pentru ingestie
367
```
368

            
369
### 7.2 Topic → Historian Mapping (sumar)
370

            
371
| Bus | `metric_name` | `device_id` |
372
|---|---|---|
373
| `home` | `<capability>` | `<location>.<device_id>` |
374
| `energy` | `<metric>` | `<entity_type>.<entity_id>` |
375

            
376
### 7.3 Payload Handling
377

            
378
**Scalar (Profile A):**
379
- `value` = payload parsat ca number/boolean
380
- `observed_at` = ingestion time
381
- `unit` din meta cache
382

            
383
**Envelope (Profile B):**
384
- `value` = `payload.value`
385
- `observed_at` = `payload.observed_at` sau ingestion time
386
- `unit` = `payload.unit` sau meta cache
387

            
388
### 7.4 Type Routing
389

            
390
| Tip payload | Overload DB |
391
|---|---|
392
| numeric | `ingest_measurement(..., value::double precision, ...)` |
393
| boolean | `ingest_measurement(..., value::boolean, ...)` |
394
| string enum | SKIP (nu se ingestează fără encoding policy explicit) |
395
| counter cumulative | SKIP measurement path; rutare pe counter pipeline |
396

            
397
### 7.5 Ordering
398

            
399
- Serializare scrieri per `(metric_name, device_id)` — obligatoriu
400
- Paralelism acceptabil între device-uri/metrici diferite
401
- Retry-urile păstrează ordinea pe stream
402

            
403
### 7.6 Error Handling
404

            
405
**Erori permanente** (log + DLQ, nu retry):
406
- topic incompatibil cu grammar-ul busului
407
- tip payload incompatibil
408
- metric necunoscut
409
- out-of-order measurement
410
- valoare respinsă de policy
411

            
412
**Erori tranziente** (retry cu ordine):
413
- PostgreSQL indisponibil
414
- deconectare rețea
415
- deconectare broker temporară
416

            
417
### 7.7 Meta Cache
418

            
419
- Cheie: stem topic (fără ultimul segment stream)
420
- Câmpuri cache: `payload_profile`, `data_type`, `unit`, `historian.enabled`, `historian.mode`, `schema_ref`, `adapter_id`, `source`, `source_ref`
421
- Missing meta = mod degradat, nu stop
422

            
423
### 7.8 Deployment
424

            
425
Model recomandat inițial: un singur worker per site consumând `home` + `energy`.
426

            
427
---
428

            
429
## 8. Measurement Ingestion API
430

            
431
Definit în `tdb_ingestion/mqtt_ingestion_api.md`. Implementat și funcțional.
432

            
433
### 8.1 Semnătură
434

            
435
```sql
436
-- Numeric
437
SELECT * FROM telemetry.ingest_measurement(
438
    p_metric_name => $1,
439
    p_device_id   => $2,
440
    p_value       => $3::double precision,
441
    p_observed_at => $4::timestamptz
442
);
443

            
444
-- Boolean
445
SELECT * FROM telemetry.ingest_measurement(
446
    p_metric_name => $1,
447
    p_device_id   => $2,
448
    p_value       => $3::boolean,
449
    p_observed_at => $4::timestamptz
450
);
451
```
452

            
453
### 8.2 Comportament Automat
454

            
455
Gestionat de PostgreSQL, worker-ul nu reimplementează:
456

            
457
- Auto-provisioning device (`ensure_device()`)
458
- Append-only enforcement (watermark per `metric_name, device_id`)
459
- Policy lookup din `telemetry.metric_policies`
460
- Gap detection la sosirea următorului measurement
461
- Segment splitting (value change, NULL, policy change, gap)
462
- Tail NULL simulation la query time
463

            
464
### 8.3 Response Actions
465

            
466
`opened`, `opened_null`, `extended`, `extended_null`, `split`, `null_to_value`, `value_to_null`, `gap_split`, `gap_to_null`
467

            
468
Worker-ul le loghează, nu branșează logica pe ele.
469

            
470
### 8.4 Idempotency
471

            
472
Limitare curentă: `ingest_measurement()` nu acceptă `idempotency_key`. Replay după network failure ambiguă poate produce eroare out-of-order. Worker-ul tratează aceasta ca duplicate operațional.
473

            
474
### 8.5 Advisory Locking
475

            
476
- Un advisory lock per `(metric_name, device_id)` pe tranzacție
477
- Implementat ca wait (nu NOWAIT)
478
- Recomandare: writer affinity per `(metric_name, device_id)` via routing determinist
479

            
480
---
481

            
482
## 9. Counter Ingestion API
483

            
484
Definit în `tdb_ingestion/counter_ingestion_api.md`. Contractul este acum materializat în repo și reprezintă ținta de implementare pentru worker și backend.
485

            
486
### 9.1 Semnătură
487

            
488
```sql
489
SELECT * FROM telemetry.ingest_counter(
490
    p_metric_name     => $1,
491
    p_device_id       => $2,
492
    p_counter_value   => $3::numeric,
493
    p_observed_at     => $4::timestamptz,
494
    p_source_sequence => $5,  -- nullable
495
    p_idempotency_key => $6,  -- nullable
496
    p_snapshot_id     => $7   -- nullable
497
);
498
```
499

            
500
### 9.2 Reguli Core
501

            
502
- Append-only, ordonat per `(metric_name, device_id)`
503
- `counter_value` nu acceptă NULL; dacă sursa nu are valoare, observația se omite
504
- Gap-uri derivate la query time din freshness, nu scrieri sintetice NULL
505
- Scăderea valorii = boundary semantic (`reset_boundary`, `rollover_boundary`, `invalid_drop`)
506
- Delta și rate nu traversează boundary-uri de reset/rollover
507
- Worker-ul rutează aceste metrici pe `ingest_counter()`, nu pe `ingest_measurement()`
508

            
509
### 9.3 Stream Identity
510

            
511
```
512
stream = (metric_name, device_id)
513
```
514

            
515
Nu există `stream_id` suplimentar. Ordering, freshness, replay metadata — toate per `(metric_name, device_id)`.
516

            
517
### 9.4 Reliability Levels
518

            
519
| Nivel | Cerințe | Garanții |
520
|---|---|---|
521
| `degraded / at-most-once` | fără `source_sequence`, fără `idempotency_key` | retry nesigur |
522
| `recommended / at-least-once` | `source_sequence` sau `idempotency_key` | replay detectabil |
523
| `stronger replay safety` | ambele | deduplicare explicită |
524

            
525
### 9.5 Reporting Modes
526

            
527
| Mode | Semantică |
528
|---|---|
529
| `periodic` | cadență așteptată; lipsa update-ului = gap |
530
| `on_change` | publicare la schimbare; tăcerea nu = delta 0 |
531
| `hybrid` | on_change + heartbeat periodic |
532

            
533
### 9.6 Freshness Defaults
534

            
535
| Reporting mode | Input | Default `stale_after_s` |
536
|---|---|---|
537
| `periodic` | `expected_interval_s` | `expected_interval_s * 2` |
538
| `on_change` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |
539
| `hybrid` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |
540

            
541
`stale_after_s` explicit în policy overridează default-ul.
542

            
543
---
544

            
545
## 10. Decizii Luate
546

            
547
### 10.1 Closed for Phase 1
548

            
549
Aceste decizii sunt stabile și se poate implementa pe baza lor.
550

            
551
| # | Decizie | Status |
552
|---|---|---|
553
| 1 | Measurement path: numeric + boolean prin `ingest_measurement()` | Acceptat, implementat |
554
| 2 | Counter path: separat de measurement, prin `ingest_counter()` | Contract stabilizat, neimplementat |
555
| 3 | `last` nu se ingestează ca time-series | Închis |
556
| 4 | Ordering per `(metric_name, device_id)` obligatoriu | Închis |
557
| 5 | Missing source timestamp → fallback la `ingested_at` | Închis |
558
| 6 | Missing `meta` = mod degradat, nu hard stop | Închis |
559
| 7 | String enum states: nu se ingestează fără encoding policy | Închis |
560
| 8 | Counter `stream_id = (metric_name, device_id)` — fără ID suplimentar | Închis |
561
| 9 | Counter: NULL never, gaps derivate la query time | Închis |
562
| 10 | Counter: un model comun cu profile de domeniu, nu separare energy/traffic | Închis |
563
| 11 | Permanent errors → DLQ + log, nu retry | Închis |
564
| 12 | Transient errors → retry cu ordine păstrată | Închis |
565

            
566
### 10.2 Decizii Luate Acum (în acest document)
567

            
568
| # | Decizie | Alegere | Rațiune |
569
|---|---|---|---|
570
| D1 | Snapshot pe bus vs poller→worker | **Snapshot rămâne pe boundary-ul poller→worker.** Bus-ul semantic păstrează publicații per-stream. | Busul semantic e optimizat pentru publicații lightweight per-topic. Snapshot-ul e un artefact de colectare, nu o semantică de bus. Dacă mai târziu se dovedește necesar, se poate evolua contractul explicit. |
571
| D2 | Freshness defaults | **`stale_after_s = expected_interval * 2`** (conform cu draft-ul existent) | Multiplicatorul 2x din draft e suficient de conservator. Override explicit per policy rămâne disponibil. |
572
| D3 | Profile energy/traffic counter | **Amânat până la date reale.** | Nu avem network bus; profilele de traffic nu sunt relevante acum. Energy counter profile se definesc când implementăm `ingest_counter()`. |
573

            
574
---
575

            
576
## 11. Puncte Deschise (non-blocante)
577

            
578
Aceste puncte nu blochează Phase 1 dar vor necesita atenție:
579

            
580
| # | Punct | Când devine relevant |
581
|---|---|---|
582
| O1 | `idempotency_key` pe measurement API | Când avem nevoie de replay sigur pe measurement path |
583
| O2 | Encoding policy pentru string enum states | Când vrem să persistăm HVAC modes sau alte enums |
584
| O3 | Forma fizică a reset/rollover boundary în counter storage | La implementarea `ingest_counter()` |
585
| O4 | Batch SQL pentru counter ingestion | Când volumul de counters justifică optimizarea |
586
| O5 | Completitudine snapshot (cum se declară explicit) | Când se adaugă surse bulk polling |
587

            
588
---
589

            
590
## 12. Testing & Validation
591

            
592
### 12.1 Prima Matrice de Test
593

            
594
| # | Device | Capabilities | Bus | Historian |
595
|---|---|---|---|---|
596
| 1 | Room sensor | `temperature`, `humidity`, `battery` | home | Da (numeric) |
597
| 2 | Door sensor | `contact`, `battery` | home | Da (boolean + numeric) |
598
| 3 | Smart plug | `power` (control) | home | Nu (string enum) |
599
| 3 | Smart plug | `active_power`, `voltage`, `current` | energy | Da (numeric) |
600
| 3 | Smart plug | `energy_total` | energy | Nu (counter path) |
601
| 4 | Remote | `button` / `action` | home | Nu (event, skip default) |
602

            
603
### 12.2 Acceptance Criteria Phase 1
604

            
605
1. Historian stochează samples `home` + `energy` (measurement) fără parsing vendor topics
606
2. `value` topics ingestabile doar cu topic parsing + opțional `meta` cache
607
3. Smart plug proiectat pe ambele busuri fără ambiguitate
608
4. `meta.historian.mode` respectat (sample/state/event/ignore)
609
5. Missing timestamps nu blochează ingestia
610
6. Missing `meta` degradează, nu blochează
611
7. Erori adapter pe `sys`, nu pe bus semantic
612
8. Counter totals coexistă pe bus fără a fi confundate cu measurement path
613

            
614
---
615

            
616
## 13. Hartă de Navigare a Documentelor
617

            
618
| Document | Ce conține | Când îl citești |
619
|---|---|---|
620
| `mqtt_contract.md` | Contract partajat: payload, meta, time, quality, delivery | Când construiești orice componentă care publică sau consumă de pe bus |
621
| `home_bus.md` | Topic grammar, capabilities, historian mapping home | Când lucrezi pe adaptoare home sau pe historian worker |
622
| `energy_bus.md` | Topic grammar, entity types, metrics, historian mapping energy | Când lucrezi pe surse de energie |
623
| `sys_bus.md` | Namespace operațional, stream-uri sys | Când adaugi observabilitate la adaptoare sau worker |
624
| `addapters.md` | Responsabilități adapter, fan-out, multi-bus, Z2M convention | Când construiești un adapter |
625
| `adapter_implementation_examples.md` | Patterns Node-RED, cold-start, failure modes | Când implementezi concret în Node-RED |
626
| `historian_worker.md` | Subscription, mapping, payload handling, ordering, errors | Când construiești historian worker-ul |
627
| `tdb_ingestion/mqtt_ingestion_api.md` | API measurement implementat, error handling, ordering | Când scrii cod de ingestie measurement |
628
| `tdb_ingestion/counter_ingestion_api.md` | Contract counter stabilizat (neimplementat) | Când implementezi counter ingestion |
629

            
630
---
631

            
632
## 14. End-to-End Trace: De la Device la PostgreSQL
633

            
634
### Exemplu 1: Temperatură cameră (measurement, Profile A)
635

            
636
```
637
[Z2M Device] zigbee2mqtt/SENSOR/vad/bedroom/bedroom-sensor
638
  payload: {"temperature": 23.4, "humidity": 41, "battery": 87}
639

            
640
[Adapter] fan-out → 3 publicațiuni:
641
  vad/home/bedroom/temperature/bedroom-sensor/value  → 23.4
642
  vad/home/bedroom/humidity/bedroom-sensor/value     → 41
643
  vad/home/bedroom/battery/bedroom-sensor/value      → 87
644

            
645
[Worker] topic parse:
646
  metric_name = "temperature"
647
  device_id   = "bedroom.bedroom-sensor"
648
  value       = 23.4
649
  observed_at = ingested_at (Profile A, no source timestamp)
650

            
651
[PostgreSQL]
652
  SELECT * FROM telemetry.ingest_measurement(
653
    'temperature', 'bedroom.bedroom-sensor',
654
    23.4::double precision, now()
655
  );
656
  → action: extended
657
```
658

            
659
### Exemplu 2: Smart plug dual projection (measurement + counter)
660

            
661
```
662
[Z2M Device] zigbee2mqtt/PLUG/vad/living-room/tv-plug
663
  payload: {"state":"on", "power":126.8, "energy":4.72, "voltage":229.4, "current":0.58}
664

            
665
[Adapter] fan-out → 5 publicațiuni pe 2 busuri:
666
  vad/home/living-room/power/tv-plug/value                → "on"          (home, string enum, skip historian)
667
  vad/energy/load/living-room-tv/active_power/value       → 126.8         (energy, measurement)
668
  vad/energy/load/living-room-tv/voltage/value            → 229.4         (energy, measurement)
669
  vad/energy/load/living-room-tv/current/value            → 0.58          (energy, measurement)
670
  vad/energy/load/living-room-tv/energy_total/value       → 4.72          (energy, counter — skip measurement path)
671

            
672
[Worker]
673
  active_power → ingest_measurement('active_power', 'load.living-room-tv', 126.8, now())  ✓
674
  voltage      → ingest_measurement('voltage', 'load.living-room-tv', 229.4, now())       ✓
675
  current      → ingest_measurement('current', 'load.living-room-tv', 0.58, now())        ✓
676
  energy_total → skip measurement path, route to counter pipeline                         ⏳
677
  "on"         → skip (string enum, no encoding policy)                                   ⊘
678
```
679

            
680
### Exemplu 3: Contact sensor (boolean measurement)
681

            
682
```
683
[Z2M Device] zigbee2mqtt/SNZB-04P/vad/entrance/front-door
684
  payload: {"contact": false, "battery": 94}
685

            
686
[Adapter] fan-out → 2 publicațiuni:
687
  vad/home/entrance/contact/front-door/value  → false
688
  vad/home/entrance/battery/front-door/value  → 94
689

            
690
[Worker]
691
  contact → ingest_measurement('contact', 'entrance.front-door', false, now())  ✓
692
  battery → ingest_measurement('battery', 'entrance.front-door', 94, now())     ✓
693
```