mqtt_bus / consolidated_spec.md
1 contributor
693 lines | 23.52kb

Consolidated Bus & Data Collection Specification

Data: 2026-03-20

Acest document consolidează specificațiile existente într-o referință unică pentru busuri, colectare de date și conformitate cu cerințele historian-ului. Nu înlocuiește documentele sursă, ci le leagă și stabilește deciziile rămase deschise.

Documentele sursă rămân canonice. Acest document este o hartă de navigare și un registru de decizii.


1. Arhitectura Pipeline-ului

Device / External System
    ↓
Protocol Adapter (Node-RED)
    ↓
Canonical MQTT Bus (home | energy)
    ↓
Historian Worker
    ↓
telemetry.ingest_measurement(...)   ← measurement path (implementat)
telemetry.ingest_counter(...)       ← counter path (contract stabilizat, neimplementat)
    ↓
PostgreSQL Historian

Documente relevante: - README.md — arhitectura generală - addapters.md — responsabilitățile adapterelor - historian_worker.md — responsabilitățile worker-ului


2. Contractul MQTT Partajat

Definit în mqtt_contract.md. Toate busurile moștenesc aceste reguli.

2.1 Namespace

  • Busuri semantice: <site>/<bus>/...
  • Namespace operațional: <site>/sys/...
  • <site> = site-id stabil, kebab-case (ex: vad)
  • Busuri v1: home, energy

2.2 Stream-uri Canonice

Stream Scop Retain QoS
value sample live hot-path false 1
last ultimul sample cu timestamp, pentru cold-start true 1
set comandă/request false 1
meta metadata statică sau slow-changing true 1
availability online/offline true (+ LWT) 1

Reguli: - adaptoarele emit date live doar pe value - adaptoarele deduplică value când valoarea semantică nu s-a schimbat - adaptoarele actualizează last (retained) la fiecare schimbare de timestamp - state și event sunt legacy, nu se introduc de adaptoare noi - set nu se reține niciodată - historian-ul ingestează doar value, ignoră last, set, meta, availability

2.3 Payload Profiles

Profile A — Scalar (default pentru hot paths):

23.6
41
true
on
  • metadata pe meta separat (retained)
  • observed_at = ingestion time (acceptabil pentru Phase 1)

Profile B — Envelope JSON (opțional):

{
  "value": 23.6,
  "unit": "C",
  "observed_at": "2026-03-08T10:15:12Z",
  "quality": "good"
}
  • câmpuri opționale: published_at, source_seq, annotations
  • utilizat când fidelitatea timestamp-ului contează sau când quality trebuie propagat per-sample

2.4 Meta Contract

Fiecare topic retained meta descrie familia sibling de value/last.

Formă minimă recomandată:

{
  "schema_ref": "mqbus.home.v1",
  "payload_profile": "scalar",
  "data_type": "number",
  "unit": "C",
  "adapter_id": "z2m-main",
  "source": "zigbee2mqtt",
  "source_ref": "0x00158d0008aa1111",
  "source_topic": "zigbee2mqtt/SENSOR/vad/bedroom/bedroom-sensor",
  "precision": 0.1,
  "historian": {
    "enabled": true,
    "mode": "sample"
  }
}

Câmpuri historian: - historian.enabled: boolean - historian.mode: sample | state | event | ignore - historian.retention_class: opțional (short, default, long) - historian.sample_period_hint_s: opțional

2.5 Time Semantics

Trei timestamp-uri distincte:

Timestamp Semnificație
observed_at când sursa a observat/măsurat valoarea
published_at când adapterul a publicat mesajul
ingested_at când worker-ul a procesat mesajul

Reguli: - dacă sursa are timestamp, adapterul îl păstrează ca observed_at - dacă nu, adapterul omite observed_at; worker-ul folosește ingested_at - adaptoarele nu fabrică timestamp-uri - dacă adapterul estimează timpul, folosește Profile B cu quality=estimated

2.6 Quality Model

Valori: good, estimated, degraded, stale, invalid

  • invalid nu se emite pe bus semantic; merge pe sys/.../error
  • quality se omite doar dacă good este implicit

3. Home Bus

Definit în home_bus.md. Bus room-centric pentru senzori, control, automatizare.

3.1 Topic Grammar

<site>/home/<location>/<capability>/<device_id>/<stream>

Exemple: - vad/home/bedroom/temperature/bedroom-sensor/value - vad/home/living-room/motion/radar-south/value - vad/home/kitchen/light/ceiling-switch/set

3.2 Capability Catalog

Environmental: temperature, humidity, pressure, illuminance, co2, voc, pm25, pm10

