# Consolidated Bus & Data Collection Specification

Data: 2026-03-20

Acest document consolidează specificațiile existente într-o referință unică pentru busuri, colectare de date și conformitate cu cerințele historian-ului. Nu înlocuiește documentele sursă, ci le leagă și stabilește deciziile rămase deschise.

Documentele sursă rămân canonice. Acest document este o hartă de navigare și un registru de decizii.

---

## 1. Arhitectura Pipeline-ului

```
Device / External System
    ↓
Protocol Adapter (Node-RED)
    ↓
Canonical MQTT Bus (home | energy)
    ↓
Historian Worker
    ↓
telemetry.ingest_measurement(...)   ← measurement path (implementat)
telemetry.ingest_counter(...)       ← counter path (contract stabilizat, neimplementat)
    ↓
PostgreSQL Historian
```

Documente relevante:
- `README.md` — arhitectura generală
- `addapters.md` — responsabilitățile adapterelor
- `historian_worker.md` — responsabilitățile worker-ului

---

## 2. Contractul MQTT Partajat

Definit în `mqtt_contract.md`. Toate busurile moștenesc aceste reguli.

### 2.1 Namespace

- Busuri semantice: `<site>/<bus>/...`
- Namespace operațional: `<site>/sys/...`
- `<site>` = site-id stabil, kebab-case (ex: `vad`)
- Busuri v1: `home`, `energy`

### 2.2 Stream-uri Canonice

| Stream | Scop | Retain | QoS |
|---|---|---|---|
| `value` | sample live hot-path | false | 1 |
| `last` | ultimul sample cu timestamp, pentru cold-start | true | 1 |
| `set` | comandă/request | false | 1 |
| `meta` | metadata statică sau slow-changing | true | 1 |
| `availability` | online/offline | true (+ LWT) | 1 |

Reguli:
- adaptoarele emit date live doar pe `value`
- adaptoarele deduplică `value` când valoarea semantică nu s-a schimbat
- adaptoarele actualizează `last` (retained) la fiecare schimbare de timestamp
- `state` și `event` sunt legacy, nu se introduc de adaptoare noi
- `set` nu se reține niciodată
- historian-ul ingestează doar `value`, ignoră `last`, `set`, `meta`, `availability`

### 2.3 Payload Profiles

**Profile A — Scalar (default pentru hot paths):**

```
23.6
41
true
on
```

- metadata pe `meta` separat (retained)
- `observed_at` = ingestion time (acceptabil pentru Phase 1)

**Profile B — Envelope JSON (opțional):**

```json
{
  "value": 23.6,
  "unit": "C",
  "observed_at": "2026-03-08T10:15:12Z",
  "quality": "good"
}
```

- câmpuri opționale: `published_at`, `source_seq`, `annotations`
- utilizat când fidelitatea timestamp-ului contează sau când quality trebuie propagat per-sample

### 2.4 Meta Contract

Fiecare topic retained `meta` descrie familia sibling de `value`/`last`.

Formă minimă recomandată:

```json
{
  "schema_ref": "mqbus.home.v1",
  "payload_profile": "scalar",
  "data_type": "number",
  "unit": "C",
  "adapter_id": "z2m-main",
  "source": "zigbee2mqtt",
  "source_ref": "0x00158d0008aa1111",
  "source_topic": "zigbee2mqtt/SENSOR/vad/bedroom/bedroom-sensor",
  "precision": 0.1,
  "historian": {
    "enabled": true,
    "mode": "sample"
  }
}
```

Câmpuri historian:
- `historian.enabled`: boolean
- `historian.mode`: `sample` | `state` | `event` | `ignore`
- `historian.retention_class`: opțional (`short`, `default`, `long`)
- `historian.sample_period_hint_s`: opțional

### 2.5 Time Semantics

Trei timestamp-uri distincte:

| Timestamp | Semnificație |
|---|---|
| `observed_at` | când sursa a observat/măsurat valoarea |
| `published_at` | când adapterul a publicat mesajul |
| `ingested_at` | când worker-ul a procesat mesajul |

Reguli:
- dacă sursa are timestamp, adapterul îl păstrează ca `observed_at`
- dacă nu, adapterul omite `observed_at`; worker-ul folosește `ingested_at`
- adaptoarele nu fabrică timestamp-uri
- dacă adapterul estimează timpul, folosește Profile B cu `quality=estimated`

### 2.6 Quality Model

