Data: 2026-03-20
Acest document consolidează specificațiile existente într-o referință unică pentru busuri, colectare de date și conformitate cu cerințele historian-ului. Nu înlocuiește documentele sursă, ci le leagă și stabilește deciziile rămase deschise.
Documentele sursă rămân canonice. Acest document este o hartă de navigare și un registru de decizii.
Device / External System
↓
Protocol Adapter (Node-RED)
↓
Canonical MQTT Bus (home | energy)
↓
Historian Worker
↓
telemetry.ingest_measurement(...) ← measurement path (implementat)
telemetry.ingest_counter(...) ← counter path (contract stabilizat, neimplementat)
↓
PostgreSQL Historian
Documente relevante:
- README.md — arhitectura generală
- addapters.md — responsabilitățile adapterelor
- historian_worker.md — responsabilitățile worker-ului
Definit în mqtt_contract.md. Toate busurile moștenesc aceste reguli.
<site>/<bus>/...<site>/sys/...<site> = site-id stabil, kebab-case (ex: vad)home, energy| Stream | Scop | Retain | QoS |
|---|---|---|---|
value |
sample live hot-path | false | 1 |
last |
ultimul sample cu timestamp, pentru cold-start | true | 1 |
set |
comandă/request | false | 1 |
meta |
metadata statică sau slow-changing | true | 1 |
availability |
online/offline | true (+ LWT) | 1 |
Reguli:
- adaptoarele emit date live doar pe value
- adaptoarele deduplică value când valoarea semantică nu s-a schimbat
- adaptoarele actualizează last (retained) la fiecare schimbare de timestamp
- state și event sunt legacy, nu se introduc de adaptoare noi
- set nu se reține niciodată
- historian-ul ingestează doar value, ignoră last, set, meta, availability
Profile A — Scalar (default pentru hot paths):
23.6
41
true
on
meta separat (retained)observed_at = ingestion time (acceptabil pentru Phase 1)Profile B — Envelope JSON (opțional):
{
"value": 23.6,
"unit": "C",
"observed_at": "2026-03-08T10:15:12Z",
"quality": "good"
}
published_at, source_seq, annotationsFiecare topic retained meta descrie familia sibling de value/last.
Formă minimă recomandată:
{
"schema_ref": "mqbus.home.v1",
"payload_profile": "scalar",
"data_type": "number",
"unit": "C",
"adapter_id": "z2m-main",
"source": "zigbee2mqtt",
"source_ref": "0x00158d0008aa1111",
"source_topic": "zigbee2mqtt/SENSOR/vad/bedroom/bedroom-sensor",
"precision": 0.1,
"historian": {
"enabled": true,
"mode": "sample"
}
}
Câmpuri historian:
- historian.enabled: boolean
- historian.mode: sample | state | event | ignore
- historian.retention_class: opțional (short, default, long)
- historian.sample_period_hint_s: opțional
Trei timestamp-uri distincte:
| Timestamp | Semnificație |
|---|---|
observed_at |
când sursa a observat/măsurat valoarea |
published_at |
când adapterul a publicat mesajul |
ingested_at |
când worker-ul a procesat mesajul |
Reguli:
- dacă sursa are timestamp, adapterul îl păstrează ca observed_at
- dacă nu, adapterul omite observed_at; worker-ul folosește ingested_at
- adaptoarele nu fabrică timestamp-uri
- dacă adapterul estimează timpul, folosește Profile B cu quality=estimated
Valori: good, estimated, degraded, stale, invalid
invalid nu se emite pe bus semantic; merge pe sys/.../errorquality se omite doar dacă good este implicitDefinit în home_bus.md. Bus room-centric pentru senzori, control, automatizare.
<site>/home/<location>/<capability>/<device_id>/<stream>
Exemple:
- vad/home/bedroom/temperature/bedroom-sensor/value
- vad/home/living-room/motion/radar-south/value
- vad/home/kitchen/light/ceiling-switch/set
Environmental: temperature, humidity, pressure, illuminance, co2, voc, pm25, pm10
Presence/Safety: motion, presence, contact, water_leak, smoke, gas, tamper
Control: light, power, lock, cover_position, thermostat_mode, target_temperature, fan_mode, button
Device health: battery, battery_low
Reguli:
- power pe home doar pentru control semantics (on/off)
- metrici electrice (active_power, energy_total) pe energy
- linkquality nu pe home (merge pe sys sau viitor network)
- presence = stare derivată de nivel mai înalt; detecția brută mmWave cu fading_time=0 se publică ca motion
metric_name = <capability>
device_id = <location>.<device_id>
Exemplu:
- Topic: vad/home/bedroom/temperature/bedroom-sensor/value
- metric_name = temperature
- device_id = bedroom.bedroom-sensor
- value = 23.4 (scalar)
- observed_at = ingested_at (Profile A fallback)
Definit în energy_bus.md. Bus pentru topologia electrică: producție, stocare, grid, load.
<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>
Entity types: source, storage, grid, load, transfer
Exemple:
- vad/energy/source/pv-roof-1/active_power/value
- vad/energy/storage/battery-main/soc/value
- vad/energy/grid/main-meter/import_power/value
- vad/energy/load/living-room-tv/active_power/value
Measurement-style (compatibile cu ingest_measurement() acum):
active_power, voltage, current, frequency, soc, charge_power, discharge_power
Counter-style (pe bus, dar NU prin measurement API — urmează ingest_counter()):
energy_total, import_energy_total, export_energy_total
| Metric | Unit |
|---|---|
| power | W |
| energy | Wh sau kWh |
| voltage | V |
| current | A |
| frequency | Hz |
| state of charge | % |
metric_name = <metric>
device_id = <entity_type>.<entity_id>
Exemplu:
- Topic: vad/energy/storage/battery-main/soc/value
- metric_name = soc
- device_id = storage.battery-main
homeenergypower pe home, active_power pe energy)Definit în sys_bus.md. Nu este un bus semantic.
<site>/sys/<producer_kind>/<instance_id>/<stream>
Producer kinds v1: adapter, historian
| Stream | Scop | Retain |
|---|---|---|
availability |
liveness-ul componentei | true + LWT |
stats |
contori operaționali, snapshot periodic | true |
error |
erori structurate pentru operator | false |
dlq |
mesaje dead-letter cu context | false |
Exemple:
- vad/sys/adapter/z2m-main/availability → online
- vad/sys/historian/main/error → {"code":"unknown_metric",...}
Definit în addapters.md și adapter_implementation_examples.md.
home + energy)meta retained înaintea traficului liveavailability retained + LWTsys/.../error și sys/.../dlqsys)La publicare, mesajul conține DOAR:
{ topic: normalizedTopic, payload: normalizedValue }
Structurile interne de normalizare se șterg înainte de publish.
Friendly name: <device_type>/<site>/<location>/<device_id>
Exemplu:
- Z2M: zigbee2mqtt/ZG-204ZV/vad/balcon/south
- Canonical: vad/home/balcon/illuminance/south/value
Permite traducere deterministă fără lookup tables.
| Z2M field | Bus | Canonical capability/metric |
|---|---|---|
temperature |
home | temperature |
humidity |
home | humidity |
pressure |
home | pressure |
illuminance |
home | illuminance |
contact |
home | contact |
occupancy (PIR) |
home | motion |
presence (mmWave, fading_time=0) |
home | motion |
battery |
home | battery |
state (plug/switch) |
home | power |
action (remote) |
home | button |
power (smart plug) |
energy | active_power |
voltage (smart plug) |
energy | voltage |
current (smart plug) |
energy | current |
energy (smart plug) |
energy | energy_total |
Adapoarele folosesc configurație declarativă. Câmpuri recomandate:
source_system, source_topic_match, source_field, target_bus, target_location / target_entity_id, target_capability / target_metric, target_device_id, stream, payload_profile, unit, historian_enabled, historian_mode
Adapoarele derivă dimensiunile semantice din structura topic-ului Z2M. Override-uri se aplică doar pentru excepții. Default-uri: bus=home, location=unknown, stream=value.
Definit în historian_worker.md.
+/home/+/+/+/value ← ingestie primară
+/energy/+/+/+/value ← ingestie primară
+/home/+/+/+/meta ← cache local
+/energy/+/+/+/meta ← cache local
+/home/+/+/+/last ← ignorat pentru ingestie
+/energy/+/+/+/last ← ignorat pentru ingestie
| Bus | metric_name |
device_id |
|---|---|---|
home |
<capability> |
<location>.<device_id> |
energy |
<metric> |
<entity_type>.<entity_id> |
Scalar (Profile A):
- value = payload parsat ca number/boolean
- observed_at = ingestion time
- unit din meta cache
Envelope (Profile B):
- value = payload.value
- observed_at = payload.observed_at sau ingestion time
- unit = payload.unit sau meta cache
| Tip payload | Overload DB |
|---|---|
| numeric | ingest_measurement(..., value::double precision, ...) |
| boolean | ingest_measurement(..., value::boolean, ...) |
| string enum | SKIP (nu se ingestează fără encoding policy explicit) |
| counter cumulative | SKIP measurement path; rutare pe counter pipeline |
(metric_name, device_id) — obligatoriuErori permanente (log + DLQ, nu retry): - topic incompatibil cu grammar-ul busului - tip payload incompatibil - metric necunoscut - out-of-order measurement - valoare respinsă de policy
Erori tranziente (retry cu ordine): - PostgreSQL indisponibil - deconectare rețea - deconectare broker temporară
payload_profile, data_type, unit, historian.enabled, historian.mode, schema_ref, adapter_id, source, source_refModel recomandat inițial: un singur worker per site consumând home + energy.
Definit în tdb_ingestion/mqtt_ingestion_api.md. Implementat și funcțional.
-- Numeric
SELECT * FROM telemetry.ingest_measurement(
p_metric_name => $1,
p_device_id => $2,
p_value => $3::double precision,
p_observed_at => $4::timestamptz
);
-- Boolean
SELECT * FROM telemetry.ingest_measurement(
p_metric_name => $1,
p_device_id => $2,
p_value => $3::boolean,
p_observed_at => $4::timestamptz
);
Gestionat de PostgreSQL, worker-ul nu reimplementează:
ensure_device())metric_name, device_id)telemetry.metric_policiesopened, opened_null, extended, extended_null, split, null_to_value, value_to_null, gap_split, gap_to_null
Worker-ul le loghează, nu branșează logica pe ele.
Limitare curentă: ingest_measurement() nu acceptă idempotency_key. Replay după network failure ambiguă poate produce eroare out-of-order. Worker-ul tratează aceasta ca duplicate operațional.
(metric_name, device_id) pe tranzacție(metric_name, device_id) via routing deterministDefinit în tdb_ingestion/counter_ingestion_api.md. Contractul este acum materializat în repo și reprezintă ținta de implementare pentru worker și backend.
SELECT * FROM telemetry.ingest_counter(
p_metric_name => $1,
p_device_id => $2,
p_counter_value => $3::numeric,
p_observed_at => $4::timestamptz,
p_source_sequence => $5, -- nullable
p_idempotency_key => $6, -- nullable
p_snapshot_id => $7 -- nullable
);
(metric_name, device_id)counter_value nu acceptă NULL; dacă sursa nu are valoare, observația se omitereset_boundary, rollover_boundary, invalid_drop)ingest_counter(), nu pe ingest_measurement()stream = (metric_name, device_id)
Nu există stream_id suplimentar. Ordering, freshness, replay metadata — toate per (metric_name, device_id).
| Nivel | Cerințe | Garanții |
|---|---|---|
degraded / at-most-once |
fără source_sequence, fără idempotency_key |
retry nesigur |
recommended / at-least-once |
source_sequence sau idempotency_key |
replay detectabil |
stronger replay safety |
ambele | deduplicare explicită |
| Mode | Semantică |
|---|---|
periodic |
cadență așteptată; lipsa update-ului = gap |
on_change |
publicare la schimbare; tăcerea nu = delta 0 |
hybrid |
on_change + heartbeat periodic |
| Reporting mode | Input | Default stale_after_s |
|---|---|---|
periodic |
expected_interval_s |
expected_interval_s * 2 |
on_change |
heartbeat_interval_s |
heartbeat_interval_s * 2 |
hybrid |
heartbeat_interval_s |
heartbeat_interval_s * 2 |
stale_after_s explicit în policy overridează default-ul.
Aceste decizii sunt stabile și se poate implementa pe baza lor.
| # | Decizie | Status |
|---|---|---|
| 1 | Measurement path: numeric + boolean prin ingest_measurement() |
Acceptat, implementat |
| 2 | Counter path: separat de measurement, prin ingest_counter() |
Contract stabilizat, neimplementat |
| 3 | last nu se ingestează ca time-series |
Închis |
| 4 | Ordering per (metric_name, device_id) obligatoriu |
Închis |
| 5 | Missing source timestamp → fallback la ingested_at |
Închis |
| 6 | Missing meta = mod degradat, nu hard stop |
Închis |
| 7 | String enum states: nu se ingestează fără encoding policy | Închis |
| 8 | Counter stream_id = (metric_name, device_id) — fără ID suplimentar |
Închis |
| 9 | Counter: NULL never, gaps derivate la query time | Închis |
| 10 | Counter: un model comun cu profile de domeniu, nu separare energy/traffic | Închis |
| 11 | Permanent errors → DLQ + log, nu retry | Închis |
| 12 | Transient errors → retry cu ordine păstrată | Închis |
| # | Decizie | Alegere | Rațiune |
|---|---|---|---|
| D1 | Snapshot pe bus vs poller→worker | Snapshot rămâne pe boundary-ul poller→worker. Bus-ul semantic păstrează publicații per-stream. | Busul semantic e optimizat pentru publicații lightweight per-topic. Snapshot-ul e un artefact de colectare, nu o semantică de bus. Dacă mai târziu se dovedește necesar, se poate evolua contractul explicit. |
| D2 | Freshness defaults | stale_after_s = expected_interval * 2 (conform cu draft-ul existent) |
Multiplicatorul 2x din draft e suficient de conservator. Override explicit per policy rămâne disponibil. |
| D3 | Profile energy/traffic counter | Amânat până la date reale. | Nu avem network bus; profilele de traffic nu sunt relevante acum. Energy counter profile se definesc când implementăm ingest_counter(). |
Aceste puncte nu blochează Phase 1 dar vor necesita atenție:
| # | Punct | Când devine relevant |
|---|---|---|
| O1 | idempotency_key pe measurement API |
Când avem nevoie de replay sigur pe measurement path |
| O2 | Encoding policy pentru string enum states | Când vrem să persistăm HVAC modes sau alte enums |
| O3 | Forma fizică a reset/rollover boundary în counter storage | La implementarea ingest_counter() |
| O4 | Batch SQL pentru counter ingestion | Când volumul de counters justifică optimizarea |
| O5 | Completitudine snapshot (cum se declară explicit) | Când se adaugă surse bulk polling |
| # | Device | Capabilities | Bus | Historian |
|---|---|---|---|---|
| 1 | Room sensor | temperature, humidity, battery |
home | Da (numeric) |
| 2 | Door sensor | contact, battery |
home | Da (boolean + numeric) |
| 3 | Smart plug | power (control) |
home | Nu (string enum) |
| 3 | Smart plug | active_power, voltage, current |
energy | Da (numeric) |
| 3 | Smart plug | energy_total |
energy | Nu (counter path) |
| 4 | Remote | button / action |
home | Nu (event, skip default) |
home + energy (measurement) fără parsing vendor topicsvalue topics ingestabile doar cu topic parsing + opțional meta cachemeta.historian.mode respectat (sample/state/event/ignore)meta degradează, nu blocheazăsys, nu pe bus semantic| Document | Ce conține | Când îl citești |
|---|---|---|
mqtt_contract.md |
Contract partajat: payload, meta, time, quality, delivery | Când construiești orice componentă care publică sau consumă de pe bus |
home_bus.md |
Topic grammar, capabilities, historian mapping home | Când lucrezi pe adaptoare home sau pe historian worker |
energy_bus.md |
Topic grammar, entity types, metrics, historian mapping energy | Când lucrezi pe surse de energie |
sys_bus.md |
Namespace operațional, stream-uri sys | Când adaugi observabilitate la adaptoare sau worker |
addapters.md |
Responsabilități adapter, fan-out, multi-bus, Z2M convention | Când construiești un adapter |
adapter_implementation_examples.md |
Patterns Node-RED, cold-start, failure modes | Când implementezi concret în Node-RED |
historian_worker.md |
Subscription, mapping, payload handling, ordering, errors | Când construiești historian worker-ul |
tdb_ingestion/mqtt_ingestion_api.md |
API measurement implementat, error handling, ordering | Când scrii cod de ingestie measurement |
tdb_ingestion/counter_ingestion_api.md |
Contract counter stabilizat (neimplementat) | Când implementezi counter ingestion |
[Z2M Device] zigbee2mqtt/SENSOR/vad/bedroom/bedroom-sensor
payload: {"temperature": 23.4, "humidity": 41, "battery": 87}
[Adapter] fan-out → 3 publicațiuni:
vad/home/bedroom/temperature/bedroom-sensor/value → 23.4
vad/home/bedroom/humidity/bedroom-sensor/value → 41
vad/home/bedroom/battery/bedroom-sensor/value → 87
[Worker] topic parse:
metric_name = "temperature"
device_id = "bedroom.bedroom-sensor"
value = 23.4
observed_at = ingested_at (Profile A, no source timestamp)
[PostgreSQL]
SELECT * FROM telemetry.ingest_measurement(
'temperature', 'bedroom.bedroom-sensor',
23.4::double precision, now()
);
→ action: extended
[Z2M Device] zigbee2mqtt/PLUG/vad/living-room/tv-plug
payload: {"state":"on", "power":126.8, "energy":4.72, "voltage":229.4, "current":0.58}
[Adapter] fan-out → 5 publicațiuni pe 2 busuri:
vad/home/living-room/power/tv-plug/value → "on" (home, string enum, skip historian)
vad/energy/load/living-room-tv/active_power/value → 126.8 (energy, measurement)
vad/energy/load/living-room-tv/voltage/value → 229.4 (energy, measurement)
vad/energy/load/living-room-tv/current/value → 0.58 (energy, measurement)
vad/energy/load/living-room-tv/energy_total/value → 4.72 (energy, counter — skip measurement path)
[Worker]
active_power → ingest_measurement('active_power', 'load.living-room-tv', 126.8, now()) ✓
voltage → ingest_measurement('voltage', 'load.living-room-tv', 229.4, now()) ✓
current → ingest_measurement('current', 'load.living-room-tv', 0.58, now()) ✓
energy_total → skip measurement path, route to counter pipeline ⏳
"on" → skip (string enum, no encoding policy) ⊘
[Z2M Device] zigbee2mqtt/SNZB-04P/vad/entrance/front-door
payload: {"contact": false, "battery": 94}
[Adapter] fan-out → 2 publicațiuni:
vad/home/entrance/contact/front-door/value → false
vad/home/entrance/battery/front-door/value → 94
[Worker]
contact → ingest_measurement('contact', 'entrance.front-door', false, now()) ✓
battery → ingest_measurement('battery', 'entrance.front-door', 94, now()) ✓