Presence/Safety: motion, presence, contact, water_leak, smoke, gas, tamper

Control: light, power, lock, cover_position, thermostat_mode, target_temperature, fan_mode, button

Device health: battery, battery_low

Reguli: - power pe home doar pentru control semantics (on/off) - metrici electrice (active_power, energy_total) pe energy - linkquality nu pe home (merge pe sys sau viitor network) - presence = stare derivată de nivel mai înalt; detecția brută mmWave cu fading_time=0 se publică ca motion

3.3 Historian Mapping

metric_name = <capability>
device_id   = <location>.<device_id>

Exemplu: - Topic: vad/home/bedroom/temperature/bedroom-sensor/value - metric_name = temperature - device_id = bedroom.bedroom-sensor - value = 23.4 (scalar) - observed_at = ingested_at (Profile A fallback)


4. Energy Bus

Definit în energy_bus.md. Bus pentru topologia electrică: producție, stocare, grid, load.

4.1 Topic Grammar

<site>/energy/<entity_type>/<entity_id>/<metric>/<stream>

Entity types: source, storage, grid, load, transfer

Exemple: - vad/energy/source/pv-roof-1/active_power/value - vad/energy/storage/battery-main/soc/value - vad/energy/grid/main-meter/import_power/value - vad/energy/load/living-room-tv/active_power/value

4.2 Metric Classes

Measurement-style (compatibile cu ingest_measurement() acum):

active_power, voltage, current, frequency, soc, charge_power, discharge_power

Counter-style (pe bus, dar NU prin measurement API — urmează ingest_counter()):

energy_total, import_energy_total, export_energy_total

4.3 Units Canonice

Metric Unit
power W
energy Wh sau kWh
voltage V
current A
frequency Hz
state of charge %

4.4 Historian Mapping

metric_name = <metric>
device_id   = <entity_type>.<entity_id>

Exemplu: - Topic: vad/energy/storage/battery-main/soc/value - metric_name = soc - device_id = storage.battery-main

4.5 Ownership Rule

  • Dacă valoarea e pentru control utilizator într-o cameră → home
  • Dacă valoarea e pentru contabilitate electrică → energy
  • Un device fizic poate proiecta pe ambele busuri (smart plug: power pe home, active_power pe energy)

5. Namespace Operațional (sys)

Definit în sys_bus.md. Nu este un bus semantic.

5.1 Topic Grammar

<site>/sys/<producer_kind>/<instance_id>/<stream>

Producer kinds v1: adapter, historian

5.2 Stream-uri Operaționale

Stream Scop Retain
availability liveness-ul componentei true + LWT
stats contori operaționali, snapshot periodic true
error erori structurate pentru operator false
dlq mesaje dead-letter cu context false

Exemple: - vad/sys/adapter/z2m-main/availabilityonline - vad/sys/historian/main/error{"code":"unknown_metric",...}


6. Adaptoare

Definit în addapters.md și adapter_implementation_examples.md.

6.1 Responsabilități

  1. Traducere topic-uri din format vendor în bus canonic
  2. Normalizare payload (JSON → scalar, conversie unități)
  3. Păstrare timestamp sursă (dacă există)
  4. Fan-out: 1 mesaj inbound → N publicațiuni canonice
  5. Proiecție multi-bus (smart plug → home + energy)
  6. Publicare meta retained înaintea traficului live
  7. Publicare availability retained + LWT
  8. Routing erori pe sys/.../error și sys/.../dlq

6.2 NON-Responsabilități

  • Nu implementează logica HomeKit
  • Nu implementează reguli de automatizare
  • Nu agregă senzori
  • Nu stochează date istorice
  • Nu repară mesaje malformed (le expune pe sys)

6.3 Publish Boundary

La publicare, mesajul conține DOAR:

{ topic: normalizedTopic, payload: normalizedValue }

Structurile interne de normalizare se șterg înainte de publish.

6.4 Z2M Topic Convention

Friendly name: <device_type>/<site>/<location>/<device_id>

Exemplu: - Z2M: zigbee2mqtt/ZG-204ZV/vad/balcon/south - Canonical: vad/home/balcon/illuminance/south/value

Permite traducere deterministă fără lookup tables.

6.5 Z2M Field Mapping

Z2M field Bus Canonical capability/metric
temperature home temperature
humidity home humidity
pressure home pressure
illuminance home illuminance
contact home contact
occupancy (PIR) home motion
presence (mmWave, fading_time=0) home motion
battery home battery
state (plug/switch) home power
action (remote) home button
power (smart plug) energy active_power
voltage (smart plug) energy voltage
current (smart plug) energy current
energy (smart plug) energy energy_total

