This document defines how a historian worker should write cumulative counter observations into the PostgreSQL telemetry historian.
It complements the measurement ingestion path and exists because cumulative counters do not have the same storage semantics as instantaneous measurements.
Examples of counters:
energy_totalimport_energy_totalexport_energy_totalrx_bytes_totaltx_packets_totalMeasurement-style numeric and boolean values continue to use mqtt_ingestion_api.md.
One common ingestion path is:
Canonical MQTT bus
-> historian worker
-> telemetry.ingest_counter(...)
-> counter historian tables
The worker is responsible for:
metric_name, device_id, counter_value, and observed_atThe database is responsible for:
The worker should treat PostgreSQL as the source of truth for counter boundary logic.
Cumulative counters are not the same as instantaneous measurements.
Key differences:
NULLcounter_value should never be written as NULLFor this reason, the worker MUST NOT send cumulative counters through telemetry.ingest_measurement(...) just because the payload is numeric.
The database does not parse MQTT topics.
The worker maps canonical topics to application identifiers before calling PostgreSQL.
Example 1:
topic: vad/energy/load/living-room-tv/energy_total/value
metric_name = "energy_total"
device_id = "load.living-room-tv"
counter_value = 4.72
observed_at = 2026-03-21T10:15:12Z
Example 2:
topic: vad/energy/grid/main-meter/import_energy_total/value
metric_name = "import_energy_total"
device_id = "grid.main-meter"
counter_value = 18654.31
observed_at = 2026-03-21T10:15:12Z
The same pattern applies to future domains such as network traffic counters.
Canonical signature:
SELECT *
FROM telemetry.ingest_counter(
p_metric_name => $1,
p_device_id => $2,
p_counter_value => $3::numeric,
p_observed_at => $4::timestamptz,
p_source_sequence => $5,
p_idempotency_key => $6,
p_snapshot_id => $7
);
Parameter semantics:
p_metric_name: counter metric identifier registered in the databasep_device_id: logical device identifier chosen by the workerp_counter_value: cumulative counter observationp_observed_at: time the source observed the counter valuep_source_sequence: optional monotonic source-side sequence or offsetp_idempotency_key: optional replay-safe deduplication keyp_snapshot_id: optional identifier for a source snapshot or polling batchRules:
p_counter_value MUST NOT be NULLp_observed_at MUST be presentp_source_sequence when the source provides a reliable monotonic sequencep_idempotency_key when replay ambiguity existsNULL if the source does not provide themCounter stream identity is:
stream = (metric_name, device_id)
Core rules:
observed_at order per streamcounter_value MUST be non-nullNULL writescounter_value create semantic boundariesThe worker should assume the database owns the interpretation of those boundaries through metric policy.
Three reliability levels are supported conceptually.
Inputs:
source_sequenceidempotency_keyImplication:
Inputs:
source_sequence or idempotency_keyImplication:
Inputs:
source_sequence and idempotency_keyImplication:
The database contract should remain usable at all three levels, but higher reliability depends on richer source metadata.
Counter policies should support three reporting modes:
periodicon_changehybridSemantics:
periodic: updates are expected on a cadence; silence eventually means staleon_change: updates happen only on change; silence does not imply delta 0hybrid: updates happen on change plus periodic heartbeatRecommended freshness defaults:
| Reporting mode | Input | Default stale_after_s |
|---|---|---|
periodic |
expected_interval_s |
expected_interval_s * 2 |
on_change |
heartbeat_interval_s |
heartbeat_interval_s * 2 |
hybrid |
heartbeat_interval_s |
heartbeat_interval_s * 2 |
If stale_after_s is explicitly defined by policy, it overrides these defaults.
Freshness is evaluated at query time, not by inserting synthetic unknown rows.
Each call should return one row that is useful for logging and debugging.
Recommended fields:
metric_namedevice_idnormalized_counter_valueactionboundary_kindRecommended action values:
opened: first observation for an open streamextended: normal append to an existing streamduplicate_ignored: replay duplicate recognized from reliability metadataboundary_split: a reset or rollover boundary was classified and a new segment startedRecommended boundary_kind values:
nonereset_boundaryrollover_boundaryinvalid_dropWorkers usually should not branch on these values, but they SHOULD log them.
The historian worker should route counter samples based on canonical metric semantics, not vendor heuristics.
Rules:
telemetry.ingest_counter(...)telemetry.ingest_measurement(...)last MUST NOT be used as a normal counter ingestion sourceExamples of metrics that belong here:
energy_totalimport_energy_totalexport_energy_totalrx_bytes_totaltx_packets_totalExamples of metrics that do not belong here:
active_powervoltagecurrenttemperaturesocPermanent message errors include:
NULL counter valueTransient failures include:
Rules:
(metric_name, device_id)Recommended implementation order:
telemetry.ingest_counter(...)Recommended first live metrics:
energy_totalimport_energy_totalexport_energy_totalThese are the most concrete counter metrics already present in the current semantic bus design.
The semantic bus remains the canonical transport.
Counter metrics:
.../valueThis preserves one semantic model while allowing the historian to use a storage API that matches counter behavior.