Valori: `good`, `estimated`, `degraded`, `stale`, `invalid`

- `invalid` nu se emite pe bus semantic; merge pe `sys/.../error`
- `quality` se omite doar dacă `good` este implicit

---

## 3. Home Bus

Definit în `home_bus.md`. Bus room-centric pentru senzori, control, automatizare.

### 3.1 Topic Grammar

```
<site>/home/<location>/<capability>/<device_id>/<stream>
```

Exemple:
- `vad/home/bedroom/temperature/bedroom-sensor/value`
- `vad/home/living-room/motion/radar-south/value`
- `vad/home/kitchen/light/ceiling-switch/set`

### 3.2 Capability Catalog

**Environmental:** `temperature`, `humidity`, `pressure`, `illuminance`, `co2`, `voc`, `pm25`, `pm10`

**Presence/Safety:** `motion`, `presence`, `contact`, `water_leak`, `smoke`, `gas`, `tamper`

**Control:** `light`, `power`, `lock`, `cover_position`, `thermostat_mode`, `target_temperature`, `fan_mode`, `button`

**Device health:** `battery`, `battery_low`

Reguli:
- `power` pe `home` doar pentru control semantics (on/off)
- metrici electrice (`active_power`, `energy_total`) pe `energy`
- `linkquality` nu pe `home` (merge pe `sys` sau viitor `network`)
- `presence` = stare derivată de nivel mai înalt; detecția brută mmWave cu `fading_time=0` se publică ca `motion`

### 3.3 Historian Mapping

```
metric_name = <capability>
device_id   = <location>.<device_id>
```

Exemplu:
- Topic: `vad/home/bedroom/temperature/bedroom-sensor/value`
- `metric_name = temperature`
- `device_id = bedroom.bedroom-sensor`
- `value = 23.4` (scalar)
- `observed_at = ingested_at` (Profile A fallback)

---

## 4. Energy Bus

Definit în `energy_bus.md`. Bus pentru topologia electrică: producție, stocare, grid, load.

### 4.1 Topic Grammar

```
<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>
```

Entity types: `source`, `storage`, `grid`, `load`, `transfer`

Exemple:
- `vad/energy/source/pv-roof-1/active_power/value`
- `vad/energy/storage/battery-main/soc/value`
- `vad/energy/grid/main-meter/import_power/value`
- `vad/energy/load/living-room-tv/active_power/value`

### 4.2 Metric Classes

**Measurement-style** (compatibile cu `ingest_measurement()` acum):

`active_power`, `voltage`, `current`, `frequency`, `soc`, `charge_power`, `discharge_power`

**Counter-style** (pe bus, dar NU prin measurement API — urmează `ingest_counter()`):

`energy_total`, `import_energy_total`, `export_energy_total`

### 4.3 Units Canonice

| Metric | Unit |
|---|---|
| power | `W` |
| energy | `Wh` sau `kWh` |
| voltage | `V` |
| current | `A` |
| frequency | `Hz` |
| state of charge | `%` |

### 4.4 Historian Mapping

```
metric_name = <metric>
device_id   = <entity_type>.<entity_id>
```

Exemplu:
- Topic: `vad/energy/storage/battery-main/soc/value`
- `metric_name = soc`
- `device_id = storage.battery-main`

### 4.5 Ownership Rule

- Dacă valoarea e pentru control utilizator într-o cameră → `home`
- Dacă valoarea e pentru contabilitate electrică → `energy`
- Un device fizic poate proiecta pe ambele busuri (smart plug: `power` pe `home`, `active_power` pe `energy`)

---

## 5. Namespace Operațional (sys)

Definit în `sys_bus.md`. Nu este un bus semantic.

### 5.1 Topic Grammar

```
<site>/sys/<producer_kind>/<instance_id>/<stream>
```

Producer kinds v1: `adapter`, `historian`

### 5.2 Stream-uri Operaționale

| Stream | Scop | Retain |
|---|---|---|
| `availability` | liveness-ul componentei | true + LWT |
| `stats` | contori operaționali, snapshot periodic | true |
| `error` | erori structurate pentru operator | false |
| `dlq` | mesaje dead-letter cu context | false |

Exemple:
- `vad/sys/adapter/z2m-main/availability` → `online`
- `vad/sys/historian/main/error` → `{"code":"unknown_metric",...}`

---

## 6. Adaptoare

Definit în `addapters.md` și `adapter_implementation_examples.md`.