6.6 Mapping Registry

Adapoarele folosesc configurație declarativă. Câmpuri recomandate:

source_system, source_topic_match, source_field, target_bus, target_location / target_entity_id, target_capability / target_metric, target_device_id, stream, payload_profile, unit, historian_enabled, historian_mode

6.7 Auto-Provisioning

Adapoarele derivă dimensiunile semantice din structura topic-ului Z2M. Override-uri se aplică doar pentru excepții. Default-uri: bus=home, location=unknown, stream=value.


7. Historian Worker

Definit în historian_worker.md.

7.1 Subscription Model

+/home/+/+/+/value       ← ingestie primară
+/energy/+/+/+/value      ← ingestie primară
+/home/+/+/+/meta         ← cache local
+/energy/+/+/+/meta       ← cache local
+/home/+/+/+/last         ← ignorat pentru ingestie
+/energy/+/+/+/last       ← ignorat pentru ingestie

7.2 Topic → Historian Mapping (sumar)

Bus metric_name device_id
home <capability> <location>.<device_id>
energy <metric> <entity_type>.<entity_id>

7.3 Payload Handling

Scalar (Profile A): - value = payload parsat ca number/boolean - observed_at = ingestion time - unit din meta cache

Envelope (Profile B): - value = payload.value - observed_at = payload.observed_at sau ingestion time - unit = payload.unit sau meta cache

7.4 Type Routing

Tip payload Overload DB
numeric ingest_measurement(..., value::double precision, ...)
boolean ingest_measurement(..., value::boolean, ...)
string enum SKIP (nu se ingestează fără encoding policy explicit)
counter cumulative SKIP measurement path; rutare pe counter pipeline

7.5 Ordering

  • Serializare scrieri per (metric_name, device_id) — obligatoriu
  • Paralelism acceptabil între device-uri/metrici diferite
  • Retry-urile păstrează ordinea pe stream

7.6 Error Handling

Erori permanente (log + DLQ, nu retry): - topic incompatibil cu grammar-ul busului - tip payload incompatibil - metric necunoscut - out-of-order measurement - valoare respinsă de policy

Erori tranziente (retry cu ordine): - PostgreSQL indisponibil - deconectare rețea - deconectare broker temporară

7.7 Meta Cache

  • Cheie: stem topic (fără ultimul segment stream)
  • Câmpuri cache: payload_profile, data_type, unit, historian.enabled, historian.mode, schema_ref, adapter_id, source, source_ref
  • Missing meta = mod degradat, nu stop

7.8 Deployment

Model recomandat inițial: un singur worker per site consumând home + energy.


8. Measurement Ingestion API

Definit în tdb_ingestion/mqtt_ingestion_api.md. Implementat și funcțional.

8.1 Semnătură

-- Numeric
SELECT * FROM telemetry.ingest_measurement(
    p_metric_name => $1,
    p_device_id   => $2,
    p_value       => $3::double precision,
    p_observed_at => $4::timestamptz
);

-- Boolean
SELECT * FROM telemetry.ingest_measurement(
    p_metric_name => $1,
    p_device_id   => $2,
    p_value       => $3::boolean,
    p_observed_at => $4::timestamptz
);

8.2 Comportament Automat

Gestionat de PostgreSQL, worker-ul nu reimplementează:

  • Auto-provisioning device (ensure_device())
  • Append-only enforcement (watermark per metric_name, device_id)
  • Policy lookup din telemetry.metric_policies
  • Gap detection la sosirea următorului measurement
  • Segment splitting (value change, NULL, policy change, gap)
  • Tail NULL simulation la query time

8.3 Response Actions

opened, opened_null, extended, extended_null, split, null_to_value, value_to_null, gap_split, gap_to_null

Worker-ul le loghează, nu branșează logica pe ele.

8.4 Idempotency

Limitare curentă: ingest_measurement() nu acceptă idempotency_key. Replay după network failure ambiguă poate produce eroare out-of-order. Worker-ul tratează aceasta ca duplicate operațional.

8.5 Advisory Locking

  • Un advisory lock per (metric_name, device_id) pe tranzacție
  • Implementat ca wait (nu NOWAIT)
  • Recomandare: writer affinity per (metric_name, device_id) via routing determinist

9. Counter Ingestion API

Definit în tdb_ingestion/counter_ingestion_api.md. Contractul este acum materializat în repo și reprezintă ținta de implementare pentru worker și backend.

9.1 Semnătură

