telemetrydatabase / ingestion_docs / mqtt_ingestion_api.md
Newer Older
428 lines | 15.276kb
Bogdan Timofte authored 2 weeks ago
1
# Telemetry Ingestion API
2

            
3
This document describes how an ingestion worker should write measurements into the PostgreSQL telemetry historian.
4

            
5
It documents the database API as implemented today. It does not redefine the schema or the historian model.
6

            
7
This document covers only the currently implemented measurement API. Counter metrics are tracked separately in [counter_ingestion_api.md](./counter_ingestion_api.md) and [energy_counter_draft.md](./energy_counter_draft.md).
8

            
9
MQTT is used throughout this document only as an example source transport because it is a common worker implementation target. The PostgreSQL API itself is source-agnostic: the same ingestion flow applies to any worker that can map incoming data to `metric_name`, `device_id`, `value`, and `observed_at`.
10

            
11
## 1. System Overview
12

            
13
One common ingestion path is:
14

            
15
```text
16
Source devices / upstream bus
17
    -> ingestion worker
18
    -> telemetry.ingest_measurement(...)
19
    -> metric segment tables
20
```
21

            
22
More generally, the source worker is responsible for:
23

            
24
- receiving source messages from MQTT, HTTP, files, queues, serial links, or another upstream transport
25
- parsing the source envelope and payload
26
- mapping each message to `metric_name`, `device_id`, `value`, and `observed_at`
27
- calling the correct PostgreSQL ingestion function
28
- logging success or failure
29

            
30
The database is responsible for historian behavior:
31

            
32
- append-only ingestion
33
- per-device/per-metric serialization via advisory locks
34
- deduplication by epsilon or exact comparison
35
- segment storage instead of raw sample storage
36
- lazy gap detection
37
- explicit unknown intervals via `NULL`
38
- policy versioning via `telemetry.metric_policies`
39
- automatic segment splitting when policy boundaries are crossed
40

            
41
For non-MQTT transports, replace topic subscription/parsing with the equivalent source-specific decode step. The database contract stays the same.
42

            
43
Workers should treat PostgreSQL as the source of truth for consolidation logic. They should send measurements in order and let the database decide whether a segment is extended, split, or closed.
44

            
45
## 2. Example MQTT Topic Model
46

            
47
If the source transport is MQTT, a common topic shape is:
48

            
49
```text
50
/$bus/$metric/$domain/$sensor
51
```
52

            
53
Examples:
54

            
55
```text
56
/homebus/temperature/bedroom/sensor1
57
/homebus/humidity/bedroom/sensor1
58
/homebus/rssi/network/sensor1
59
```
60

            
61
Example worker mapping:
62

            
63
```text
64
topic: /homebus/temperature/bedroom/sensor1
65

            
66
metric_name = "temperature"
67
device_id   = "bedroom.sensor1"
68
```
69

            
70
Another example:
71

            
72
```text
73
topic: /homebus/rssi/network/sensor1
74

            
75
metric_name = "rssi"
76
device_id   = "network.sensor1"
77
```
78

            
79
Important:
80

            
81
- topic-to-metric mapping is implemented in the ingestion worker
82
- topic-to-device mapping is implemented in the ingestion worker
83
- the database does not parse MQTT topics
84
- the database expects already-mapped application identifiers
85

            
86
If your deployment is not using MQTT, apply the same idea to whatever source envelope you receive. The database only needs the final mapped identifiers.
87

            
88
If your deployment needs aliases, normalization, or routing rules, implement them in the worker before calling PostgreSQL.
89

            
90
## 3. Database Ingestion API
91

            
92
Two typed overloads exist.
93

            
94
Numeric metrics:
95

            
96
```sql
97
SELECT *
98
FROM telemetry.ingest_measurement(
99
    p_metric_name  => $1,
100
    p_device_id    => $2,
101
    p_value        => $3::double precision,
102
    p_observed_at  => $4::timestamptz
103
);
104
```
105

            
106
Boolean metrics:
107

            
108
```sql
109
SELECT *
110
FROM telemetry.ingest_measurement(
111
    p_metric_name  => $1,
112
    p_device_id    => $2,
113
    p_value        => $3::boolean,
114
    p_observed_at  => $4::timestamptz
115
);
116
```
117

            
118
Parameter semantics:
119

            
120
- `p_metric_name`: application metric identifier registered in `telemetry.metrics`
121
- `p_device_id`: logical device identifier chosen by the worker
122
- `p_value`: typed measurement value, or `NULL` for “measurement unavailable”
123
- `p_observed_at`: time the device observed the measurement
124

            
125
Important implementation note:
126

            
127
- because `ingest_measurement` is overloaded, `NULL` values must be typed explicitly
128
- use `NULL::double precision` for numeric unknown values
129
- use `NULL::boolean` for boolean unknown values
130

            
131
Example:
132

            
133
```sql
134
SELECT *
135
FROM telemetry.ingest_measurement(
136
    'temperature',
137
    'bedroom.sensor1',
138
    NULL::double precision,
139
    '2026-03-08T10:00:00Z'::timestamptz
140
);
141
```
142

            
143
## 4. Ingestion Response
144

            
145
Each call returns one row:
146

            
147
- `metric_name`
148
- `device_id`
149
- `table_name`
150
- `normalized_value`
151
- `action`
152

            
153
`normalized_value` is the value actually used by the historian after normalization:
154

            
155
- numeric metrics: rounded according to metric policy
156
- boolean metrics: identical to input
157
- `NULL`: explicit unknown value
158

            
159
`action` is returned by the internal segment engine and is useful for logs, metrics, and debugging.
160

            
161
Actions:
162

            
163
- `opened`: no open segment existed; a new non-`NULL` segment was created
164
- `opened_null`: no open segment existed; a new `NULL` segment was created
165
- `extended`: the current non-`NULL` segment was kept open and `samples_count` increased
166
- `extended_null`: the current `NULL` segment was kept open and `samples_count` increased
167
- `split`: the previous segment was closed and a new segment was opened
168
- `null_to_value`: a `NULL` segment was closed and replaced by a value segment
169
- `value_to_null`: a value segment was closed and replaced by a `NULL` segment
170
- `gap_split`: a late gap was detected; the previous value segment was closed at `last_observed_at + max_sampling_interval`, a synthetic `NULL` gap segment was inserted, then a new value segment was opened
171
- `gap_to_null`: a late gap was detected and the next state is unknown; the previous value segment was closed at `last_observed_at + max_sampling_interval`, then a new open `NULL` segment was opened
172

            
173
Workers usually do not branch on `action`. Treat it as observability data.
174

            
175
Recommended worker logging:
176

            
177
- metric name
178
- device id
179
- observed timestamp
180
- original payload value
181
- returned `normalized_value`
182
- returned `action`
183

            
184
## 5. Automatic Behaviors Implemented by the Database
185

            
186
The worker should not re-implement historian logic already handled in PostgreSQL.
187

            
188
Automatic device provisioning:
189

            
190
- ingestion calls `ensure_device()`
191
- if `device_id` does not exist in `telemetry.devices`, it is created automatically
192

            
193
Append-only enforcement:
194

            
195
- ingestion calls `assert_append_only()`
196
- the database rejects measurements where `observed_at <= last_observed_at` for the same `(metric_name, device_id)`
197

            
198
Policy lookup:
199

            
200
- ingestion calls `require_metric_policy()`
201
- the active policy is selected from `telemetry.metric_policies` using `valid_from <= observed_at`
202

            
203
Gap detection:
204

            
205
- the database detects gaps lazily when the next measurement arrives
206
- workers must not emit timeout markers on their own
207

            
208
Segment splitting:
209

            
210
- value changes, explicit `NULL`, policy changes, and gap boundaries are converted into segment transitions automatically
211

            
212
Policy boundary splitting:
213

            
214
- if a metric policy changes while a segment is still open, the database splits the open segment at `policy.valid_from`
215
- workers do not need to know segment internals or policy transitions
216

            
217
Tail `NULL` simulation:
218

            
219
- if a stream stops, the database does not persist a trailing `NULL` segment immediately
220
- query functions simulate unknown tail values after `last_observed_at + max_sampling_interval`
221

            
222
## 6. Error Handling
223

            
224
Workers should treat database errors as either permanent message errors or transient infrastructure errors, regardless of whether the source transport is MQTT, HTTP, serial, or another upstream feed.
225

            
226
Common permanent errors:
227

            
228
- unknown metric
229
  - example: `ERROR: unknown metric: temperature`