### 6.1 Responsabilități

1. Traducere topic-uri din format vendor în bus canonic
2. Normalizare payload (JSON → scalar, conversie unități)
3. Păstrare timestamp sursă (dacă există)
4. Fan-out: 1 mesaj inbound → N publicațiuni canonice
5. Proiecție multi-bus (smart plug → `home` + `energy`)
6. Publicare `meta` retained înaintea traficului live
7. Publicare `availability` retained + LWT
8. Routing erori pe `sys/.../error` și `sys/.../dlq`

### 6.2 NON-Responsabilități

- Nu implementează logica HomeKit
- Nu implementează reguli de automatizare
- Nu agregă senzori
- Nu stochează date istorice
- Nu repară mesaje malformed (le expune pe `sys`)

### 6.3 Publish Boundary

La publicare, mesajul conține DOAR:

```javascript
{ topic: normalizedTopic, payload: normalizedValue }
```

Structurile interne de normalizare se șterg înainte de publish.

### 6.4 Z2M Topic Convention

Friendly name: `<device_type>/<site>/<location>/<device_id>`

Exemplu:
- Z2M: `zigbee2mqtt/ZG-204ZV/vad/balcon/south`
- Canonical: `vad/home/balcon/illuminance/south/value`

Permite traducere deterministă fără lookup tables.

### 6.5 Z2M Field Mapping

| Z2M field | Bus | Canonical capability/metric |
|---|---|---|
| `temperature` | home | `temperature` |
| `humidity` | home | `humidity` |
| `pressure` | home | `pressure` |
| `illuminance` | home | `illuminance` |
| `contact` | home | `contact` |
| `occupancy` (PIR) | home | `motion` |
| `presence` (mmWave, `fading_time=0`) | home | `motion` |
| `battery` | home | `battery` |
| `state` (plug/switch) | home | `power` |
| `action` (remote) | home | `button` |
| `power` (smart plug) | energy | `active_power` |
| `voltage` (smart plug) | energy | `voltage` |
| `current` (smart plug) | energy | `current` |
| `energy` (smart plug) | energy | `energy_total` |

### 6.6 Mapping Registry

Adapoarele folosesc configurație declarativă. Câmpuri recomandate:

`source_system`, `source_topic_match`, `source_field`, `target_bus`, `target_location` / `target_entity_id`, `target_capability` / `target_metric`, `target_device_id`, `stream`, `payload_profile`, `unit`, `historian_enabled`, `historian_mode`

### 6.7 Auto-Provisioning

Adapoarele derivă dimensiunile semantice din structura topic-ului Z2M. Override-uri se aplică doar pentru excepții. Default-uri: `bus=home`, `location=unknown`, `stream=value`.

---

## 7. Historian Worker

Definit în `historian_worker.md`.

### 7.1 Subscription Model

```
+/home/+/+/+/value       ← ingestie primară
+/energy/+/+/+/value      ← ingestie primară
+/home/+/+/+/meta         ← cache local
+/energy/+/+/+/meta       ← cache local
+/home/+/+/+/last         ← ignorat pentru ingestie
+/energy/+/+/+/last       ← ignorat pentru ingestie
```

### 7.2 Topic → Historian Mapping (sumar)

| Bus | `metric_name` | `device_id` |
|---|---|---|
| `home` | `<capability>` | `<location>.<device_id>` |
| `energy` | `<metric>` | `<entity_type>.<entity_id>` |

### 7.3 Payload Handling

**Scalar (Profile A):**
- `value` = payload parsat ca number/boolean
- `observed_at` = ingestion time
- `unit` din meta cache

**Envelope (Profile B):**
- `value` = `payload.value`
- `observed_at` = `payload.observed_at` sau ingestion time
- `unit` = `payload.unit` sau meta cache

### 7.4 Type Routing

| Tip payload | Overload DB |
|---|---|
| numeric | `ingest_measurement(..., value::double precision, ...)` |
| boolean | `ingest_measurement(..., value::boolean, ...)` |
| string enum | SKIP (nu se ingestează fără encoding policy explicit) |
| counter cumulative | SKIP measurement path; rutare pe counter pipeline |

### 7.5 Ordering

- Serializare scrieri per `(metric_name, device_id)` — obligatoriu
- Paralelism acceptabil între device-uri/metrici diferite
- Retry-urile păstrează ordinea pe stream

### 7.6 Error Handling