SELECT * FROM telemetry.ingest_counter(
    p_metric_name     => $1,
    p_device_id       => $2,
    p_counter_value   => $3::numeric,
    p_observed_at     => $4::timestamptz,
    p_source_sequence => $5,  -- nullable
    p_idempotency_key => $6,  -- nullable
    p_snapshot_id     => $7   -- nullable
);

9.2 Reguli Core

  • Append-only, ordonat per (metric_name, device_id)
  • counter_value nu acceptă NULL; dacă sursa nu are valoare, observația se omite
  • Gap-uri derivate la query time din freshness, nu scrieri sintetice NULL
  • Scăderea valorii = boundary semantic (reset_boundary, rollover_boundary, invalid_drop)
  • Delta și rate nu traversează boundary-uri de reset/rollover
  • Worker-ul rutează aceste metrici pe ingest_counter(), nu pe ingest_measurement()

9.3 Stream Identity

stream = (metric_name, device_id)

Nu există stream_id suplimentar. Ordering, freshness, replay metadata — toate per (metric_name, device_id).

9.4 Reliability Levels

Nivel Cerințe Garanții
degraded / at-most-once fără source_sequence, fără idempotency_key retry nesigur
recommended / at-least-once source_sequence sau idempotency_key replay detectabil
stronger replay safety ambele deduplicare explicită

9.5 Reporting Modes

Mode Semantică
periodic cadență așteptată; lipsa update-ului = gap
on_change publicare la schimbare; tăcerea nu = delta 0
hybrid on_change + heartbeat periodic

9.6 Freshness Defaults

Reporting mode Input Default stale_after_s
periodic expected_interval_s expected_interval_s * 2
on_change heartbeat_interval_s heartbeat_interval_s * 2
hybrid heartbeat_interval_s heartbeat_interval_s * 2

stale_after_s explicit în policy overridează default-ul.


10. Decizii Luate

10.1 Closed for Phase 1

Aceste decizii sunt stabile și se poate implementa pe baza lor.

# Decizie Status
1 Measurement path: numeric + boolean prin ingest_measurement() Acceptat, implementat
2 Counter path: separat de measurement, prin ingest_counter() Contract stabilizat, neimplementat
3 last nu se ingestează ca time-series Închis
4 Ordering per (metric_name, device_id) obligatoriu Închis
5 Missing source timestamp → fallback la ingested_at Închis
6 Missing meta = mod degradat, nu hard stop Închis
7 String enum states: nu se ingestează fără encoding policy Închis
8 Counter stream_id = (metric_name, device_id) — fără ID suplimentar Închis
9 Counter: NULL never, gaps derivate la query time Închis
10 Counter: un model comun cu profile de domeniu, nu separare energy/traffic Închis
11 Permanent errors → DLQ + log, nu retry Închis
12 Transient errors → retry cu ordine păstrată Închis

10.2 Decizii Luate Acum (în acest document)

# Decizie Alegere Rațiune
D1 Snapshot pe bus vs poller→worker Snapshot rămâne pe boundary-ul poller→worker. Bus-ul semantic păstrează publicații per-stream. Busul semantic e optimizat pentru publicații lightweight per-topic. Snapshot-ul e un artefact de colectare, nu o semantică de bus. Dacă mai târziu se dovedește necesar, se poate evolua contractul explicit.
D2 Freshness defaults stale_after_s = expected_interval * 2 (conform cu draft-ul existent) Multiplicatorul 2x din draft e suficient de conservator. Override explicit per policy rămâne disponibil.
D3 Profile energy/traffic counter Amânat până la date reale. Nu avem network bus; profilele de traffic nu sunt relevante acum. Energy counter profile se definesc când implementăm ingest_counter().

11. Puncte Deschise (non-blocante)

Aceste puncte nu blochează Phase 1 dar vor necesita atenție:

# Punct Când devine relevant
O1 idempotency_key pe measurement API Când avem nevoie de replay sigur pe measurement path
O2 Encoding policy pentru string enum states Când vrem să persistăm HVAC modes sau alte enums
O3 Forma fizică a reset/rollover boundary în counter storage La implementarea ingest_counter()
O4 Batch SQL pentru counter ingestion Când volumul de counters justifică optimizarea
O5 Completitudine snapshot (cum se declară explicit) Când se adaugă surse bulk polling

12. Testing & Validation

12.1 Prima Matrice de Test

# Device Capabilities Bus Historian
1 Room sensor temperature, humidity, battery home Da (numeric)
2 Door sensor contact, battery home Da (boolean + numeric)
3 Smart plug power (control) home Nu (string enum)
3 Smart plug active_power, voltage, current energy Da (numeric)
3 Smart plug energy_total energy Nu (counter path)
4 Remote button / action home Nu (event, skip default)