230
  - cause: `metric_name` is not registered
231
- out-of-order measurement
232
  - example: `ERROR: out-of-order measurement for metric ...`
233
  - cause: message timestamp is older than or equal to the watermark for that metric/device
234
- invalid numeric value
235
  - examples:
236
    - `ERROR: value ... is below min_value ...`
237
    - `ERROR: value ... is above max_value ...`
238
  - cause: payload violates policy bounds
239
- metric type mismatch
240
  - examples:
241
    - `ERROR: metric ... is boolean; use the boolean overload of ingest_measurement(...)`
242
    - `ERROR: metric ... is numeric; use the numeric overload of ingest_measurement(...)`
243
- disallowed explicit `NULL`
244
  - example: `ERROR: metric table ... does not allow explicit NULL measurements`
245
  - cause: worker sent unknown value for a metric with `allow_null = false`
246

            
247
Recommended behavior for permanent message errors:
248

            
249
- log the full error together with the source metadata and payload
250
- if the source is MQTT, include topic and payload
251
- write the message to a dead-letter path or equivalent audited error sink before discarding it
252
- discard the message from the hot path after it has been captured for audit
253
- do not retry it
254
- increment an application metric or dead-letter counter
255

            
256
Recommended dead-letter payload:
257

            
258
- error type
259
- original source metadata such as topic, route, file offset, or queue message id
260
- original payload or a safe reference to it
261
- database error message
262
- worker instance id
263
- timestamp
264
- attempt count if retries already happened
265

            
266
Recommended behavior for transient failures:
267

            
268
- retry only for infrastructure failures such as connection loss, failover, or temporary database unavailability
269
- preserve per-device/per-metric ordering when retrying
270
- do not reorder buffered measurements while retrying
271

            
272
If a retry happens after an ambiguous network failure, the worker may see an out-of-order error if the first attempt actually committed. Handle that case as an operational duplicate and log it accordingly.
273

            
274
## 7. Idempotency and Ordering
275

            
276
The ingestion API assumes chronological ordering per `(metric_name, device_id)`.
277

            
278
This is enforced by:
279

            
280
- `telemetry.metric_device_watermarks`
281

            
282
Consequences for workers:
283

            
284
- do not send older measurements after newer ones for the same metric/device
285
- if a device buffers multiple samples, flush them in timestamp order
286
- if the source can redeliver messages, preserve original order
287
- do not use broker receive time to reorder a device stream
288

            
289
This API is append-only, not last-write-wins.
290

            
291
Same-timestamp replay is not accepted:
292

            
293
- `observed_at = last_observed_at` is rejected
294
- `observed_at < last_observed_at` is rejected
295

            
296
Important limitation of the current measurement API:
297

            
298
- `ingest_measurement(...)` does not currently accept an explicit `idempotency_key`
299
- after an ambiguous network failure, a safe replay may surface as an out-of-order duplicate
300
- workers should treat this as a limitation of the current measurement API, not as a general design recommendation for future counter ingestion
301

            
302
## 8. Time Semantics
303

            
304
`observed_at` is the time the device measured the value.
305

            
306
It is not:
307

            
308
- source receive time
309
- worker processing time
310
- database insert time
311

            
312
Preferred worker behavior:
313

            
314
- use the timestamp provided by the device payload if available
315
- preserve the device timestamp exactly
316
- only fall back to worker receive time when the upstream source does not provide an observation timestamp
317

            
318
Historian correctness depends on `observed_at` being as close as possible to the real device observation time.
319

            
320
## 9. Example Worker Flow
321

            
322
Example flow using MQTT as one source transport:
323

            
324
1. MQTT message is received on `/homebus/temperature/bedroom/sensor1`
325
2. worker parses the topic
326
3. worker maps the topic to:
327
   - `metric_name = 'temperature'`