**Erori permanente** (log + DLQ, nu retry):
- topic incompatibil cu grammar-ul busului
- tip payload incompatibil
- metric necunoscut
- out-of-order measurement
- valoare respinsă de policy

**Erori tranziente** (retry cu ordine):
- PostgreSQL indisponibil
- deconectare rețea
- deconectare broker temporară

### 7.7 Meta Cache

- Cheie: stem topic (fără ultimul segment stream)
- Câmpuri cache: `payload_profile`, `data_type`, `unit`, `historian.enabled`, `historian.mode`, `schema_ref`, `adapter_id`, `source`, `source_ref`
- Missing meta = mod degradat, nu stop

### 7.8 Deployment

Model recomandat inițial: un singur worker per site consumând `home` + `energy`.

---

## 8. Measurement Ingestion API

Definit în `tdb_ingestion/mqtt_ingestion_api.md`. Implementat și funcțional.

### 8.1 Semnătură

```sql
-- Numeric
SELECT * FROM telemetry.ingest_measurement(
    p_metric_name => $1,
    p_device_id   => $2,
    p_value       => $3::double precision,
    p_observed_at => $4::timestamptz
);

-- Boolean
SELECT * FROM telemetry.ingest_measurement(
    p_metric_name => $1,
    p_device_id   => $2,
    p_value       => $3::boolean,
    p_observed_at => $4::timestamptz
);
```

### 8.2 Comportament Automat

Gestionat de PostgreSQL, worker-ul nu reimplementează:

- Auto-provisioning device (`ensure_device()`)
- Append-only enforcement (watermark per `metric_name, device_id`)
- Policy lookup din `telemetry.metric_policies`
- Gap detection la sosirea următorului measurement
- Segment splitting (value change, NULL, policy change, gap)
- Tail NULL simulation la query time

### 8.3 Response Actions

`opened`, `opened_null`, `extended`, `extended_null`, `split`, `null_to_value`, `value_to_null`, `gap_split`, `gap_to_null`

Worker-ul le loghează, nu branșează logica pe ele.

### 8.4 Idempotency

Limitare curentă: `ingest_measurement()` nu acceptă `idempotency_key`. Replay după network failure ambiguă poate produce eroare out-of-order. Worker-ul tratează aceasta ca duplicate operațional.

### 8.5 Advisory Locking

- Un advisory lock per `(metric_name, device_id)` pe tranzacție
- Implementat ca wait (nu NOWAIT)
- Recomandare: writer affinity per `(metric_name, device_id)` via routing determinist

---

## 9. Counter Ingestion API

Definit în `tdb_ingestion/counter_ingestion_api.md`. Contractul este acum materializat în repo și reprezintă ținta de implementare pentru worker și backend.

### 9.1 Semnătură

```sql
SELECT * FROM telemetry.ingest_counter(
    p_metric_name     => $1,
    p_device_id       => $2,
    p_counter_value   => $3::numeric,
    p_observed_at     => $4::timestamptz,
    p_source_sequence => $5,  -- nullable
    p_idempotency_key => $6,  -- nullable
    p_snapshot_id     => $7   -- nullable
);
```

### 9.2 Reguli Core

- Append-only, ordonat per `(metric_name, device_id)`
- `counter_value` nu acceptă NULL; dacă sursa nu are valoare, observația se omite
- Gap-uri derivate la query time din freshness, nu scrieri sintetice NULL
- Scăderea valorii = boundary semantic (`reset_boundary`, `rollover_boundary`, `invalid_drop`)
- Delta și rate nu traversează boundary-uri de reset/rollover
- Worker-ul rutează aceste metrici pe `ingest_counter()`, nu pe `ingest_measurement()`

### 9.3 Stream Identity

```
stream = (metric_name, device_id)
```

Nu există `stream_id` suplimentar. Ordering, freshness, replay metadata — toate per `(metric_name, device_id)`.

### 9.4 Reliability Levels

| Nivel | Cerințe | Garanții |
|---|---|---|
| `degraded / at-most-once` | fără `source_sequence`, fără `idempotency_key` | retry nesigur |
| `recommended / at-least-once` | `source_sequence` sau `idempotency_key` | replay detectabil |
| `stronger replay safety` | ambele | deduplicare explicită |

### 9.5 Reporting Modes