12.2 Acceptance Criteria Phase 1

  1. Historian stochează samples home + energy (measurement) fără parsing vendor topics
  2. value topics ingestabile doar cu topic parsing + opțional meta cache
  3. Smart plug proiectat pe ambele busuri fără ambiguitate
  4. meta.historian.mode respectat (sample/state/event/ignore)
  5. Missing timestamps nu blochează ingestia
  6. Missing meta degradează, nu blochează
  7. Erori adapter pe sys, nu pe bus semantic
  8. Counter totals coexistă pe bus fără a fi confundate cu measurement path

13. Hartă de Navigare a Documentelor

Document Ce conține Când îl citești
mqtt_contract.md Contract partajat: payload, meta, time, quality, delivery Când construiești orice componentă care publică sau consumă de pe bus
home_bus.md Topic grammar, capabilities, historian mapping home Când lucrezi pe adaptoare home sau pe historian worker
energy_bus.md Topic grammar, entity types, metrics, historian mapping energy Când lucrezi pe surse de energie
sys_bus.md Namespace operațional, stream-uri sys Când adaugi observabilitate la adaptoare sau worker
addapters.md Responsabilități adapter, fan-out, multi-bus, Z2M convention Când construiești un adapter
adapter_implementation_examples.md Patterns Node-RED, cold-start, failure modes Când implementezi concret în Node-RED
historian_worker.md Subscription, mapping, payload handling, ordering, errors Când construiești historian worker-ul
tdb_ingestion/mqtt_ingestion_api.md API measurement implementat, error handling, ordering Când scrii cod de ingestie measurement
tdb_ingestion/counter_ingestion_api.md Contract counter stabilizat (neimplementat) Când implementezi counter ingestion

14. End-to-End Trace: De la Device la PostgreSQL

Exemplu 1: Temperatură cameră (measurement, Profile A)

[Z2M Device] zigbee2mqtt/SENSOR/vad/bedroom/bedroom-sensor
  payload: {"temperature": 23.4, "humidity": 41, "battery": 87}

[Adapter] fan-out → 3 publicațiuni:
  vad/home/bedroom/temperature/bedroom-sensor/value  → 23.4
  vad/home/bedroom/humidity/bedroom-sensor/value     → 41
  vad/home/bedroom/battery/bedroom-sensor/value      → 87

[Worker] topic parse:
  metric_name = "temperature"
  device_id   = "bedroom.bedroom-sensor"
  value       = 23.4
  observed_at = ingested_at (Profile A, no source timestamp)

[PostgreSQL]
  SELECT * FROM telemetry.ingest_measurement(
    'temperature', 'bedroom.bedroom-sensor',
    23.4::double precision, now()
  );
  → action: extended

Exemplu 2: Smart plug dual projection (measurement + counter)

[Z2M Device] zigbee2mqtt/PLUG/vad/living-room/tv-plug
  payload: {"state":"on", "power":126.8, "energy":4.72, "voltage":229.4, "current":0.58}

[Adapter] fan-out → 5 publicațiuni pe 2 busuri:
  vad/home/living-room/power/tv-plug/value                → "on"          (home, string enum, skip historian)
  vad/energy/load/living-room-tv/active_power/value       → 126.8         (energy, measurement)
  vad/energy/load/living-room-tv/voltage/value            → 229.4         (energy, measurement)
  vad/energy/load/living-room-tv/current/value            → 0.58          (energy, measurement)
  vad/energy/load/living-room-tv/energy_total/value       → 4.72          (energy, counter — skip measurement path)

[Worker]
  active_power → ingest_measurement('active_power', 'load.living-room-tv', 126.8, now())  ✓
  voltage      → ingest_measurement('voltage', 'load.living-room-tv', 229.4, now())       ✓
  current      → ingest_measurement('current', 'load.living-room-tv', 0.58, now())        ✓
  energy_total → skip measurement path, route to counter pipeline                         ⏳
  "on"         → skip (string enum, no encoding policy)                                   ⊘

Exemplu 3: Contact sensor (boolean measurement)

[Z2M Device] zigbee2mqtt/SNZB-04P/vad/entrance/front-door
  payload: {"contact": false, "battery": 94}

[Adapter] fan-out → 2 publicațiuni:
  vad/home/entrance/contact/front-door/value  → false
  vad/home/entrance/battery/front-door/value  → 94

[Worker]
  contact → ingest_measurement('contact', 'entrance.front-door', false, now())  ✓
  battery → ingest_measurement('battery', 'entrance.front-door', 94, now())     ✓