328
   - `device_id = 'bedroom.sensor1'`
329
4. worker parses payload value and device timestamp
330
5. worker calls `telemetry.ingest_measurement(...)`
331
6. worker logs `action` and `normalized_value`
332

            
333
Example numeric SQL call:
334

            
335
```sql
336
SELECT *
337
FROM telemetry.ingest_measurement(
338
    'temperature',
339
    'bedroom.sensor1',
340
    22.97::double precision,
341
    '2026-03-08T10:15:12Z'::timestamptz
342
);
343
```
344

            
345
Example boolean SQL call:
346

            
347
```sql
348
SELECT *
349
FROM telemetry.ingest_measurement(
350
    'motion_detected',
351
    'hallway.sensor1',
352
    true,
353
    '2026-03-08T10:15:12Z'::timestamptz
354
);
355
```
356

            
357
Example pseudo-code:
358

            
359
```text
360
on_message(topic, payload):
361
    metric_name = map_topic_to_metric(topic)
362
    device_id = map_topic_to_device(topic)
363
    observed_at = extract_device_timestamp(payload)
364
    value = parse_payload_value(payload, metric_name)
365

            
366
    row = db.query_one(
367
        "SELECT * FROM telemetry.ingest_measurement($1, $2, $3::double precision, $4)",
368
        [metric_name, device_id, value, observed_at]
369
    )
370

            
371
    log.info(
372
        "ingested",
373
        metric_name=row.metric_name,
374
        device_id=row.device_id,
375
        action=row.action,
376
        normalized_value=row.normalized_value
377
    )
378
```
379

            
380
The worker is responsible for choosing the correct numeric or boolean call path before executing SQL. The same database call pattern applies to any worker after it maps its source message format into the historian parameters.
381

            
382
## 10. Performance Expectations
383

            
384
This API is designed for high ingestion rates without requiring the worker to manage historian state.
385

            
386
Typical work performed per measurement:
387

            
388
- one advisory lock for `(metric_name, device_id)`
389
- metric and policy resolution inside PostgreSQL
390
- one segment update or one segment insert on the target metric table
391
- watermark and device `last_seen` bookkeeping
392

            
393
In the common case where a value is unchanged, ingestion usually updates the current open segment rather than inserting a new raw sample row.
394

            
395
Locking notes:
396

            
397
- serialization is enforced by a PostgreSQL advisory transaction lock on `(metric_name, device_id)`
398
- the current implementation waits for the advisory lock inside the transaction; it is not a fail-fast `NOWAIT` contract
399
- workers should therefore avoid sending the same `(metric_name, device_id)` concurrently from multiple writers
400
- the preferred operational pattern is writer affinity per `(metric_name, device_id)`, for example via deterministic routing or consistent hashing in the worker layer
401
- if lock waits become visible in production, treat that as a signal that worker concurrency or routing needs review rather than relying on database-side contention as normal flow
402

            
403
Batching is optional:
404

            
405
- a worker may issue one SQL call per source message
406
- batching can reduce client/network overhead
407
- batching is not required for correctness
408

            
409
Practical batching guidance:
410

            
411
- batching is most useful when it reduces network round-trips without mixing unrelated retry domains
412
- keep ordering intact per `(metric_name, device_id)` inside the batch
413
- if a deployment expects large polling waves, modest transactional batches are preferable to one giant transaction
414
- if policy changes are rolled out, stagger them when possible rather than changing many hot metrics at exactly the same moment
415

            
416
The most important worker-side performance requirement is preserving ordering while keeping the database connection pool healthy.
417

            
418
## Implementation Checklist
419

            
420
- map the source message into `metric_name` and `device_id`
421
- parse payload into typed numeric or boolean value
422
- preserve device observation timestamps
423
- call the correct `ingest_measurement` overload
424
- cast `NULL` explicitly when sending unknown values
425
- log returned `action` and `normalized_value`
426
- discard permanent validation errors
427
- retry only transient database failures
428
- preserve order per `(metric_name, device_id)` during retries and buffering