| Mode | Semantică |
|---|---|
| `periodic` | cadență așteptată; lipsa update-ului = gap |
| `on_change` | publicare la schimbare; tăcerea nu = delta 0 |
| `hybrid` | on_change + heartbeat periodic |

### 9.6 Freshness Defaults

| Reporting mode | Input | Default `stale_after_s` |
|---|---|---|
| `periodic` | `expected_interval_s` | `expected_interval_s * 2` |
| `on_change` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |
| `hybrid` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |

`stale_after_s` explicit în policy overridează default-ul.

---

## 10. Decizii Luate

### 10.1 Closed for Phase 1

Aceste decizii sunt stabile și se poate implementa pe baza lor.

| # | Decizie | Status |
|---|---|---|
| 1 | Measurement path: numeric + boolean prin `ingest_measurement()` | Acceptat, implementat |
| 2 | Counter path: separat de measurement, prin `ingest_counter()` | Contract stabilizat, neimplementat |
| 3 | `last` nu se ingestează ca time-series | Închis |
| 4 | Ordering per `(metric_name, device_id)` obligatoriu | Închis |
| 5 | Missing source timestamp → fallback la `ingested_at` | Închis |
| 6 | Missing `meta` = mod degradat, nu hard stop | Închis |
| 7 | String enum states: nu se ingestează fără encoding policy | Închis |
| 8 | Counter `stream_id = (metric_name, device_id)` — fără ID suplimentar | Închis |
| 9 | Counter: NULL never, gaps derivate la query time | Închis |
| 10 | Counter: un model comun cu profile de domeniu, nu separare energy/traffic | Închis |
| 11 | Permanent errors → DLQ + log, nu retry | Închis |
| 12 | Transient errors → retry cu ordine păstrată | Închis |

### 10.2 Decizii Luate Acum (în acest document)

| # | Decizie | Alegere | Rațiune |
|---|---|---|---|
| D1 | Snapshot pe bus vs poller→worker | **Snapshot rămâne pe boundary-ul poller→worker.** Bus-ul semantic păstrează publicații per-stream. | Busul semantic e optimizat pentru publicații lightweight per-topic. Snapshot-ul e un artefact de colectare, nu o semantică de bus. Dacă mai târziu se dovedește necesar, se poate evolua contractul explicit. |
| D2 | Freshness defaults | **`stale_after_s = expected_interval * 2`** (conform cu draft-ul existent) | Multiplicatorul 2x din draft e suficient de conservator. Override explicit per policy rămâne disponibil. |
| D3 | Profile energy/traffic counter | **Amânat până la date reale.** | Nu avem network bus; profilele de traffic nu sunt relevante acum. Energy counter profile se definesc când implementăm `ingest_counter()`. |

---

## 11. Puncte Deschise (non-blocante)

Aceste puncte nu blochează Phase 1 dar vor necesita atenție:

| # | Punct | Când devine relevant |
|---|---|---|
| O1 | `idempotency_key` pe measurement API | Când avem nevoie de replay sigur pe measurement path |
| O2 | Encoding policy pentru string enum states | Când vrem să persistăm HVAC modes sau alte enums |
| O3 | Forma fizică a reset/rollover boundary în counter storage | La implementarea `ingest_counter()` |
| O4 | Batch SQL pentru counter ingestion | Când volumul de counters justifică optimizarea |
| O5 | Completitudine snapshot (cum se declară explicit) | Când se adaugă surse bulk polling |

---

## 12. Testing & Validation

### 12.1 Prima Matrice de Test

| # | Device | Capabilities | Bus | Historian |
|---|---|---|---|---|
| 1 | Room sensor | `temperature`, `humidity`, `battery` | home | Da (numeric) |
| 2 | Door sensor | `contact`, `battery` | home | Da (boolean + numeric) |
| 3 | Smart plug | `power` (control) | home | Nu (string enum) |
| 3 | Smart plug | `active_power`, `voltage`, `current` | energy | Da (numeric) |
| 3 | Smart plug | `energy_total` | energy | Nu (counter path) |
| 4 | Remote | `button` / `action` | home | Nu (event, skip default) |

### 12.2 Acceptance Criteria Phase 1

1. Historian stochează samples `home` + `energy` (measurement) fără parsing vendor topics
2. `value` topics ingestabile doar cu topic parsing + opțional `meta` cache
3. Smart plug proiectat pe ambele busuri fără ambiguitate
4. `meta.historian.mode` respectat (sample/state/event/ignore)
5. Missing timestamps nu blochează ingestia
6. Missing `meta` degradează, nu blochează
7. Erori adapter pe `sys`, nu pe bus semantic
8. Counter totals coexistă pe bus fără a fi confundate cu measurement path

