telemetrydatabase / ingestion_docs / counter_ingestion_api.md
Newer Older
335 lines | 8.956kb
Bogdan Timofte authored 2 weeks ago
1
# Counter Ingestion API
2

            
3
This document defines how a historian worker should write cumulative counter observations into the PostgreSQL telemetry historian.
4

            
5
It complements the measurement ingestion path and exists because cumulative counters do not have the same storage semantics as instantaneous measurements.
6

            
7
Examples of counters:
8

            
9
- `energy_total`
10
- `import_energy_total`
11
- `export_energy_total`
12
- `rx_bytes_total`
13
- `tx_packets_total`
14

            
15
Measurement-style numeric and boolean values continue to use `mqtt_ingestion_api.md`.
16

            
17
---
18

            
19
## 1. System Overview
20

            
21
One common ingestion path is:
22

            
23
```text
24
Canonical MQTT bus
25
    -> historian worker
26
    -> telemetry.ingest_counter(...)
27
    -> counter historian tables
28
```
29

            
30
The worker is responsible for:
31

            
32
- subscribing to canonical counter-bearing bus topics
33
- mapping each message to `metric_name`, `device_id`, `counter_value`, and `observed_at`
34
- preserving ordering per counter stream
35
- passing replay metadata when available
36
- logging result actions and failures
37

            
38
The database is responsible for:
39

            
40
- append-only ordering enforcement
41
- duplicate detection where replay metadata allows it
42
- reset and rollover boundary classification
43
- policy lookup
44
- query-time freshness and gap interpretation
45

            
46
The worker should treat PostgreSQL as the source of truth for counter boundary logic.
47

            
48
---
49

            
50
## 2. Why Counter Data Uses a Separate API
51

            
52
Cumulative counters are not the same as instantaneous measurements.
53

            
54
Key differences:
55

            
56
- the value represents an ever-growing total, not a sampled state
57
- a drop in value is usually meaningful and may indicate reset or rollover
58
- gaps should be derived at query time from freshness policy, not stored as synthetic `NULL`
59
- `counter_value` should never be written as `NULL`
60

            
61
For this reason, the worker MUST NOT send cumulative counters through `telemetry.ingest_measurement(...)` just because the payload is numeric.
62

            
63
---
64

            
65
## 3. Example MQTT Mapping
66

            
67
The database does not parse MQTT topics.
68

            
69
The worker maps canonical topics to application identifiers before calling PostgreSQL.
70

            
71
Example 1:
72

            
73
```text
74
topic: vad/energy/load/living-room-tv/energy_total/value
75

            
76
metric_name   = "energy_total"
77
device_id     = "load.living-room-tv"
78
counter_value = 4.72
79
observed_at   = 2026-03-21T10:15:12Z
80
```
81

            
82
Example 2:
83

            
84
```text
85
topic: vad/energy/grid/main-meter/import_energy_total/value
86

            
87
metric_name   = "import_energy_total"
88
device_id     = "grid.main-meter"
89
counter_value = 18654.31
90
observed_at   = 2026-03-21T10:15:12Z
91
```
92

            
93
The same pattern applies to future domains such as network traffic counters.
94

            
95
---
96

            
97
## 4. Database Ingestion API
98

            
99
Canonical signature:
100

            
101
```sql
102
SELECT *
103
FROM telemetry.ingest_counter(
104
    p_metric_name     => $1,
105
    p_device_id       => $2,
106
    p_counter_value   => $3::numeric,
107
    p_observed_at     => $4::timestamptz,
108
    p_source_sequence => $5,
109
    p_idempotency_key => $6,
110
    p_snapshot_id     => $7
111
);
112
```
113

            
114
Parameter semantics:
115

            
116
- `p_metric_name`: counter metric identifier registered in the database
117
- `p_device_id`: logical device identifier chosen by the worker
118
- `p_counter_value`: cumulative counter observation
119
- `p_observed_at`: time the source observed the counter value
120
- `p_source_sequence`: optional monotonic source-side sequence or offset
121
- `p_idempotency_key`: optional replay-safe deduplication key
122
- `p_snapshot_id`: optional identifier for a source snapshot or polling batch
123

            
124
Rules:
125

            
126
- `p_counter_value` MUST NOT be `NULL`
127
- `p_observed_at` MUST be present
128
- workers SHOULD pass `p_source_sequence` when the source provides a reliable monotonic sequence
129
- workers SHOULD pass `p_idempotency_key` when replay ambiguity exists
130
- workers MAY leave optional replay fields `NULL` if the source does not provide them
131

            
132
---
133

            
134
## 5. Core Semantics
135

            
136
Counter stream identity is:
137

            
138
```text
139
stream = (metric_name, device_id)
140
```
141

            
142
Core rules:
143

            
144
- writes are append-only per stream
145
- observations MUST arrive in strictly increasing `observed_at` order per stream
146
- `counter_value` MUST be non-null
147
- silence does not produce synthetic `NULL` writes
148
- decreases in `counter_value` create semantic boundaries
149
- deltas and rates MUST NOT cross reset or rollover boundaries
150

            
151
The worker should assume the database owns the interpretation of those boundaries through metric policy.
152

            
153
---
154

            
155
## 6. Reliability Metadata
156

            
157
Three reliability levels are supported conceptually.
158

            
159
### Degraded / At-Most-Once
160

            
161
Inputs:
162

            
163
- no `source_sequence`
164
- no `idempotency_key`
165

            
166
Implication:
167

            
168
- retries after ambiguous failures are unsafe
169

            
170
### Recommended / At-Least-Once
171

            
172
Inputs:
173

            
174
- `source_sequence` or `idempotency_key`
175

            
176
Implication:
177

            
178
- replay is detectable
179

            
180
### Stronger Replay Safety
181

            
182
Inputs:
183

            
184
- both `source_sequence` and `idempotency_key`
185

            
186
Implication:
187

            
188
- duplicate detection is explicit and robust
189

            
190
The database contract should remain usable at all three levels, but higher reliability depends on richer source metadata.
191

            
192
---
193

            
194
## 7. Reporting Modes and Freshness
195

            
196
Counter policies should support three reporting modes:
197

            
198
- `periodic`
199
- `on_change`
200
- `hybrid`
201

            
202
Semantics:
203

            
204
- `periodic`: updates are expected on a cadence; silence eventually means stale
205
- `on_change`: updates happen only on change; silence does not imply delta `0`
206
- `hybrid`: updates happen on change plus periodic heartbeat
207

            
208
Recommended freshness defaults:
209

            
210
| Reporting mode | Input | Default `stale_after_s` |
211
|---|---|---|
212
| `periodic` | `expected_interval_s` | `expected_interval_s * 2` |
213
| `on_change` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |
214
| `hybrid` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |
215

            
216
If `stale_after_s` is explicitly defined by policy, it overrides these defaults.
217

            
218
Freshness is evaluated at query time, not by inserting synthetic unknown rows.
219

            
220
---
221

            
222
## 8. Ingestion Response
223

            
224
Each call should return one row that is useful for logging and debugging.
225

            
226
Recommended fields:
227

            
228
- `metric_name`
229
- `device_id`
230
- `normalized_counter_value`
231
- `action`
232
- `boundary_kind`
233

            
234
Recommended `action` values:
235

            
236
- `opened`: first observation for an open stream
237
- `extended`: normal append to an existing stream
238
- `duplicate_ignored`: replay duplicate recognized from reliability metadata
239
- `boundary_split`: a reset or rollover boundary was classified and a new segment started
240

            
241
Recommended `boundary_kind` values:
242

            
243
- `none`
244
- `reset_boundary`
245
- `rollover_boundary`
246
- `invalid_drop`
247

            
248
Workers usually should not branch on these values, but they SHOULD log them.
249

            
250
---
251

            
252
## 9. Worker Routing Rules
253

            
254
The historian worker should route counter samples based on canonical metric semantics, not vendor heuristics.
255

            
256
Rules:
257

            
258
- bus metrics explicitly classified as cumulative counters MUST go to `telemetry.ingest_counter(...)`
259
- measurement-style metrics MUST continue to go to `telemetry.ingest_measurement(...)`
260
- retained `last` MUST NOT be used as a normal counter ingestion source
261
- if a counter metric is encountered and the counter path is unavailable, the worker SHOULD skip it and expose the skip operationally
262

            
263
Examples of metrics that belong here:
264

            
265
- `energy_total`
266
- `import_energy_total`
267
- `export_energy_total`
268
- `rx_bytes_total`
269
- `tx_packets_total`
270

            
271
Examples of metrics that do not belong here:
272

            
273
- `active_power`
274
- `voltage`
275
- `current`
276
- `temperature`
277
- `soc`
278

            
279
---
280

            
281
## 10. Error Handling
282

            
283
Permanent message errors include:
284

            
285
- unknown counter metric
286
- out-of-order counter observation
287
- `NULL` counter value
288
- negative counter value where policy disallows it
289
- replay metadata conflict that proves the message is invalid
290

            
291
Transient failures include:
292

            
293
- PostgreSQL unavailable
294
- network interruption between worker and database
295
- temporary worker restart during streaming
296

            
297
Rules:
298

            
299
- permanent message errors SHOULD be logged and dead-lettered
300
- transient failures SHOULD be retried
301
- retries MUST preserve ordering per `(metric_name, device_id)`
302
- workers SHOULD treat ambiguous retry success followed by duplicate detection as an operational duplicate, not as corruption
303

            
304
---
305

            
306
## 11. Implementation Guidance
307

            
308
Recommended implementation order:
309

            
310
1. worker-side routing from semantic bus metrics to counter pipeline
311
2. SQL wrapper for `telemetry.ingest_counter(...)`
312
3. operational metrics for skipped, ingested, duplicated, and boundary-split counter samples
313
4. query-side validation for freshness, reset handling, and delta/rate calculations
314

            
315
Recommended first live metrics:
316

            
317
- `energy_total`
318
- `import_energy_total`
319
- `export_energy_total`
320

            
321
These are the most concrete counter metrics already present in the current semantic bus design.
322

            
323
---
324

            
325
## 12. Relationship with the Semantic Bus
326

            
327
The semantic bus remains the canonical transport.
328

            
329
Counter metrics:
330

            
331
- continue to use normal semantic bus topics such as `.../value`
332
- remain visible to any consumer that understands cumulative semantics
333
- do not require a separate bus just because the historian storage path differs
334

            
335
This preserves one semantic model while allowing the historian to use a storage API that matches counter behavior.