---

## 13. Hartă de Navigare a Documentelor

| Document | Ce conține | Când îl citești |
|---|---|---|
| `mqtt_contract.md` | Contract partajat: payload, meta, time, quality, delivery | Când construiești orice componentă care publică sau consumă de pe bus |
| `home_bus.md` | Topic grammar, capabilities, historian mapping home | Când lucrezi pe adaptoare home sau pe historian worker |
| `energy_bus.md` | Topic grammar, entity types, metrics, historian mapping energy | Când lucrezi pe surse de energie |
| `sys_bus.md` | Namespace operațional, stream-uri sys | Când adaugi observabilitate la adaptoare sau worker |
| `addapters.md` | Responsabilități adapter, fan-out, multi-bus, Z2M convention | Când construiești un adapter |
| `adapter_implementation_examples.md` | Patterns Node-RED, cold-start, failure modes | Când implementezi concret în Node-RED |
| `historian_worker.md` | Subscription, mapping, payload handling, ordering, errors | Când construiești historian worker-ul |
| `tdb_ingestion/mqtt_ingestion_api.md` | API measurement implementat, error handling, ordering | Când scrii cod de ingestie measurement |
| `tdb_ingestion/counter_ingestion_api.md` | Contract counter stabilizat (neimplementat) | Când implementezi counter ingestion |

---

## 14. End-to-End Trace: De la Device la PostgreSQL

### Exemplu 1: Temperatură cameră (measurement, Profile A)

```
[Z2M Device] zigbee2mqtt/SENSOR/vad/bedroom/bedroom-sensor
  payload: {"temperature": 23.4, "humidity": 41, "battery": 87}

[Adapter] fan-out → 3 publicațiuni:
  vad/home/bedroom/temperature/bedroom-sensor/value  → 23.4
  vad/home/bedroom/humidity/bedroom-sensor/value     → 41
  vad/home/bedroom/battery/bedroom-sensor/value      → 87

[Worker] topic parse:
  metric_name = "temperature"
  device_id   = "bedroom.bedroom-sensor"
  value       = 23.4
  observed_at = ingested_at (Profile A, no source timestamp)

[PostgreSQL]
  SELECT * FROM telemetry.ingest_measurement(
    'temperature', 'bedroom.bedroom-sensor',
    23.4::double precision, now()
  );
  → action: extended
```

### Exemplu 2: Smart plug dual projection (measurement + counter)

```
[Z2M Device] zigbee2mqtt/PLUG/vad/living-room/tv-plug
  payload: {"state":"on", "power":126.8, "energy":4.72, "voltage":229.4, "current":0.58}

[Adapter] fan-out → 5 publicațiuni pe 2 busuri:
  vad/home/living-room/power/tv-plug/value                → "on"          (home, string enum, skip historian)
  vad/energy/load/living-room-tv/active_power/value       → 126.8         (energy, measurement)
  vad/energy/load/living-room-tv/voltage/value            → 229.4         (energy, measurement)
  vad/energy/load/living-room-tv/current/value            → 0.58          (energy, measurement)
  vad/energy/load/living-room-tv/energy_total/value       → 4.72          (energy, counter — skip measurement path)

[Worker]
  active_power → ingest_measurement('active_power', 'load.living-room-tv', 126.8, now())  ✓
  voltage      → ingest_measurement('voltage', 'load.living-room-tv', 229.4, now())       ✓
  current      → ingest_measurement('current', 'load.living-room-tv', 0.58, now())        ✓
  energy_total → skip measurement path, route to counter pipeline                         ⏳
  "on"         → skip (string enum, no encoding policy)                                   ⊘
```

### Exemplu 3: Contact sensor (boolean measurement)

```
[Z2M Device] zigbee2mqtt/SNZB-04P/vad/entrance/front-door
  payload: {"contact": false, "battery": 94}

[Adapter] fan-out → 2 publicațiuni:
  vad/home/entrance/contact/front-door/value  → false
  vad/home/entrance/battery/front-door/value  → 94

[Worker]
  contact → ingest_measurement('contact', 'entrance.front-door', false, now())  ✓
  battery → ingest_measurement('battery', 'entrance.front-door', 94, now())     ✓
```
