Showing 5 changed files with 3089 additions and 0 deletions
.gitignore

.DS_Store
README.md
# Telemetry Measurements Schema

The first production schema is now a single canonical file: `schema/telemetry_schema.sql`.

Legacy compatibility migrations were removed because this database has no prior production deployments. The schema file represents the current design directly and runs on an empty PostgreSQL database.

Source worker ingestion is documented in `docs/mqtt_ingestion_api.md`, with MQTT used there as a concrete example.

A draft note for future counter-style telemetry, including energy and traffic counters, is in `docs/energy_counter_draft.md`.

## Design note

### Generic segment tables

Segment tables are now created through:

- `telemetry.create_segment_table(p_table_name, p_value_type)`

Supported value types are:

- `double precision`
- `boolean`
- `smallint`

The generated tables keep the existing historian shape and protections:

- one open segment per device
- internal `device_pk` foreign key for future numeric-key joins while keeping `device_id` for compatibility
- `policy_id` foreign key to `telemetry.metric_policies(policy_id)` so every stored interval remembers which ingestion policy produced it
- foreign key to `telemetry.devices(device_id)` with `ON UPDATE/DELETE CASCADE`
- non-overlapping `tstzrange` periods enforced with `EXCLUDE USING gist`
- covering index on `(device_id, start_time DESC)` for point/range lookups
- additional GiST index on `(device_id, segment_period)` for range sampling/overlap queries
- storage tuning via `fillfactor` and aggressive autovacuum/analyze thresholds

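For orientation, a generated `double precision` segment table might look roughly like the sketch below. This is only an illustration of the shape implied by the list above; the authoritative DDL lives in `schema/telemetry_schema.sql`, and the table, column, and index names here are hypothetical:

```sql
-- Hypothetical shape of a table produced by
--   telemetry.create_segment_table('temperature_segments', 'double precision');
-- the real DDL is defined in schema/telemetry_schema.sql.
-- The = operator on device_id inside EXCLUDE USING gist requires btree_gist.
CREATE TABLE telemetry.temperature_segments (
    device_id      text        NOT NULL REFERENCES telemetry.devices (device_id)
                               ON UPDATE CASCADE ON DELETE CASCADE,
    device_pk      bigint      NOT NULL,        -- internal numeric-key join path
    policy_id      bigint      NOT NULL REFERENCES telemetry.metric_policies (policy_id),
    start_time     timestamptz NOT NULL,
    segment_period tstzrange   NOT NULL,        -- open segments have an unbounded upper end
    value          double precision,            -- NULL marks an explicit unknown interval
    samples_count  bigint      NOT NULL DEFAULT 1,
    EXCLUDE USING gist (device_id WITH =, segment_period WITH &&)
) WITH (fillfactor = 90);

CREATE INDEX ON telemetry.temperature_segments (device_id, start_time DESC);
CREATE INDEX ON telemetry.temperature_segments USING gist (device_id, segment_period);
```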
### Generic ingestion engine

Runtime ingestion now flows through a single function:

- `telemetry.ingest_segment(...)`

`telemetry.metrics.table_name` is now stored as plain `text`, not `regclass`. Runtime code normalizes it, validates that it resolves under the `telemetry` schema, and only then interpolates the quoted identifier into dynamic SQL.

Dynamic SQL is used only to parameterize the target table name. All values stay bound through `USING`, so the only dynamic part of the hot path is table access.
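This is the standard PL/pgSQL pattern: only the identifier is interpolated, after `format('%I', ...)` quoting, while every value travels as a bound parameter. A minimal sketch (the surrounding function body and variable names are hypothetical, not the actual `ingest_segment` implementation):

```sql
-- Sketch only: the identifier is validated and quoted via format('%I', ...);
-- all values are bound through USING, never concatenated into the SQL text.
EXECUTE format(
    'INSERT INTO telemetry.%I (device_id, device_pk, policy_id, segment_period, value)
     VALUES ($1, $2, $3, tstzrange($4, NULL, ''[)''), $5)',
    v_table_name)
USING p_device_id, v_device_pk, v_policy_id, p_observed_at, v_value;
```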

The generic engine preserves the existing historian semantics:

- append-only watermark enforcement
- advisory locking per `(metric, device)`
- automatic device provisioning into `telemetry.devices`
- `last_seen` updates on successful ingestion
- epsilon/exact consolidation depending on policy
- lazy gap detection
- explicit `NULL` segments
- query-time synthetic `NULL` tails

### Device registry

`telemetry.devices` is the canonical device catalog. Ingestion still accepts unseen devices without a separate provisioning step, but it inserts them into the registry before segment writes so foreign keys remain valid.

Deleting or renaming a device is now referentially safe because segment tables and watermark rows cascade from the device registry.

`telemetry.devices` also has an internal `device_pk` surrogate primary key. Segment tables keep `device_id` for API compatibility, but now store `device_pk` as the internal foreign key.

### Metric registry

`telemetry.metrics` keeps `metric_name` as the external API identifier, but now uses `metric_pk` as the internal primary key. Text identifiers remain unique and stable for backward compatibility.

### Metric policy

`telemetry.metrics` now carries both:

- `metric_type`
- `comparison_mode`

`metric_type` and `comparison_mode` are PostgreSQL enums, so invalid values fail with type-level errors and show up clearly in schema introspection.

Current combinations are:

- numeric -> `comparison_mode = 'epsilon'`
- boolean -> `comparison_mode = 'exact'`

Runtime ingestion policy is now versioned in `telemetry.metric_policies`. The active row is selected by `(metric_name, valid_from <= observed_at)` with the greatest `valid_from`, and new segments store that `policy_id`. Existing metrics are seeded with an initial `valid_from = '-infinity'` policy row so historical segments stay compatible.

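The active-policy selection described above is a standard "latest effective row" query. A sketch, assuming the column names shown (the metric name and timestamp literals are placeholders):

```sql
-- Sketch: pick the newest policy row effective at the observation time
-- for one metric; new segments store the resulting policy_id.
SELECT p.policy_id
FROM telemetry.metric_policies AS p
WHERE p.metric_name = 'temperature'
  AND p.valid_from <= '2026-03-08T10:00:00Z'::timestamptz
ORDER BY p.valid_from DESC
LIMIT 1;
```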
This keeps historical semantics stable when policy parameters such as `epsilon`, `rounding_precision`, `allow_null`, or `max_sampling_interval` change over time. Query wrappers do not need to re-resolve policy history; they use the `policy_id` already attached to each segment when they need policy-specific behavior such as open-segment tail cutoff.

New policies are added through `telemetry.add_metric_policy(...)`. `telemetry.register_numeric_metric(...)` and `telemetry.register_boolean_metric(...)` still provision the metric and ensure the initial `-infinity` policy exists.

If a policy changes while a segment is still open, ingestion now splits that segment at `policy.valid_from` before handling the incoming sample. The synthetic continuation segment starts exactly at the boundary with `samples_count = 0`, so no interval can silently extend past the policy that generated it.

The schema also allows a future `state` metric type backed by `smallint` with `comparison_mode = 'exact'`, without changing the segment engine again.

### Query API

The internal table readers are generic:

- `telemetry.value_at_from_table(...)`
- `telemetry.segments_between_from_table(...)`
- `telemetry.samples_from_table(...)`

Public wrappers stay typed because PostgreSQL function return types are static:

- numeric: `telemetry.value_at(...)`, `telemetry.metric_segments(...)`, `telemetry.sample_metric(...)`
- boolean: `telemetry.boolean_value_at(...)`, `telemetry.boolean_segments_between(...)`, `telemetry.boolean_samples(...)`

### Maintenance

The schema now also includes:

- `telemetry.verify_segments(...)` for integrity checks
- `telemetry.compact_segments(...)` for manual zero-gap identical-value compaction
- `telemetry.metric_retention_policies` as a retention/compaction policy registry
- `telemetry.inactive_devices(...)` for offline-device detection
ingestion_docs/counter_ingestion_api.md
# Counter Ingestion API

This document defines how a historian worker should write cumulative counter observations into the PostgreSQL telemetry historian.

It complements the measurement ingestion path and exists because cumulative counters do not have the same storage semantics as instantaneous measurements.

Examples of counters:

- `energy_total`
- `import_energy_total`
- `export_energy_total`
- `rx_bytes_total`
- `tx_packets_total`

Measurement-style numeric and boolean values continue to use `mqtt_ingestion_api.md`.

---

## 1. System Overview

One common ingestion path is:

```text
Canonical MQTT bus
    -> historian worker
    -> telemetry.ingest_counter(...)
    -> counter historian tables
```

The worker is responsible for:

- subscribing to canonical counter-bearing bus topics
- mapping each message to `metric_name`, `device_id`, `counter_value`, and `observed_at`
- preserving ordering per counter stream
- passing replay metadata when available
- logging result actions and failures

The database is responsible for:

- append-only ordering enforcement
- duplicate detection where replay metadata allows it
- reset and rollover boundary classification
- policy lookup
- query-time freshness and gap interpretation

The worker should treat PostgreSQL as the source of truth for counter boundary logic.

---

## 2. Why Counter Data Uses a Separate API

Cumulative counters are not the same as instantaneous measurements.

Key differences:

- the value represents an ever-growing total, not a sampled state
- a drop in value is usually meaningful and may indicate reset or rollover
- gaps should be derived at query time from freshness policy, not stored as synthetic `NULL`
- `counter_value` should never be written as `NULL`

For this reason, the worker MUST NOT send cumulative counters through `telemetry.ingest_measurement(...)` just because the payload is numeric.

---

## 3. Example MQTT Mapping

The database does not parse MQTT topics.

The worker maps canonical topics to application identifiers before calling PostgreSQL.

Example 1:

```text
topic: vad/energy/load/living-room-tv/energy_total/value

metric_name   = "energy_total"
device_id     = "load.living-room-tv"
counter_value = 4.72
observed_at   = 2026-03-21T10:15:12Z
```

Example 2:

```text
topic: vad/energy/grid/main-meter/import_energy_total/value

metric_name   = "import_energy_total"
device_id     = "grid.main-meter"
counter_value = 18654.31
observed_at   = 2026-03-21T10:15:12Z
```

The same pattern applies to future domains such as network traffic counters.

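The mapping in the examples above can be sketched as worker code. This is a hypothetical helper that assumes the exact topic layout shown in the examples (`vad/<group>/<domain>/<device>/<metric>/value`); real deployments own their own routing rules:

```python
def map_counter_topic(topic: str) -> tuple[str, str]:
    """Map a canonical counter topic to (metric_name, device_id).

    Assumes topics shaped like the examples above, e.g.
    vad/energy/load/living-room-tv/energy_total/value.
    """
    parts = topic.split("/")
    if len(parts) != 6 or parts[-1] != "value":
        raise ValueError(f"unrecognized counter topic: {topic}")
    _bus, _group, domain, device, metric_name = parts[:5]
    # device_id joins the domain and device segments, as in the examples above
    return metric_name, f"{domain}.{device}"
```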
---

## 4. Database Ingestion API

Canonical signature:

```sql
SELECT *
FROM telemetry.ingest_counter(
    p_metric_name     => $1,
    p_device_id       => $2,
    p_counter_value   => $3::numeric,
    p_observed_at     => $4::timestamptz,
    p_source_sequence => $5,
    p_idempotency_key => $6,
    p_snapshot_id     => $7
);
```

Parameter semantics:

- `p_metric_name`: counter metric identifier registered in the database
- `p_device_id`: logical device identifier chosen by the worker
- `p_counter_value`: cumulative counter observation
- `p_observed_at`: time the source observed the counter value
- `p_source_sequence`: optional monotonic source-side sequence or offset
- `p_idempotency_key`: optional replay-safe deduplication key
- `p_snapshot_id`: optional identifier for a source snapshot or polling batch

Rules:

- `p_counter_value` MUST NOT be `NULL`
- `p_observed_at` MUST be present
- workers SHOULD pass `p_source_sequence` when the source provides a reliable monotonic sequence
- workers SHOULD pass `p_idempotency_key` when replay ambiguity exists
- workers MAY leave optional replay fields `NULL` if the source does not provide them

---

## 5. Core Semantics

Counter stream identity is:

```text
stream = (metric_name, device_id)
```

Core rules:

- writes are append-only per stream
- observations MUST arrive in strictly increasing `observed_at` order per stream
- `counter_value` MUST be non-null
- silence does not produce synthetic `NULL` writes
- decreases in `counter_value` create semantic boundaries
- deltas and rates MUST NOT cross reset or rollover boundaries

The worker should assume the database owns the interpretation of those boundaries through metric policy.

---

## 6. Reliability Metadata

Three reliability levels are supported conceptually.

### Degraded / At-Most-Once

Inputs:

- no `source_sequence`
- no `idempotency_key`

Implication:

- retries after ambiguous failures are unsafe

### Recommended / At-Least-Once

Inputs:

- `source_sequence` or `idempotency_key`

Implication:

- replay is detectable

### Stronger Replay Safety

Inputs:

- both `source_sequence` and `idempotency_key`

Implication:

- duplicate detection is explicit and robust

The database contract should remain usable at all three levels, but higher reliability depends on richer source metadata.

---

## 7. Reporting Modes and Freshness

Counter policies should support three reporting modes:

- `periodic`
- `on_change`
- `hybrid`

Semantics:

- `periodic`: updates are expected on a cadence; silence eventually means stale
- `on_change`: updates happen only on change; silence does not imply delta `0`
- `hybrid`: updates happen on change plus periodic heartbeat

Recommended freshness defaults:

| Reporting mode | Input | Default `stale_after_s` |
|---|---|---|
| `periodic` | `expected_interval_s` | `expected_interval_s * 2` |
| `on_change` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |
| `hybrid` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |

If `stale_after_s` is explicitly defined by policy, it overrides these defaults.

Freshness is evaluated at query time, not by inserting synthetic unknown rows.

---

## 8. Ingestion Response

Each call should return one row that is useful for logging and debugging.

Recommended fields:

- `metric_name`
- `device_id`
- `normalized_counter_value`
- `action`
- `boundary_kind`

Recommended `action` values:

- `opened`: first observation for a stream; a new open segment was started
- `extended`: normal append to an existing stream
- `duplicate_ignored`: replay duplicate recognized from reliability metadata
- `boundary_split`: a reset or rollover boundary was classified and a new segment started

Recommended `boundary_kind` values:

- `none`
- `reset_boundary`
- `rollover_boundary`
- `invalid_drop`

Workers usually should not branch on these values, but they SHOULD log them.

---

## 9. Worker Routing Rules

The historian worker should route counter samples based on canonical metric semantics, not vendor heuristics.

Rules:

- bus metrics explicitly classified as cumulative counters MUST go to `telemetry.ingest_counter(...)`
- measurement-style metrics MUST continue to go to `telemetry.ingest_measurement(...)`
- retained `last` MUST NOT be used as a normal counter ingestion source
- if a counter metric is encountered and the counter path is unavailable, the worker SHOULD skip it and expose the skip operationally

Examples of metrics that belong here:

- `energy_total`
- `import_energy_total`
- `export_energy_total`
- `rx_bytes_total`
- `tx_packets_total`

Examples of metrics that do not belong here:

- `active_power`
- `voltage`
- `current`
- `temperature`
- `soc`

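The routing rules above amount to a small dispatch step. A sketch, with the caveat that a real worker would derive the counter classification from canonical bus metadata rather than the hard-coded set used here for illustration:

```python
# Illustrative only: in a real worker this classification comes from
# canonical bus metadata, not a hard-coded set.
COUNTER_METRICS = {
    "energy_total",
    "import_energy_total",
    "export_energy_total",
    "rx_bytes_total",
    "tx_packets_total",
}

def route_metric(metric_name: str, counter_path_available: bool = True) -> str:
    """Pick the ingestion entry point for a metric, per the rules above."""
    if metric_name in COUNTER_METRICS:
        if not counter_path_available:
            # SHOULD skip and expose the skip operationally
            return "skip"
        return "telemetry.ingest_counter"
    return "telemetry.ingest_measurement"
```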
---

## 10. Error Handling

Permanent message errors include:

- unknown counter metric
- out-of-order counter observation
- `NULL` counter value
- negative counter value where policy disallows it
- replay metadata conflict that proves the message is invalid

Transient failures include:

- PostgreSQL unavailable
- network interruption between worker and database
- temporary worker restart during streaming

Rules:

- permanent message errors SHOULD be logged and dead-lettered
- transient failures SHOULD be retried
- retries MUST preserve ordering per `(metric_name, device_id)`
- workers SHOULD treat ambiguous retry success followed by duplicate detection as an operational duplicate, not as corruption

---

## 11. Implementation Guidance

Recommended implementation order:

1. worker-side routing from semantic bus metrics to counter pipeline
2. SQL wrapper for `telemetry.ingest_counter(...)`
3. operational metrics for skipped, ingested, duplicated, and boundary-split counter samples
4. query-side validation for freshness, reset handling, and delta/rate calculations

Recommended first live metrics:

- `energy_total`
- `import_energy_total`
- `export_energy_total`

These are the most concrete counter metrics already present in the current semantic bus design.

---

## 12. Relationship with the Semantic Bus

The semantic bus remains the canonical transport.

Counter metrics:

- continue to use normal semantic bus topics such as `.../value`
- remain visible to any consumer that understands cumulative semantics
- do not require a separate bus just because the historian storage path differs

This preserves one semantic model while allowing the historian to use a storage API that matches counter behavior.
ingestion_docs/mqtt_ingestion_api.md
# Telemetry Ingestion API

This document describes how an ingestion worker should write measurements into the PostgreSQL telemetry historian.

It documents the database API as implemented today. It does not redefine the schema or the historian model.

This document covers only the currently implemented measurement API. Counter metrics are tracked separately in [counter_ingestion_api.md](./counter_ingestion_api.md) and [energy_counter_draft.md](./energy_counter_draft.md).

MQTT is used throughout this document only as an example source transport because it is a common worker implementation target. The PostgreSQL API itself is source-agnostic: the same ingestion flow applies to any worker that can map incoming data to `metric_name`, `device_id`, `value`, and `observed_at`.

## 1. System Overview

One common ingestion path is:

```text
Source devices / upstream bus
    -> ingestion worker
    -> telemetry.ingest_measurement(...)
    -> metric segment tables
```

More generally, the source worker is responsible for:

- receiving source messages from MQTT, HTTP, files, queues, serial links, or another upstream transport
- parsing the source envelope and payload
- mapping each message to `metric_name`, `device_id`, `value`, and `observed_at`
- calling the correct PostgreSQL ingestion function
- logging success or failure

The database is responsible for historian behavior:

- append-only ingestion
- per-device/per-metric serialization via advisory locks
- deduplication by epsilon or exact comparison
- segment storage instead of raw sample storage
- lazy gap detection
- explicit unknown intervals via `NULL`
- policy versioning via `telemetry.metric_policies`
- automatic segment splitting when policy boundaries are crossed

For non-MQTT transports, replace topic subscription/parsing with the equivalent source-specific decode step. The database contract stays the same.

Workers should treat PostgreSQL as the source of truth for consolidation logic. They should send measurements in order and let the database decide whether a segment is extended, split, or closed.

## 2. Example MQTT Topic Model

If the source transport is MQTT, a common topic shape is:

```text
/$bus/$metric/$domain/$sensor
```

Examples:

```text
/homebus/temperature/bedroom/sensor1
/homebus/humidity/bedroom/sensor1
/homebus/rssi/network/sensor1
```

Example worker mapping:

```text
topic: /homebus/temperature/bedroom/sensor1

metric_name = "temperature"
device_id   = "bedroom.sensor1"
```

Another example:

```text
topic: /homebus/rssi/network/sensor1

metric_name = "rssi"
device_id   = "network.sensor1"
```

Important:

- topic-to-metric mapping is implemented in the ingestion worker
- topic-to-device mapping is implemented in the ingestion worker
- the database does not parse MQTT topics
- the database expects already-mapped application identifiers

If your deployment is not using MQTT, apply the same idea to whatever source envelope you receive. The database only needs the final mapped identifiers.

If your deployment needs aliases, normalization, or routing rules, implement them in the worker before calling PostgreSQL.

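The worker-side mapping described above can be sketched as a single helper. Hypothetical code matching the `/$bus/$metric/$domain/$sensor` shape from the examples; aliasing and normalization would happen in or around a function like this:

```python
def map_measurement_topic(topic: str) -> tuple[str, str]:
    """Map a /$bus/$metric/$domain/$sensor topic to (metric_name, device_id).

    Illustrative helper matching the examples above, e.g.
    /homebus/temperature/bedroom/sensor1 -> ("temperature", "bedroom.sensor1").
    """
    parts = topic.strip("/").split("/")
    if len(parts) != 4:
        raise ValueError(f"unrecognized topic shape: {topic}")
    _bus, metric_name, domain, sensor = parts
    # device_id is a worker-chosen logical identifier, not the raw topic
    return metric_name, f"{domain}.{sensor}"
```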
## 3. Database Ingestion API

Two typed overloads exist.

Numeric metrics:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    p_metric_name  => $1,
    p_device_id    => $2,
    p_value        => $3::double precision,
    p_observed_at  => $4::timestamptz
);
```

Boolean metrics:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    p_metric_name  => $1,
    p_device_id    => $2,
    p_value        => $3::boolean,
    p_observed_at  => $4::timestamptz
);
```

Parameter semantics:

- `p_metric_name`: application metric identifier registered in `telemetry.metrics`
- `p_device_id`: logical device identifier chosen by the worker
- `p_value`: typed measurement value, or `NULL` for “measurement unavailable”
- `p_observed_at`: time the device observed the measurement

Important implementation note:

- because `ingest_measurement` is overloaded, `NULL` values must be typed explicitly
- use `NULL::double precision` for numeric unknown values
- use `NULL::boolean` for boolean unknown values

Example:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    'temperature',
    'bedroom.sensor1',
    NULL::double precision,
    '2026-03-08T10:00:00Z'::timestamptz
);
```

## 4. Ingestion Response

Each call returns one row:

- `metric_name`
- `device_id`
- `table_name`
- `normalized_value`
- `action`

`normalized_value` is the value actually used by the historian after normalization:

- numeric metrics: rounded according to metric policy
- boolean metrics: identical to input
- `NULL`: explicit unknown value

`action` is returned by the internal segment engine and is useful for logs, metrics, and debugging.

Actions:

- `opened`: no open segment existed; a new non-`NULL` segment was created
- `opened_null`: no open segment existed; a new `NULL` segment was created
- `extended`: the current non-`NULL` segment was kept open and `samples_count` increased
- `extended_null`: the current `NULL` segment was kept open and `samples_count` increased
- `split`: the previous segment was closed and a new segment was opened
- `null_to_value`: a `NULL` segment was closed and replaced by a value segment
- `value_to_null`: a value segment was closed and replaced by a `NULL` segment
- `gap_split`: a late gap was detected; the previous value segment was closed at `last_observed_at + max_sampling_interval`, a synthetic `NULL` gap segment was inserted, then a new value segment was opened
- `gap_to_null`: a late gap was detected and the next state is unknown; the previous value segment was closed at `last_observed_at + max_sampling_interval`, then a new open `NULL` segment was opened

Workers usually do not branch on `action`. Treat it as observability data.

Recommended worker logging:

- metric name
- device id
- observed timestamp
- original payload value
- returned `normalized_value`
- returned `action`

## 5. Automatic Behaviors Implemented by the Database

The worker should not re-implement historian logic already handled in PostgreSQL.

Automatic device provisioning:

- ingestion calls `ensure_device()`
- if `device_id` does not exist in `telemetry.devices`, it is created automatically

Append-only enforcement:

- ingestion calls `assert_append_only()`
- the database rejects measurements where `observed_at <= last_observed_at` for the same `(metric_name, device_id)`

Policy lookup:

- ingestion calls `require_metric_policy()`
- the active policy is selected from `telemetry.metric_policies` using `valid_from <= observed_at`

Gap detection:

- the database detects gaps lazily when the next measurement arrives
- workers must not emit timeout markers on their own

Segment splitting:

- value changes, explicit `NULL`, policy changes, and gap boundaries are converted into segment transitions automatically

Policy boundary splitting:

- if a metric policy changes while a segment is still open, the database splits the open segment at `policy.valid_from`
- workers do not need to know segment internals or policy transitions

Tail `NULL` simulation:

- if a stream stops, the database does not persist a trailing `NULL` segment immediately
- query functions simulate unknown tail values after `last_observed_at + max_sampling_interval`

## 6. Error Handling

Workers should treat database errors as either permanent message errors or transient infrastructure errors, regardless of whether the source transport is MQTT, HTTP, serial, or another upstream feed.

Common permanent errors:

- unknown metric
  - example: `ERROR: unknown metric: temperature`
  - cause: `metric_name` is not registered
- out-of-order measurement
  - example: `ERROR: out-of-order measurement for metric ...`
  - cause: message timestamp is older than or equal to the watermark for that metric/device
- invalid numeric value
  - examples:
    - `ERROR: value ... is below min_value ...`
    - `ERROR: value ... is above max_value ...`
  - cause: payload violates policy bounds
- metric type mismatch
  - examples:
    - `ERROR: metric ... is boolean; use the boolean overload of ingest_measurement(...)`
    - `ERROR: metric ... is numeric; use the numeric overload of ingest_measurement(...)`
- disallowed explicit `NULL`
  - example: `ERROR: metric table ... does not allow explicit NULL measurements`
  - cause: worker sent an unknown value for a metric with `allow_null = false`

Recommended behavior for permanent message errors:

- log the full error together with the source metadata and payload
- if the source is MQTT, include topic and payload
- write the message to a dead-letter path or equivalent audited error sink before discarding it
- discard the message from the hot path after it has been captured for audit
- do not retry it
- increment an application metric or dead-letter counter

Recommended dead-letter payload:

- error type
- original source metadata such as topic, route, file offset, or queue message id
- original payload or a safe reference to it
- database error message
- worker instance id
- timestamp
- attempt count if retries already happened

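A hedged sketch of such a dead-letter record, with field names following the list above; the envelope format and function name are assumptions, since the document does not prescribe a serialization:

```python
import json
import time

def build_dead_letter(
    error_type: str,
    source_metadata: dict,
    payload: str,
    db_error: str,
    worker_id: str,
    attempt: int = 1,
) -> str:
    """Serialize a dead-letter record carrying the fields recommended above."""
    record = {
        "error_type": error_type,
        "source": source_metadata,   # e.g. topic, route, file offset, queue message id
        "payload": payload,          # or a safe reference to it
        "db_error": db_error,
        "worker_instance_id": worker_id,
        "timestamp": time.time(),
        "attempt_count": attempt,
    }
    return json.dumps(record)
```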
Recommended behavior for transient failures:

- retry only for infrastructure failures such as connection loss, failover, or temporary database unavailability
- preserve per-device/per-metric ordering when retrying
- do not reorder buffered measurements while retrying

If a retry happens after an ambiguous network failure, the worker may see an out-of-order error if the first attempt actually committed. Handle that case as an operational duplicate and log it accordingly.

## 7. Idempotency and Ordering

The ingestion API assumes chronological ordering per `(metric_name, device_id)`.

This is enforced by:

- `telemetry.metric_device_watermarks`

Consequences for workers:

- do not send older measurements after newer ones for the same metric/device
- if a device buffers multiple samples, flush them in timestamp order
- if the source can redeliver messages, preserve original order
- do not use broker receive time to reorder a device stream

This API is append-only, not last-write-wins.

Same-timestamp replay is not accepted:

- `observed_at = last_observed_at` is rejected
- `observed_at < last_observed_at` is rejected

Important limitation of the current measurement API:

- `ingest_measurement(...)` does not currently accept an explicit `idempotency_key`
- after an ambiguous network failure, a safe replay may surface as an out-of-order duplicate
- workers should treat this as a limitation of the current measurement API, not as a general design recommendation for future counter ingestion

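The per-stream ordering rules above reduce to "sort buffered samples by timestamp before flushing, and drop same-timestamp duplicates the database would reject anyway". A minimal sketch; the sample dictionaries are an illustrative structure, not a prescribed one:

```python
def ordered_flush(buffered: list[dict]) -> list[dict]:
    """Return buffered samples in safe flush order.

    Sorts by (metric_name, device_id, observed_at) so each stream is
    flushed in strictly increasing timestamp order; within one stream,
    samples sharing an observed_at would be rejected by the database,
    so only the first is kept.
    """
    buffered = sorted(
        buffered,
        key=lambda s: (s["metric_name"], s["device_id"], s["observed_at"]),
    )
    out, last_key = [], None
    for sample in buffered:
        key = (sample["metric_name"], sample["device_id"], sample["observed_at"])
        if key == last_key:
            continue  # same-timestamp replay would be rejected; drop the duplicate
        out.append(sample)
        last_key = key
    return out
```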
## 8. Time Semantics

`observed_at` is the time the device measured the value.

It is not:

- source receive time
- worker processing time
- database insert time

Preferred worker behavior:

- use the timestamp provided by the device payload if available
- preserve the device timestamp exactly
- only fall back to worker receive time when the upstream source does not provide an observation timestamp

Historian correctness depends on `observed_at` being as close as possible to the real device observation time.

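The fallback preference above can be sketched as a small resolver. The payload shape and the `"observed_at"` key are hypothetical; the point is the order of preference, not the parsing details:

```python
from datetime import datetime, timezone

def resolve_observed_at(payload: dict) -> datetime:
    """Pick observed_at per the preference above.

    Uses the device-provided timestamp when present (assumed here to be
    an ISO 8601 string under a hypothetical "observed_at" key) and only
    falls back to worker receive time when the source provides none.
    """
    raw = payload.get("observed_at")
    if raw is not None:
        # preserve the device timestamp exactly
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))
    # fallback only: worker receive time, in UTC
    return datetime.now(timezone.utc)
```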
320
+## 9. Example Worker Flow
321
+
322
+Example flow using MQTT as one source transport:
323
+
324
+1. MQTT message is received on `/homebus/temperature/bedroom/sensor1`
325
+2. worker parses the topic
326
+3. worker maps the topic to:
327
+   - `metric_name = 'temperature'`
328
+   - `device_id = 'bedroom.sensor1'`
329
+4. worker parses payload value and device timestamp
330
+5. worker calls `telemetry.ingest_measurement(...)`
331
+6. worker logs `action` and `normalized_value`
332
+
333
+Example numeric SQL call:
334
+
335
+```sql
336
+SELECT *
337
+FROM telemetry.ingest_measurement(
338
+    'temperature',
339
+    'bedroom.sensor1',
340
+    22.97::double precision,
341
+    '2026-03-08T10:15:12Z'::timestamptz
342
+);
343
+```
344
+
345
+Example boolean SQL call:
346
+
347
+```sql
348
+SELECT *
349
+FROM telemetry.ingest_measurement(
350
+    'motion_detected',
351
+    'hallway.sensor1',
352
+    true,
353
+    '2026-03-08T10:15:12Z'::timestamptz
354
+);
355
+```
356
+
357
+Example pseudo-code:
358
+
359
+```text
360
+on_message(topic, payload):
361
+    metric_name = map_topic_to_metric(topic)
362
+    device_id = map_topic_to_device(topic)
363
+    observed_at = extract_device_timestamp(payload)
364
+    value = parse_payload_value(payload, metric_name)
365
+
366
+    row = db.query_one(
367
+        "SELECT * FROM telemetry.ingest_measurement($1, $2, $3::double precision, $4)",
368
+        [metric_name, device_id, value, observed_at]
369
+    )
370
+
371
+    log.info(
372
+        "ingested",
373
+        metric_name=row.metric_name,
374
+        device_id=row.device_id,
375
+        action=row.action,
376
+        normalized_value=row.normalized_value
377
+    )
378
+```
379
+
380
+The worker is responsible for choosing the correct numeric or boolean call path before executing SQL. The same database call pattern applies to any worker after it maps its source message format into the historian parameters.
381
+
382
+## 10. Performance Expectations
383
+
384
+This API is designed for high ingestion rates without requiring the worker to manage historian state.
385
+
386
+Typical work performed per measurement:
387
+
388
+- one advisory lock for `(metric_name, device_id)`
389
+- metric and policy resolution inside PostgreSQL
390
+- one segment update or one segment insert on the target metric table
391
+- watermark and device `last_seen` bookkeeping
392
+
393
+In the common case where a value is unchanged, ingestion usually updates the current open segment rather than inserting a new raw sample row.
394
+
395
+Locking notes:
396
+
397
+- serialization is enforced by a PostgreSQL advisory transaction lock on `(metric_name, device_id)`
398
+- the current implementation waits for the advisory lock inside the transaction; it is not a fail-fast `NOWAIT` contract
399
+- workers should therefore avoid sending the same `(metric_name, device_id)` concurrently from multiple writers
400
+- the preferred operational pattern is writer affinity per `(metric_name, device_id)`, for example via deterministic routing or consistent hashing in the worker layer
401
+- if lock waits become visible in production, treat them as a signal that worker concurrency or routing needs review; database-side lock contention should not be part of normal flow
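One hedged sketch of the writer-affinity pattern: hash the `(metric_name, device_id)` pair with a stable digest so the same series always routes to the same worker. The worker count and key format here are deployment choices, not mandated by the schema:

```python
import hashlib


def route_worker(metric_name: str, device_id: str, worker_count: int) -> int:
    """Deterministically map one (metric_name, device_id) series to a worker.

    A stable digest is used instead of Python's salted built-in hash() so
    the assignment survives process restarts and changes only when the
    worker count changes.
    """
    key = f"{metric_name}\x00{device_id}".encode("utf-8")
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % worker_count
```

Because the mapping is deterministic, no two workers ever write the same series concurrently, which keeps advisory-lock waits out of the steady-state path.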
402
+
403
+Batching is optional:
404
+
405
+- a worker may issue one SQL call per source message
406
+- batching can reduce client/network overhead
407
+- batching is not required for correctness
408
+
409
+Practical batching guidance:
410
+
411
+- batching is most useful when it reduces network round-trips without mixing unrelated retry domains
412
+- keep ordering intact per `(metric_name, device_id)` inside the batch
413
+- if a deployment expects large polling waves, modest transactional batches are preferable to one giant transaction
414
+- if policy changes are rolled out, stagger them when possible rather than changing many hot metrics at exactly the same moment
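The per-key ordering rule above can be sketched as grouping a batch of messages by `(metric_name, device_id)` while preserving arrival order within each group. The message dict shape is an assumption for illustration:

```python
def group_for_batch(messages):
    """Group messages by (metric_name, device_id) for batched ingestion,
    preserving arrival order inside each group so per-series ordering
    survives batching. Each message is assumed to be a dict carrying
    metric_name and device_id keys."""
    groups = {}
    for msg in messages:
        key = (msg["metric_name"], msg["device_id"])
        groups.setdefault(key, []).append(msg)
    return groups
```

Each group can then be sent as one small ordered transaction without mixing retry domains across unrelated series.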
415
+
416
+The most important worker-side performance requirement is preserving ordering while keeping the database connection pool healthy.
417
+
418
+## 11. Implementation Checklist
419
+
420
+- map the source message into `metric_name` and `device_id`
421
+- parse payload into typed numeric or boolean value
422
+- preserve device observation timestamps
423
+- call the correct `ingest_measurement` overload
424
+- cast `NULL` explicitly when sending unknown values
425
+- log returned `action` and `normalized_value`
426
+- discard permanent validation errors
427
+- retry only transient database failures
428
+- preserve order per `(metric_name, device_id)` during retries and buffering
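For the explicit-NULL-cast item above, a sketch of the numeric form; the cast is what selects the intended overload when the value is unknown:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    'temperature',
    'bedroom.sensor1',
    NULL::double precision,  -- explicit cast selects the numeric overload
    '2026-03-08T10:20:12Z'::timestamptz
);
```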
+2212 -0
schema/telemetry_schema.sql
@@ -0,0 +1,2212 @@
1
+BEGIN;
2
+
3
+-- Schema and extensions.
4
+
5
+CREATE SCHEMA telemetry;
6
+CREATE EXTENSION IF NOT EXISTS btree_gist;
7
+
8
+-- Enums.
9
+
10
+-- 'state' is reserved for future implementation of discrete enumerated
11
+-- machine states such as thermostat_mode, pump_state, hvac_mode, or
12
+-- door_state. Values will likely be stored as smallint and use the
13
+-- same segment engine as boolean metrics, but with exact comparison
14
+-- over enumerated values.
15
+CREATE TYPE telemetry.metric_type_enum AS ENUM ('numeric', 'boolean', 'state');
16
+
17
+CREATE TYPE telemetry.comparison_mode_enum AS ENUM ('epsilon', 'exact');
18
+
19
+-- Registry utility functions.
20
+
21
+CREATE OR REPLACE FUNCTION telemetry.normalize_metric_table_name(p_table_name text)
22
+RETURNS text
23
+LANGUAGE plpgsql
24
+IMMUTABLE
25
+AS $$
26
+DECLARE
27
+    v_parts text[];
28
+BEGIN
29
+    IF p_table_name IS NULL OR btrim(p_table_name) = '' THEN
30
+        RAISE EXCEPTION 'table_name is required';
31
+    END IF;
32
+
33
+    v_parts := pg_catalog.parse_ident(p_table_name, false);
34
+
35
+    CASE COALESCE(array_length(v_parts, 1), 0)
36
+        WHEN 1 THEN
37
+            RETURN v_parts[1];
38
+        WHEN 2 THEN
39
+            IF v_parts[1] <> 'telemetry' THEN
40
+                RAISE EXCEPTION 'metric tables must live in schema telemetry: %', p_table_name;
41
+            END IF;
42
+
43
+            RETURN v_parts[2];
44
+        ELSE
45
+            RAISE EXCEPTION 'invalid metric table reference: %', p_table_name;
46
+    END CASE;
47
+END;
48
+$$;
49
+
50
+CREATE OR REPLACE FUNCTION telemetry.metric_table_sql(p_table_name text)
51
+RETURNS text
52
+LANGUAGE plpgsql
53
+STABLE
54
+AS $$
55
+DECLARE
56
+    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
57
+    v_table_sql text := format('telemetry.%I', v_table_name);
58
+BEGIN
59
+    IF to_regclass(v_table_sql) IS NULL THEN
60
+        RAISE EXCEPTION 'could not resolve metric table %', v_table_sql;
61
+    END IF;
62
+
63
+    RETURN v_table_sql;
64
+END;
65
+$$;
66
+
67
+-- Core registry tables.
68
+
69
+CREATE TABLE telemetry.metrics (
70
+    metric_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
71
+    metric_name text NOT NULL UNIQUE,
72
+    table_name text NOT NULL UNIQUE,
73
+    domain_name text NOT NULL DEFAULT 'generic',
74
+    metric_type telemetry.metric_type_enum NOT NULL,
75
+    comparison_mode telemetry.comparison_mode_enum NOT NULL,
76
+    epsilon double precision,
77
+    min_value double precision,
78
+    max_value double precision,
79
+    rounding_precision double precision,
80
+    allow_null boolean NOT NULL DEFAULT true,
81
+    max_sampling_interval interval NOT NULL,
82
+    created_at timestamptz NOT NULL DEFAULT now(),
83
+    updated_at timestamptz NOT NULL DEFAULT now(),
84
+    CHECK (min_value IS NULL OR max_value IS NULL OR min_value <= max_value),
85
+    CONSTRAINT metrics_policy_shape_check
86
+        CHECK (
87
+            (metric_type = 'numeric'::telemetry.metric_type_enum
88
+             AND comparison_mode = 'epsilon'::telemetry.comparison_mode_enum
89
+             AND epsilon IS NOT NULL
90
+             AND rounding_precision IS NOT NULL)
91
+            OR
92
+            (metric_type IN (
93
+                 'boolean'::telemetry.metric_type_enum,
94
+                 'state'::telemetry.metric_type_enum
95
+             )
96
+             AND comparison_mode = 'exact'::telemetry.comparison_mode_enum
97
+             AND epsilon IS NULL
98
+             AND min_value IS NULL
99
+             AND max_value IS NULL
100
+             AND rounding_precision IS NULL)
101
+        ),
102
+    CONSTRAINT metrics_table_name_check
103
+        CHECK (table_name = telemetry.normalize_metric_table_name(table_name))
104
+);
105
+
106
+-- Policy tables.
107
+
108
+CREATE TABLE telemetry.metric_policies (
109
+    policy_id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
110
+    metric_name text NOT NULL
111
+        REFERENCES telemetry.metrics(metric_name)
112
+        ON UPDATE CASCADE
113
+        ON DELETE CASCADE,
114
+    valid_from timestamptz NOT NULL,
115
+    comparison_mode telemetry.comparison_mode_enum NOT NULL,
116
+    epsilon double precision,
117
+    min_value double precision,
118
+    max_value double precision,
119
+    rounding_precision double precision,
120
+    allow_null boolean NOT NULL,
121
+    max_sampling_interval interval NOT NULL,
122
+    created_at timestamptz NOT NULL DEFAULT now(),
123
+    UNIQUE (metric_name, valid_from)
124
+);
125
+
126
+CREATE TABLE telemetry.metric_retention_policies (
127
+    metric_name text PRIMARY KEY
128
+        REFERENCES telemetry.metrics(metric_name)
129
+        ON UPDATE CASCADE
130
+        ON DELETE CASCADE,
131
+    raw_retention interval NULL,
132
+    compact_after interval NULL
133
+);
134
+
135
+-- Device registry tables.
136
+
137
+CREATE TABLE telemetry.devices (
138
+    device_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
139
+    device_id text NOT NULL UNIQUE,
140
+    device_type text NULL,
141
+    location text NULL,
142
+    metadata jsonb NULL,
143
+    last_seen timestamptz NULL,
144
+    created_at timestamptz NOT NULL DEFAULT now(),
145
+    updated_at timestamptz NOT NULL DEFAULT now()
146
+);
147
+
148
+CREATE TABLE telemetry.metric_device_watermarks (
149
+    metric_name text NOT NULL
150
+        REFERENCES telemetry.metrics(metric_name)
151
+        ON UPDATE CASCADE
152
+        ON DELETE CASCADE,
153
+    device_id text NOT NULL
154
+        REFERENCES telemetry.devices(device_id)
155
+        ON UPDATE CASCADE
156
+        ON DELETE CASCADE,
157
+    last_observed_at timestamptz NOT NULL,
158
+    PRIMARY KEY (metric_name, device_id)
159
+);
160
+
161
+-- Runtime helpers.
162
+
163
+CREATE OR REPLACE FUNCTION telemetry.round_to_increment(
164
+    p_value double precision,
165
+    p_increment double precision
166
+)
167
+RETURNS double precision
168
+LANGUAGE sql
169
+IMMUTABLE
170
+STRICT
171
+AS $$
172
+    SELECT CASE
173
+        WHEN p_increment <= 0 THEN p_value
174
+        ELSE round(p_value / p_increment) * p_increment
175
+    END;
176
+$$;
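-- Illustration (comment only, not executed):
--   telemetry.round_to_increment(22.97, 0.5) -> 23
--   telemetry.round_to_increment(22.97, 0)   -> 22.97  (non-positive increments pass the value through)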
177
+
178
+CREATE OR REPLACE FUNCTION telemetry.ensure_device(p_device_id text)
179
+RETURNS void
180
+LANGUAGE sql
181
+AS $$
182
+    INSERT INTO telemetry.devices (device_id)
183
+    VALUES ($1)
184
+    ON CONFLICT (device_id) DO NOTHING;
185
+$$;
186
+
187
+CREATE OR REPLACE FUNCTION telemetry.touch_device_last_seen(
188
+    p_device_id text,
189
+    p_observed_at timestamptz
190
+)
191
+RETURNS void
192
+LANGUAGE sql
193
+AS $$
194
+    UPDATE telemetry.devices
195
+    SET last_seen = CASE
196
+        WHEN last_seen IS NULL OR last_seen < $2 THEN $2
197
+        ELSE last_seen
198
+    END
199
+    WHERE device_id = $1;
200
+$$;
201
+
202
+CREATE OR REPLACE FUNCTION telemetry.require_device(p_device_id text)
203
+RETURNS telemetry.devices
204
+LANGUAGE plpgsql
205
+STABLE
206
+AS $$
207
+DECLARE
208
+    v_device telemetry.devices%ROWTYPE;
209
+BEGIN
210
+    SELECT *
211
+    INTO v_device
212
+    FROM telemetry.devices AS d
213
+    WHERE d.device_id = p_device_id;
214
+
215
+    IF NOT FOUND THEN
216
+        RAISE EXCEPTION 'unknown device: %', p_device_id;
217
+    END IF;
218
+
219
+    RETURN v_device;
220
+END;
221
+$$;
222
+
223
+CREATE OR REPLACE FUNCTION telemetry.lock_key_from_text(p_value text)
224
+RETURNS integer
225
+LANGUAGE sql
226
+IMMUTABLE
227
+STRICT
228
+AS $$
229
+    SELECT (hashtextextended($1, 0) % 2147483647)::integer;
230
+$$;
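-- Illustrative usage (assumed caller pattern; pairs the two hashed keys in
-- the two-integer advisory lock form):
--   PERFORM pg_advisory_xact_lock(
--       telemetry.lock_key_from_text(p_metric_name),
--       telemetry.lock_key_from_text(p_device_id)
--   );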
231
+
232
+-- Segment template and table creation.
233
+
234
+CREATE OR REPLACE FUNCTION telemetry.create_segment_table(
235
+    p_table_name text,
236
+    p_value_type text
237
+)
238
+RETURNS void
239
+LANGUAGE plpgsql
240
+AS $$
241
+DECLARE
242
+    v_base_table_name text := telemetry.normalize_metric_table_name(p_table_name);
243
+    v_table_name text := format('telemetry.%I', v_base_table_name);
244
+    v_metric_name text;
245
+    v_initial_policy_id bigint;
246
+    v_open_idx text := format('%s_open_segment_uidx', v_base_table_name);
247
+    v_start_idx text := format('%s_device_start_cover_idx', v_base_table_name);
248
+    v_device_period_gist_idx text := format('%s_device_period_gist', v_base_table_name);
249
+    v_period_excl text := format('%s_device_period_excl', v_base_table_name);
250
+    v_device_pk_fk text := format('%s_device_pk_fkey', v_base_table_name);
251
+    v_device_fk text := format('%s_device_id_fkey', v_base_table_name);
252
+    v_policy_fk text := format('%s_policy_id_fkey', v_base_table_name);
253
+    v_samples_check text := format('%s_samples_count_check', v_base_table_name);
254
+    v_end_time_check text := format('%s_end_time_check', v_base_table_name);
255
+BEGIN
256
+    IF p_value_type NOT IN ('double precision', 'boolean', 'smallint') THEN
257
+        RAISE EXCEPTION 'unsupported segment value type: %', p_value_type;
258
+    END IF;
259
+
260
+    SELECT m.metric_name
261
+    INTO v_metric_name
262
+    FROM telemetry.metrics AS m
263
+    WHERE m.table_name = v_base_table_name;
264
+
265
+    IF v_metric_name IS NULL THEN
266
+        RAISE EXCEPTION
267
+            'metric metadata must exist before creating segment table %',
268
+            v_base_table_name;
269
+    END IF;
270
+
271
+    SELECT mp.policy_id
272
+    INTO v_initial_policy_id
273
+    FROM telemetry.metric_policies AS mp
274
+    WHERE mp.metric_name = v_metric_name
275
+      AND mp.valid_from = '-infinity'::timestamptz;
276
+
277
+    IF v_initial_policy_id IS NULL THEN
278
+        RAISE EXCEPTION
279
+            'initial policy row is missing for metric %',
280
+            v_metric_name;
281
+    END IF;
282
+
283
+    EXECUTE format(
284
+        'CREATE TABLE IF NOT EXISTS %s (
285
+            segment_id bigserial PRIMARY KEY,
286
+            device_pk bigint NOT NULL,
287
+            device_id text NOT NULL,
288
+            start_time timestamptz NOT NULL,
289
+            end_time timestamptz NULL,
290
+            value %s NULL,
291
+            -- samples_count = 0 is intentional for synthetic gap segments inserted
292
+            -- during lazy gap detection; those intervals represent missing data, not samples.
293
+            samples_count integer NOT NULL DEFAULT 1
294
+                CONSTRAINT %I CHECK (samples_count >= 0),
295
+            policy_id bigint NOT NULL,
296
+            segment_period tstzrange GENERATED ALWAYS AS (
297
+                tstzrange(start_time, COALESCE(end_time, ''infinity''::timestamptz), ''[)'')
298
+            ) STORED,
299
+            CONSTRAINT %I CHECK (end_time IS NULL OR end_time > start_time),
300
+            CONSTRAINT %I
301
+                FOREIGN KEY (device_pk)
302
+                REFERENCES telemetry.devices(device_pk)
303
+                ON UPDATE CASCADE
304
+                ON DELETE CASCADE,
305
+            CONSTRAINT %I
306
+                FOREIGN KEY (device_id)
307
+                REFERENCES telemetry.devices(device_id)
308
+                ON UPDATE CASCADE
309
+                ON DELETE CASCADE,
310
+            CONSTRAINT %I
311
+                FOREIGN KEY (policy_id)
312
+                REFERENCES telemetry.metric_policies(policy_id)
313
+        ) WITH (
314
+            fillfactor = 90,
315
+            autovacuum_vacuum_scale_factor = 0.02,
316
+            autovacuum_analyze_scale_factor = 0.01
317
+        )',
318
+        v_table_name,
319
+        p_value_type,
320
+        v_samples_check,
321
+        v_end_time_check,
322
+        v_device_pk_fk,
323
+        v_device_fk,
324
+        v_policy_fk
325
+    );
326
+
327
+    EXECUTE format(
328
+        'CREATE UNIQUE INDEX IF NOT EXISTS %I
329
+         ON %s (device_id)
330
+         WHERE end_time IS NULL',
331
+        v_open_idx,
332
+        v_table_name
333
+    );
334
+
335
+    EXECUTE format(
336
+        'CREATE INDEX IF NOT EXISTS %I
337
+         ON %s (device_id, start_time DESC)
338
+         INCLUDE (end_time, value, samples_count)',
339
+        v_start_idx,
340
+        v_table_name
341
+    );
342
+
343
+    EXECUTE format(
344
+        'CREATE INDEX IF NOT EXISTS %I
345
+         ON %s
346
+         USING gist (device_id, segment_period)',
347
+        v_device_period_gist_idx,
348
+        v_table_name
349
+    );
350
+
351
+    BEGIN
352
+        EXECUTE format(
353
+            'ALTER TABLE %s
354
+             ADD CONSTRAINT %I
355
+             EXCLUDE USING gist (
356
+                 device_id WITH =,
357
+                 segment_period WITH &&
358
+             )',
359
+            v_table_name,
360
+            v_period_excl
361
+        );
362
+    EXCEPTION
363
+        WHEN duplicate_object THEN
364
+            NULL;
365
+        WHEN duplicate_table THEN
366
+            NULL;
367
+    END;
368
+END;
369
+$$;
370
+
371
+
372
+-- Metric registration.
373
+
374
+CREATE OR REPLACE FUNCTION telemetry.register_numeric_metric(
375
+    p_metric_name text,
376
+    p_table_name text,
377
+    p_domain_name text,
378
+    p_epsilon double precision,
379
+    p_min_value double precision,
380
+    p_max_value double precision,
381
+    p_rounding_precision double precision,
382
+    p_max_sampling_interval interval,
383
+    p_allow_null boolean DEFAULT true
384
+)
385
+RETURNS void
386
+LANGUAGE plpgsql
387
+AS $$
388
+DECLARE
389
+    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
390
+BEGIN
391
+    INSERT INTO telemetry.metrics (
392
+        metric_name,
393
+        table_name,
394
+        domain_name,
395
+        metric_type,
396
+        comparison_mode,
397
+        epsilon,
398
+        min_value,
399
+        max_value,
400
+        rounding_precision,
401
+        max_sampling_interval,
402
+        allow_null
403
+    )
404
+    VALUES (
405
+        p_metric_name,
406
+        v_table_name,
407
+        p_domain_name,
408
+        'numeric',
409
+        'epsilon',
410
+        p_epsilon,
411
+        p_min_value,
412
+        p_max_value,
413
+        p_rounding_precision,
414
+        p_max_sampling_interval,
415
+        p_allow_null
416
+    )
417
+    ON CONFLICT (metric_name) DO UPDATE
418
+    SET table_name = EXCLUDED.table_name,
419
+        domain_name = EXCLUDED.domain_name,
420
+        metric_type = EXCLUDED.metric_type,
421
+        comparison_mode = EXCLUDED.comparison_mode,
422
+        epsilon = EXCLUDED.epsilon,
423
+        min_value = EXCLUDED.min_value,
424
+        max_value = EXCLUDED.max_value,
425
+        rounding_precision = EXCLUDED.rounding_precision,
426
+        max_sampling_interval = EXCLUDED.max_sampling_interval,
427
+        allow_null = EXCLUDED.allow_null,
428
+        updated_at = now();
429
+
430
+    INSERT INTO telemetry.metric_policies (
431
+        metric_name,
432
+        valid_from,
433
+        comparison_mode,
434
+        epsilon,
435
+        min_value,
436
+        max_value,
437
+        rounding_precision,
438
+        allow_null,
439
+        max_sampling_interval
440
+    )
441
+    VALUES (
442
+        p_metric_name,
443
+        '-infinity'::timestamptz,
444
+        'epsilon',
445
+        p_epsilon,
446
+        p_min_value,
447
+        p_max_value,
448
+        p_rounding_precision,
449
+        p_allow_null,
450
+        p_max_sampling_interval
451
+    )
452
+    ON CONFLICT (metric_name, valid_from) DO NOTHING;
453
+
454
+    PERFORM telemetry.create_segment_table(v_table_name, 'double precision');
455
+    PERFORM telemetry.metric_table_sql(v_table_name);
456
+END;
457
+$$;
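-- Example registration with illustrative values (comment only, not executed):
--   SELECT telemetry.register_numeric_metric(
--       'temperature',           -- metric_name
--       'temperature_segments',  -- table_name, must normalize under telemetry
--       'climate',               -- domain_name
--       0.05,                    -- epsilon
--       -40.0, 85.0,             -- min_value, max_value
--       0.01,                    -- rounding_precision
--       interval '15 minutes'    -- max_sampling_interval
--   );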
458
+
459
+CREATE OR REPLACE FUNCTION telemetry.register_boolean_metric(
460
+    p_metric_name text,
461
+    p_table_name text,
462
+    p_domain_name text,
463
+    p_max_sampling_interval interval,
464
+    p_allow_null boolean DEFAULT true
465
+)
466
+RETURNS void
467
+LANGUAGE plpgsql
468
+AS $$
469
+DECLARE
470
+    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
471
+BEGIN
472
+    INSERT INTO telemetry.metrics (
473
+        metric_name,
474
+        table_name,
475
+        domain_name,
476
+        metric_type,
477
+        comparison_mode,
478
+        epsilon,
479
+        min_value,
480
+        max_value,
481
+        rounding_precision,
482
+        max_sampling_interval,
483
+        allow_null
484
+    )
485
+    VALUES (
486
+        p_metric_name,
487
+        v_table_name,
488
+        p_domain_name,
489
+        'boolean',
490
+        'exact',
491
+        NULL,
492
+        NULL,
493
+        NULL,
494
+        NULL,
495
+        p_max_sampling_interval,
496
+        p_allow_null
497
+    )
498
+    ON CONFLICT (metric_name) DO UPDATE
499
+    SET table_name = EXCLUDED.table_name,
500
+        domain_name = EXCLUDED.domain_name,
501
+        metric_type = EXCLUDED.metric_type,
502
+        comparison_mode = EXCLUDED.comparison_mode,
503
+        epsilon = EXCLUDED.epsilon,
504
+        min_value = EXCLUDED.min_value,
505
+        max_value = EXCLUDED.max_value,
506
+        rounding_precision = EXCLUDED.rounding_precision,
507
+        max_sampling_interval = EXCLUDED.max_sampling_interval,
508
+        allow_null = EXCLUDED.allow_null,
509
+        updated_at = now();
510
+
511
+    INSERT INTO telemetry.metric_policies (
512
+        metric_name,
513
+        valid_from,
514
+        comparison_mode,
515
+        epsilon,
516
+        min_value,
517
+        max_value,
518
+        rounding_precision,
519
+        allow_null,
520
+        max_sampling_interval
521
+    )
522
+    VALUES (
523
+        p_metric_name,
524
+        '-infinity'::timestamptz,
525
+        'exact',
526
+        NULL,
527
+        NULL,
528
+        NULL,
529
+        NULL,
530
+        p_allow_null,
531
+        p_max_sampling_interval
532
+    )
533
+    ON CONFLICT (metric_name, valid_from) DO NOTHING;
534
+
535
+    PERFORM telemetry.create_segment_table(v_table_name, 'boolean');
536
+    PERFORM telemetry.metric_table_sql(v_table_name);
537
+END;
538
+$$;
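-- Example registration with illustrative values (comment only, not executed):
--   SELECT telemetry.register_boolean_metric(
--       'motion_detected', 'motion_segments', 'security',
--       interval '5 minutes');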
539
+
540
+-- Metric and policy lookup.
541
+
542
+CREATE OR REPLACE FUNCTION telemetry.require_metric(p_metric_name text)
543
+RETURNS telemetry.metrics
544
+LANGUAGE plpgsql
545
+STABLE
546
+AS $$
547
+DECLARE
548
+    v_metric telemetry.metrics%ROWTYPE;
549
+BEGIN
550
+    SELECT *
551
+    INTO v_metric
552
+    FROM telemetry.metrics AS m
553
+    WHERE m.metric_name = p_metric_name;
554
+
555
+    IF NOT FOUND THEN
556
+        RAISE EXCEPTION 'unknown metric: %', p_metric_name;
557
+    END IF;
558
+
559
+    RETURN v_metric;
560
+END;
561
+$$;
562
+
563
+CREATE OR REPLACE FUNCTION telemetry.require_metric_policy(
564
+    p_metric_name text,
565
+    p_observed_at timestamptz
566
+)
567
+RETURNS telemetry.metric_policies
568
+LANGUAGE plpgsql
569
+STABLE
570
+AS $$
571
+DECLARE
572
+    v_policy telemetry.metric_policies%ROWTYPE;
573
+BEGIN
574
+    SELECT *
575
+    INTO v_policy
576
+    FROM telemetry.metric_policies AS mp
577
+    WHERE mp.metric_name = p_metric_name
578
+      AND mp.valid_from <= p_observed_at
579
+    ORDER BY mp.valid_from DESC
580
+    LIMIT 1;
581
+
582
+    IF NOT FOUND THEN
583
+        RAISE EXCEPTION
584
+            'no active policy for metric % at %',
585
+            p_metric_name,
586
+            p_observed_at;
587
+    END IF;
588
+
589
+    RETURN v_policy;
590
+END;
591
+$$;
592
+
593
+CREATE OR REPLACE FUNCTION telemetry.add_metric_policy(
594
+    p_metric_name text,
595
+    p_valid_from timestamptz,
596
+    p_comparison_mode telemetry.comparison_mode_enum,
597
+    p_epsilon double precision,
598
+    p_min_value double precision,
599
+    p_max_value double precision,
600
+    p_rounding_precision double precision,
601
+    p_max_sampling_interval interval,
602
+    p_allow_null boolean
603
+)
604
+RETURNS bigint
605
+LANGUAGE plpgsql
606
+AS $$
607
+DECLARE
608
+    v_metric telemetry.metrics%ROWTYPE;
609
+    v_policy_id bigint;
610
+BEGIN
611
+    IF p_valid_from IS NULL THEN
612
+        RAISE EXCEPTION 'valid_from is required';
613
+    END IF;
614
+
615
+    IF p_max_sampling_interval IS NULL THEN
616
+        RAISE EXCEPTION 'max_sampling_interval is required';
617
+    END IF;
618
+
619
+    v_metric := telemetry.require_metric(p_metric_name);
620
+
621
+    IF p_min_value IS NOT NULL
622
+       AND p_max_value IS NOT NULL
623
+       AND p_min_value > p_max_value THEN
624
+        RAISE EXCEPTION 'min_value % exceeds max_value % for metric %',
625
+            p_min_value,
626
+            p_max_value,
627
+            p_metric_name;
628
+    END IF;
629
+
630
+    CASE v_metric.metric_type
631
+        WHEN 'numeric' THEN
632
+            IF p_comparison_mode <> 'epsilon'::telemetry.comparison_mode_enum THEN
633
+                RAISE EXCEPTION
634
+                    'numeric metric % requires comparison_mode epsilon',
635
+                    p_metric_name;
636
+            END IF;
637
+
638
+            IF p_epsilon IS NULL OR p_rounding_precision IS NULL THEN
639
+                RAISE EXCEPTION
640
+                    'numeric metric % requires epsilon and rounding_precision',
641
+                    p_metric_name;
642
+            END IF;
643
+        WHEN 'boolean', 'state' THEN
644
+            IF p_comparison_mode <> 'exact'::telemetry.comparison_mode_enum THEN
645
+                RAISE EXCEPTION
646
+                    'metric % with type % requires comparison_mode exact',
647
+                    p_metric_name,
648
+                    v_metric.metric_type;
649
+            END IF;
650
+
651
+            IF p_epsilon IS NOT NULL
652
+               OR p_min_value IS NOT NULL
653
+               OR p_max_value IS NOT NULL
654
+               OR p_rounding_precision IS NOT NULL THEN
655
+                RAISE EXCEPTION
656
+                    'metric % with type % does not accept epsilon/min/max/rounding_precision in policies',
657
+                    p_metric_name,
658
+                    v_metric.metric_type;
659
+            END IF;
660
+        ELSE
661
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
662
+    END CASE;
663
+
664
+    IF EXISTS (
665
+        SELECT 1
666
+        FROM telemetry.metric_policies AS mp
667
+        WHERE mp.metric_name = p_metric_name
668
+          AND mp.valid_from = p_valid_from
669
+    ) THEN
670
+        RAISE EXCEPTION
671
+            'policy already exists for metric % at %',
672
+            p_metric_name,
673
+            p_valid_from;
674
+    END IF;
675
+
676
+    INSERT INTO telemetry.metric_policies (
677
+        metric_name,
678
+        valid_from,
679
+        comparison_mode,
680
+        epsilon,
681
+        min_value,
682
+        max_value,
683
+        rounding_precision,
684
+        allow_null,
685
+        max_sampling_interval
686
+    )
687
+    VALUES (
688
+        p_metric_name,
689
+        p_valid_from,
690
+        p_comparison_mode,
691
+        p_epsilon,
692
+        p_min_value,
693
+        p_max_value,
694
+        p_rounding_precision,
695
+        p_allow_null,
696
+        p_max_sampling_interval
697
+    )
698
+    RETURNING policy_id
699
+    INTO v_policy_id;
700
+
701
+    UPDATE telemetry.metrics AS m
702
+    SET comparison_mode = p_comparison_mode,
703
+        epsilon = p_epsilon,
704
+        min_value = p_min_value,
705
+        max_value = p_max_value,
706
+        rounding_precision = p_rounding_precision,
707
+        allow_null = p_allow_null,
708
+        max_sampling_interval = p_max_sampling_interval,
709
+        updated_at = now()
710
+    WHERE m.metric_name = p_metric_name
711
+      AND NOT EXISTS (
712
+          SELECT 1
713
+          FROM telemetry.metric_policies AS later
714
+          WHERE later.metric_name = p_metric_name
715
+            AND later.valid_from > p_valid_from
716
+      );
717
+
718
+    RETURN v_policy_id;
719
+END;
720
+$$;
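-- Example policy rollout with illustrative values (comment only, not executed):
--   SELECT telemetry.add_metric_policy(
--       'temperature',
--       '2026-04-01T00:00:00Z'::timestamptz,  -- valid_from
--       'epsilon', 0.02,                      -- comparison_mode, epsilon
--       -40.0, 85.0,                          -- min_value, max_value
--       0.01,                                 -- rounding_precision
--       interval '15 minutes',                -- max_sampling_interval
--       true                                  -- allow_null
--   );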
721
+
722
+-- Ingestion functions.
723
+
724
+CREATE OR REPLACE FUNCTION telemetry.assert_append_only(
725
+    p_metric_name text,
726
+    p_device_id text,
727
+    p_observed_at timestamptz
728
+)
729
+RETURNS timestamptz
730
+LANGUAGE plpgsql
731
+AS $$
732
+DECLARE
733
+    v_last_observed_at timestamptz;
734
+BEGIN
735
+    SELECT w.last_observed_at
736
+    INTO v_last_observed_at
737
+    FROM telemetry.metric_device_watermarks AS w
738
+    WHERE w.metric_name = p_metric_name
739
+      AND w.device_id = p_device_id;
740
+
741
+    IF FOUND AND p_observed_at <= v_last_observed_at THEN
742
+        RAISE EXCEPTION
743
+            'out-of-order measurement for metric % device %: incoming % <= last observed %',
744
+            p_metric_name,
745
+            p_device_id,
746
+            p_observed_at,
747
+            v_last_observed_at;
748
+    END IF;
749
+
750
+    RETURN v_last_observed_at;
751
+END;
752
+$$;
753
+
754
+CREATE OR REPLACE FUNCTION telemetry.bump_watermark(
755
+    p_metric_name text,
756
+    p_device_id text,
757
+    p_observed_at timestamptz
758
+)
759
+RETURNS void
760
+LANGUAGE sql
761
+AS $$
762
+    INSERT INTO telemetry.metric_device_watermarks (
763
+        metric_name,
764
+        device_id,
765
+        last_observed_at
766
+    )
767
+    VALUES ($1, $2, $3)
768
+    ON CONFLICT (metric_name, device_id) DO UPDATE
769
+    SET last_observed_at = EXCLUDED.last_observed_at;
770
+$$;
771
+
772
+CREATE OR REPLACE FUNCTION telemetry.ingest_segment(
773
+    p_table_name text,
774
+    p_metric_type telemetry.metric_type_enum,
775
+    p_comparison_mode telemetry.comparison_mode_enum,
776
+    p_policy_id bigint,
777
+    p_policy_valid_from timestamptz,
778
+    p_device_pk bigint,
779
+    p_device_id text,
780
+    p_value anyelement,
781
+    p_observed_at timestamptz,
782
+    p_last_observed_at timestamptz,
783
+    p_epsilon double precision,
784
+    p_max_sampling_interval interval,
785
+    p_allow_null boolean
786
+)
787
+RETURNS text
788
+LANGUAGE plpgsql
789
+AS $$
790
+DECLARE
791
+    v_current record;
792
+    v_has_current boolean;
793
+    v_gap_start timestamptz;
794
+    v_rowcount bigint;
795
+    v_equal boolean;
796
+    v_policy_changed boolean;
797
+    v_effective_last_observed_at timestamptz := p_last_observed_at;
798
+    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
799
+    v_table_sql text := telemetry.metric_table_sql(p_table_name);
800
+BEGIN
801
+    EXECUTE format(
802
+        'SELECT segment_id, device_pk, device_id, start_time, end_time, value, samples_count, policy_id
803
+         FROM %s
804
+         WHERE device_id = $1
805
+           AND end_time IS NULL
806
+         FOR UPDATE',
807
+        v_table_sql
808
+    )
809
+    INTO v_current
810
+    USING p_device_id;
811
+
812
+    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
813
+    v_has_current := v_rowcount > 0;
814
+
815
+    IF p_value IS NULL AND NOT p_allow_null THEN
816
+        RAISE EXCEPTION 'metric table % does not allow explicit NULL measurements', v_table_name;
817
+    END IF;
818
+
819
+    IF NOT v_has_current THEN
820
+        EXECUTE format(
821
+            'INSERT INTO %s (
822
+                device_pk,
823
+                device_id,
824
+                start_time,
825
+                end_time,
826
+                value,
827
+                samples_count,
828
+                policy_id
829
+             )
830
+             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
831
+            v_table_sql
832
+        )
833
+        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
834
+
835
+        RETURN CASE WHEN p_value IS NULL THEN 'opened_null' ELSE 'opened' END;
836
+    END IF;
+
+    v_policy_changed := v_current.policy_id IS DISTINCT FROM p_policy_id;
+
+    IF v_policy_changed
+       AND p_policy_valid_from > v_current.start_time THEN
+        EXECUTE format(
+            'UPDATE %s
+             SET end_time = $1
+             WHERE segment_id = $2',
+            v_table_sql
+        )
+        USING p_policy_valid_from, v_current.segment_id;
+
+        EXECUTE format(
+            'INSERT INTO %s (
+                device_pk,
+                device_id,
+                start_time,
+                end_time,
+                value,
+                samples_count,
+                policy_id
+             )
+             -- samples_count = 0 marks a synthetic continuation across a policy boundary.
+             VALUES ($1, $2, $3, NULL, $4, 0, $5)
+             RETURNING segment_id, device_pk, device_id, start_time, end_time, value, samples_count, policy_id',
+            v_table_sql
+        )
+        INTO v_current
+        USING v_current.device_pk, v_current.device_id, p_policy_valid_from, v_current.value, p_policy_id;
+
+        v_policy_changed := false;
+        v_effective_last_observed_at := GREATEST(
+            COALESCE(p_last_observed_at, p_policy_valid_from),
+            p_policy_valid_from
+        );
+    END IF;
+
+    IF v_effective_last_observed_at IS NOT NULL
+       AND p_observed_at - v_effective_last_observed_at > p_max_sampling_interval
+       AND v_current.value IS NOT NULL THEN
+        v_gap_start := v_effective_last_observed_at + p_max_sampling_interval;
+
+        EXECUTE format(
+            'UPDATE %s
+             SET end_time = $1
+             WHERE segment_id = $2',
+            v_table_sql
+        )
+        USING v_gap_start, v_current.segment_id;
+
+        IF p_value IS NULL THEN
+            EXECUTE format(
+                'INSERT INTO %s (
+                    device_pk,
+                    device_id,
+                    start_time,
+                    end_time,
+                    value,
+                    samples_count,
+                    policy_id
+                 )
+                 VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
+                v_table_sql
+            )
+            USING p_device_pk, p_device_id, v_gap_start, p_policy_id;
+
+            RETURN 'gap_to_null';
+        END IF;
+
+        EXECUTE format(
+            'INSERT INTO %s (
+                device_pk,
+                device_id,
+                start_time,
+                end_time,
+                value,
+                samples_count,
+                policy_id
+             )
+             -- samples_count = 0 marks a historian-generated gap interval.
+             VALUES ($1, $2, $3, $4, NULL, 0, $5)',
+            v_table_sql
+        )
+        USING p_device_pk, p_device_id, v_gap_start, p_observed_at, p_policy_id;
+
+        EXECUTE format(
+            'INSERT INTO %s (
+                device_pk,
+                device_id,
+                start_time,
+                end_time,
+                value,
+                samples_count,
+                policy_id
+             )
+             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
+            v_table_sql
+        )
+        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
+
+        RETURN 'gap_split';
+    END IF;
+
+    IF v_current.samples_count = 0
+       AND p_observed_at = v_current.start_time THEN
+        IF p_value IS NULL THEN
+            EXECUTE format(
+                'UPDATE %s
+                 SET value = NULL,
+                     samples_count = 1
+                 WHERE segment_id = $1',
+                v_table_sql
+            )
+            USING v_current.segment_id;
+
+            RETURN CASE
+                WHEN v_current.value IS NULL THEN 'extended_null'
+                ELSE 'value_to_null'
+            END;
+        END IF;
+
+        IF v_current.value IS NULL THEN
+            EXECUTE format(
+                'UPDATE %s
+                 SET value = $1,
+                     samples_count = 1
+                 WHERE segment_id = $2',
+                v_table_sql
+            )
+            USING p_value, v_current.segment_id;
+
+            RETURN 'null_to_value';
+        END IF;
+
+        CASE p_comparison_mode
+            WHEN 'epsilon' THEN
+                EXECUTE
+                    'SELECT abs($1::double precision - $2::double precision) <= $3'
+                INTO v_equal
+                USING v_current.value, p_value, p_epsilon;
+            WHEN 'exact' THEN
+                EXECUTE
+                    'SELECT $1 IS NOT DISTINCT FROM $2'
+                INTO v_equal
+                USING v_current.value, p_value;
+            ELSE
+                RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
+        END CASE;
+
+        EXECUTE format(
+            'UPDATE %s
+             SET value = $1,
+                 samples_count = 1
+             WHERE segment_id = $2',
+            v_table_sql
+        )
+        USING p_value, v_current.segment_id;
+
+        RETURN CASE
+            WHEN v_equal THEN 'extended'
+            ELSE 'split'
+        END;
+    END IF;
+
+    IF v_current.value IS NULL THEN
+        IF p_value IS NULL THEN
+            IF v_policy_changed THEN
+                EXECUTE format(
+                    'UPDATE %s
+                     SET end_time = $1
+                     WHERE segment_id = $2',
+                    v_table_sql
+                )
+                USING p_observed_at, v_current.segment_id;
+
+                EXECUTE format(
+                    'INSERT INTO %s (
+                        device_pk,
+                        device_id,
+                        start_time,
+                        end_time,
+                        value,
+                        samples_count,
+                        policy_id
+                     )
+                     VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
+                    v_table_sql
+                )
+                USING p_device_pk, p_device_id, p_observed_at, p_policy_id;
+
+                RETURN 'split';
+            END IF;
+
+            EXECUTE format(
+                'UPDATE %s
+                 SET samples_count = samples_count + 1
+                 WHERE segment_id = $1',
+                v_table_sql
+            )
+            USING v_current.segment_id;
+
+            RETURN 'extended_null';
+        END IF;
+
+        EXECUTE format(
+            'UPDATE %s
+             SET end_time = $1
+             WHERE segment_id = $2',
+            v_table_sql
+        )
+        USING p_observed_at, v_current.segment_id;
+
+        EXECUTE format(
+            'INSERT INTO %s (
+                device_pk,
+                device_id,
+                start_time,
+                end_time,
+                value,
+                samples_count,
+                policy_id
+             )
+             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
+            v_table_sql
+        )
+        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
+
+        RETURN 'null_to_value';
+    END IF;
+
+    IF p_value IS NULL THEN
+        EXECUTE format(
+            'UPDATE %s
+             SET end_time = $1
+             WHERE segment_id = $2',
+            v_table_sql
+        )
+        USING p_observed_at, v_current.segment_id;
+
+        EXECUTE format(
+            'INSERT INTO %s (
+                device_pk,
+                device_id,
+                start_time,
+                end_time,
+                value,
+                samples_count,
+                policy_id
+             )
+             VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
+            v_table_sql
+        )
+        USING p_device_pk, p_device_id, p_observed_at, p_policy_id;
+
+        RETURN 'value_to_null';
+    END IF;
+
+    CASE p_comparison_mode
+        WHEN 'epsilon' THEN
+            EXECUTE
+                'SELECT abs($1::double precision - $2::double precision) <= $3'
+            INTO v_equal
+            USING v_current.value, p_value, p_epsilon;
+        WHEN 'exact' THEN
+            EXECUTE
+                'SELECT $1 IS NOT DISTINCT FROM $2'
+            INTO v_equal
+            USING v_current.value, p_value;
+        ELSE
+            RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
+    END CASE;
+
+    IF v_equal AND NOT v_policy_changed THEN
+        EXECUTE format(
+            'UPDATE %s
+             SET samples_count = samples_count + 1
+             WHERE segment_id = $1',
+            v_table_sql
+        )
+        USING v_current.segment_id;
+
+        RETURN 'extended';
+    END IF;
+
+    EXECUTE format(
+        'UPDATE %s
+         SET end_time = $1
+         WHERE segment_id = $2',
+        v_table_sql
+    )
+    USING p_observed_at, v_current.segment_id;
+
+    EXECUTE format(
+        'INSERT INTO %s (
+            device_pk,
+            device_id,
+            start_time,
+            end_time,
+            value,
+            samples_count,
+            policy_id
+         )
+         VALUES ($1, $2, $3, NULL, $4, 1, $5)',
+        v_table_sql
+    )
+    USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
+
+    RETURN 'split';
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
+    p_metric_name text,
+    p_device_id text,
+    p_value double precision,
+    p_observed_at timestamptz
+)
+RETURNS TABLE (
+    metric_name text,
+    device_id text,
+    table_name text,
+    normalized_value double precision,
+    action text
+)
+LANGUAGE plpgsql
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+    v_policy telemetry.metric_policies%ROWTYPE;
+    v_device telemetry.devices%ROWTYPE;
+    v_last_observed_at timestamptz;
+    v_normalized double precision;
+    v_action text;
+    v_metric_lock_key integer;
+    v_device_lock_key integer;
+BEGIN
+    IF p_metric_name IS NULL OR btrim(p_metric_name) = '' THEN
+        RAISE EXCEPTION 'metric_name is required';
+    END IF;
+
+    IF p_device_id IS NULL OR btrim(p_device_id) = '' THEN
+        RAISE EXCEPTION 'device_id is required';
+    END IF;
+
+    IF p_observed_at IS NULL THEN
+        RAISE EXCEPTION 'observed_at is required';
+    END IF;
+
+    v_metric := telemetry.require_metric(p_metric_name);
+    PERFORM telemetry.ensure_device(p_device_id);
+    v_device := telemetry.require_device(p_device_id);
+
+    -- 'state' is reserved for future implementation of discrete enumerated
+    -- machine states, likely stored as smallint. It will use the same segment
+    -- logic as boolean metrics, but with exact comparison instead of epsilon.
+    CASE v_metric.metric_type
+        WHEN 'numeric' THEN
+            NULL;
+        WHEN 'boolean' THEN
+            RAISE EXCEPTION 'metric % is boolean; use the boolean overload of ingest_measurement(...)', p_metric_name;
+        WHEN 'state' THEN
+            RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
+        ELSE
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
+    END CASE;
+
+    v_metric_lock_key := telemetry.lock_key_from_text(p_metric_name);
+    v_device_lock_key := telemetry.lock_key_from_text(p_device_id);
+    PERFORM pg_advisory_xact_lock(v_metric_lock_key, v_device_lock_key);
+    v_last_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
+    v_policy := telemetry.require_metric_policy(p_metric_name, p_observed_at);
+
+    IF p_value IS NULL THEN
+        v_normalized := NULL;
+    ELSE
+        v_normalized := telemetry.round_to_increment(p_value, v_policy.rounding_precision);
+
+        IF v_policy.min_value IS NOT NULL AND v_normalized < v_policy.min_value THEN
+            RAISE EXCEPTION 'value % is below min_value % for metric %',
+                v_normalized, v_policy.min_value, p_metric_name;
+        END IF;
+
+        IF v_policy.max_value IS NOT NULL AND v_normalized > v_policy.max_value THEN
+            RAISE EXCEPTION 'value % is above max_value % for metric %',
+                v_normalized, v_policy.max_value, p_metric_name;
+        END IF;
+    END IF;
+
+    v_action := telemetry.ingest_segment(
+        v_metric.table_name,
+        v_metric.metric_type,
+        v_policy.comparison_mode,
+        v_policy.policy_id,
+        v_policy.valid_from,
+        v_device.device_pk,
+        p_device_id,
+        v_normalized,
+        p_observed_at,
+        v_last_observed_at,
+        v_policy.epsilon,
+        v_policy.max_sampling_interval,
+        v_policy.allow_null
+    );
+
+    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
+    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);
+
+    RETURN QUERY
+    SELECT
+        p_metric_name,
+        p_device_id,
+        v_metric.table_name,
+        v_normalized,
+        v_action;
+END;
+$$;
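+
+-- Usage sketch (hypothetical metric and device names; assumes the metric and
+-- a covering policy are already registered):
+--
+--   SELECT * FROM telemetry.ingest_measurement(
+--       'water_temperature', 'boiler-01', 21.37, now());
+--
+-- The returned action ('extended', 'split', 'gap_split', ...) reports how the
+-- device's open segment was updated.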
+
+CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
+    p_metric_name text,
+    p_device_id text,
+    p_value boolean,
+    p_observed_at timestamptz
+)
+RETURNS TABLE (
+    metric_name text,
+    device_id text,
+    table_name text,
+    normalized_value boolean,
+    action text
+)
+LANGUAGE plpgsql
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+    v_policy telemetry.metric_policies%ROWTYPE;
+    v_device telemetry.devices%ROWTYPE;
+    v_last_observed_at timestamptz;
+    v_action text;
+    v_metric_lock_key integer;
+    v_device_lock_key integer;
+BEGIN
+    IF p_metric_name IS NULL OR btrim(p_metric_name) = '' THEN
+        RAISE EXCEPTION 'metric_name is required';
+    END IF;
+
+    IF p_device_id IS NULL OR btrim(p_device_id) = '' THEN
+        RAISE EXCEPTION 'device_id is required';
+    END IF;
+
+    IF p_observed_at IS NULL THEN
+        RAISE EXCEPTION 'observed_at is required';
+    END IF;
+
+    v_metric := telemetry.require_metric(p_metric_name);
+    PERFORM telemetry.ensure_device(p_device_id);
+    v_device := telemetry.require_device(p_device_id);
+
+    -- 'state' is reserved for future implementation of discrete enumerated
+    -- machine states, likely stored as smallint. It will use the same segment
+    -- logic as boolean metrics, but with exact comparison instead of epsilon.
+    CASE v_metric.metric_type
+        WHEN 'boolean' THEN
+            NULL;
+        WHEN 'numeric' THEN
+            RAISE EXCEPTION 'metric % is numeric; use the numeric overload of ingest_measurement(...)', p_metric_name;
+        WHEN 'state' THEN
+            RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
+        ELSE
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
+    END CASE;
+
+    v_metric_lock_key := telemetry.lock_key_from_text(p_metric_name);
+    v_device_lock_key := telemetry.lock_key_from_text(p_device_id);
+    PERFORM pg_advisory_xact_lock(v_metric_lock_key, v_device_lock_key);
+    v_last_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
+    v_policy := telemetry.require_metric_policy(p_metric_name, p_observed_at);
+
+    v_action := telemetry.ingest_segment(
+        v_metric.table_name,
+        v_metric.metric_type,
+        v_policy.comparison_mode,
+        v_policy.policy_id,
+        v_policy.valid_from,
+        v_device.device_pk,
+        p_device_id,
+        p_value,
+        p_observed_at,
+        v_last_observed_at,
+        v_policy.epsilon,
+        v_policy.max_sampling_interval,
+        v_policy.allow_null
+    );
+
+    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
+    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);
+
+    RETURN QUERY
+    SELECT
+        p_metric_name,
+        p_device_id,
+        v_metric.table_name,
+        p_value,
+        v_action;
+END;
+$$;
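+
+-- Usage sketch (hypothetical names): boolean metrics must go through this
+-- overload; the numeric overload raises for them.
+--
+--   SELECT * FROM telemetry.ingest_measurement(
+--       'pump_running', 'pump-07', true, now());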
+
+-- Query functions.
+
+CREATE OR REPLACE FUNCTION telemetry.value_at_from_table(
+    p_table_name text,
+    p_metric_name text,
+    p_device_id text,
+    p_at timestamptz
+)
+RETURNS jsonb
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_value jsonb;
+    v_table_sql text := telemetry.metric_table_sql(p_table_name);
+BEGIN
+    EXECUTE format(
+        'WITH watermark AS (
+             SELECT w.last_observed_at
+             FROM telemetry.metric_device_watermarks AS w
+             WHERE w.metric_name = $1
+               AND w.device_id = $2
+         )
+         SELECT to_jsonb(s.value)
+         FROM %s AS s
+         JOIN telemetry.metric_policies AS mp
+           ON mp.policy_id = s.policy_id
+         LEFT JOIN watermark AS w ON true
+         WHERE s.device_id = $2
+           AND tstzrange(
+                 s.start_time,
+                 CASE
+                     WHEN s.end_time IS NOT NULL THEN s.end_time
+                     WHEN s.value IS NULL THEN ''infinity''::timestamptz
+                     ELSE COALESCE(
+                         GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+                         + mp.max_sampling_interval,
+                         ''infinity''::timestamptz
+                     )
+                 END,
+                 ''[)''
+               ) @> $3
+         LIMIT 1',
+        v_table_sql
+    )
+    INTO v_value
+    USING p_metric_name, p_device_id, p_at;
+
+    RETURN v_value;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION telemetry.value_at(
+    p_metric_name text,
+    p_device_id text,
+    p_at timestamptz
+)
+RETURNS double precision
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+    v_value jsonb;
+BEGIN
+    v_metric := telemetry.require_metric(p_metric_name);
+
+    -- 'state' is reserved for future implementation of discrete enumerated
+    -- machine states, likely stored as smallint. It will use the same segment
+    -- logic as boolean metrics, but with exact comparison instead of epsilon.
+    CASE v_metric.metric_type
+        WHEN 'numeric' THEN
+            v_value := telemetry.value_at_from_table(
+                v_metric.table_name,
+                p_metric_name,
+                p_device_id,
+                p_at
+            );
+
+            RETURN CASE
+                WHEN v_value IS NULL THEN NULL
+                -- Cast jsonb scalars through text for portability: direct
+                -- casts such as (v_value)::double precision only exist on
+                -- PostgreSQL 11 and later.
+                ELSE v_value::text::double precision
+            END;
+        WHEN 'boolean' THEN
+            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_value_at(...)', p_metric_name;
+        WHEN 'state' THEN
+            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
+        ELSE
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
+    END CASE;
+END;
+$$;
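+
+-- Usage sketch (hypothetical names): point-in-time read of a numeric metric;
+-- returns NULL when no stored or synthetic segment covers the timestamp.
+--
+--   SELECT telemetry.value_at(
+--       'water_temperature', 'boiler-01', now() - interval '5 minutes');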
+
+CREATE OR REPLACE FUNCTION telemetry.boolean_value_at(
+    p_metric_name text,
+    p_device_id text,
+    p_at timestamptz
+)
+RETURNS boolean
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+    v_value jsonb;
+BEGIN
+    v_metric := telemetry.require_metric(p_metric_name);
+
+    -- 'state' is reserved for future implementation of discrete enumerated
+    -- machine states, likely stored as smallint. It will use the same segment
+    -- logic as boolean metrics, but with exact comparison instead of epsilon.
+    CASE v_metric.metric_type
+        WHEN 'boolean' THEN
+            v_value := telemetry.value_at_from_table(
+                v_metric.table_name,
+                p_metric_name,
+                p_device_id,
+                p_at
+            );
+
+            RETURN CASE
+                WHEN v_value IS NULL THEN NULL
+                -- Cast jsonb scalars through text for portability: direct
+                -- casts such as (v_value)::boolean only exist on
+                -- PostgreSQL 11 and later.
+                ELSE v_value::text::boolean
+            END;
+        WHEN 'numeric' THEN
+            RAISE EXCEPTION 'metric % is numeric; use telemetry.value_at(...)', p_metric_name;
+        WHEN 'state' THEN
+            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
+        ELSE
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
+    END CASE;
+END;
+$$;
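+
+-- Usage sketch (hypothetical names):
+--
+--   SELECT telemetry.boolean_value_at(
+--       'pump_running', 'pump-07', now() - interval '5 minutes');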
+
+CREATE OR REPLACE FUNCTION telemetry.segments_between_from_table(
+    p_table_name text,
+    p_metric_name text,
+    p_device_id text,
+    p_from timestamptz,
+    p_to timestamptz
+)
+RETURNS TABLE (
+    device_id text,
+    start_time timestamptz,
+    end_time timestamptz,
+    value_json jsonb,
+    samples_count integer
+)
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_table_sql text := telemetry.metric_table_sql(p_table_name);
+BEGIN
+    RETURN QUERY EXECUTE format(
+        'WITH watermark AS (
+             SELECT w.last_observed_at
+             FROM telemetry.metric_device_watermarks AS w
+             WHERE w.metric_name = $1
+               AND w.device_id = $2
+         ),
+         open_segment AS (
+             SELECT s.*
+             FROM %s AS s
+             WHERE s.device_id = $2
+               AND s.end_time IS NULL
+             LIMIT 1
+         ),
+         stored_segments AS (
+             SELECT
+                 s.device_id,
+                 GREATEST(s.start_time, $3) AS start_time,
+                 LEAST(
+                     CASE
+                         WHEN s.end_time IS NOT NULL THEN s.end_time
+                         WHEN s.value IS NULL THEN $4
+                         ELSE LEAST(
+                             COALESCE(
+                                 GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+                                 + mp.max_sampling_interval,
+                                 $4
+                             ),
+                             $4
+                         )
+                     END,
+                     $4
+                 ) AS end_time,
+                 to_jsonb(s.value) AS value_json,
+                 s.samples_count
+             FROM %s AS s
+             JOIN telemetry.metric_policies AS mp
+               ON mp.policy_id = s.policy_id
+             LEFT JOIN watermark AS w ON true
+             WHERE s.device_id = $2
+               AND tstzrange(
+                     s.start_time,
+                     CASE
+                         WHEN s.end_time IS NOT NULL THEN s.end_time
+                         WHEN s.value IS NULL THEN ''infinity''::timestamptz
+                         ELSE COALESCE(
+                             GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+                             + mp.max_sampling_interval,
+                             ''infinity''::timestamptz
+                         )
+                     END,
+                     ''[)''
+                   ) && tstzrange($3, $4, ''[)'')
+         ),
+         synthetic_tail AS (
+             SELECT
+                 $2::text AS device_id,
+                 GREATEST(
+                     GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+                     + mp.max_sampling_interval,
+                     $3
+                 ) AS start_time,
+                 $4 AS end_time,
+                 NULL::jsonb AS value_json,
+                 0 AS samples_count
+             FROM open_segment AS s
+             JOIN telemetry.metric_policies AS mp
+               ON mp.policy_id = s.policy_id
+             JOIN watermark AS w ON true
+             WHERE s.value IS NOT NULL
+               AND GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+                   + mp.max_sampling_interval < $4
+         )
+         SELECT *
+         FROM (
+             SELECT *
+             FROM stored_segments
+             WHERE end_time > start_time
+
+             UNION ALL
+
+             SELECT *
+             FROM synthetic_tail
+             WHERE end_time > start_time
+         ) AS combined_segments
+         ORDER BY start_time',
+        v_table_sql,
+        v_table_sql
+    )
+    USING p_metric_name, p_device_id, p_from, p_to;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION telemetry.metric_segments(
+    p_metric_name text,
+    p_device_id text,
+    p_from timestamptz,
+    p_to timestamptz
+)
+RETURNS TABLE (
+    device_id text,
+    start_time timestamptz,
+    end_time timestamptz,
+    value double precision,
+    samples_count integer
+)
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+BEGIN
+    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
+        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
+    END IF;
+
+    v_metric := telemetry.require_metric(p_metric_name);
+
+    -- 'state' is reserved for future implementation of discrete enumerated
+    -- machine states, likely stored as smallint. It will use the same segment
+    -- logic as boolean metrics, but with exact comparison instead of epsilon.
+    CASE v_metric.metric_type
+        WHEN 'numeric' THEN
+            RETURN QUERY
+            SELECT
+                s.device_id,
+                s.start_time,
+                s.end_time,
+                CASE
+                    WHEN s.value_json IS NULL THEN NULL
+                    -- Cast jsonb scalars through text for portability:
+                    -- direct casts such as (s.value_json)::double precision
+                    -- only exist on PostgreSQL 11 and later.
+                    ELSE s.value_json::text::double precision
+                END AS value,
+                s.samples_count
+            FROM telemetry.segments_between_from_table(
+                v_metric.table_name,
+                p_metric_name,
+                p_device_id,
+                p_from,
+                p_to
+            ) AS s;
+        WHEN 'boolean' THEN
+            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_segments_between(...)', p_metric_name;
+        WHEN 'state' THEN
+            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
+        ELSE
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
+    END CASE;
+END;
+$$;
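+
+-- Usage sketch (hypothetical names): numeric segments overlapping the last
+-- hour, clipped to [p_from, p_to); offline gaps surface as NULL-valued rows.
+--
+--   SELECT * FROM telemetry.metric_segments(
+--       'water_temperature', 'boiler-01', now() - interval '1 hour', now());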
+
+CREATE OR REPLACE FUNCTION telemetry.boolean_segments_between(
+    p_metric_name text,
+    p_device_id text,
+    p_from timestamptz,
+    p_to timestamptz
+)
+RETURNS TABLE (
+    device_id text,
+    start_time timestamptz,
+    end_time timestamptz,
+    value boolean,
+    samples_count integer
+)
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+BEGIN
+    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
+        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
+    END IF;
+
+    v_metric := telemetry.require_metric(p_metric_name);
+
+    -- 'state' is reserved for future implementation of discrete enumerated
+    -- machine states, likely stored as smallint. It will use the same segment
+    -- logic as boolean metrics, but with exact comparison instead of epsilon.
+    CASE v_metric.metric_type
+        WHEN 'boolean' THEN
+            RETURN QUERY
+            SELECT
+                s.device_id,
+                s.start_time,
+                s.end_time,
+                CASE
+                    WHEN s.value_json IS NULL THEN NULL
+                    -- Cast jsonb scalars through text for portability:
+                    -- direct casts such as (s.value_json)::boolean only
+                    -- exist on PostgreSQL 11 and later.
+                    ELSE s.value_json::text::boolean
+                END AS value,
+                s.samples_count
+            FROM telemetry.segments_between_from_table(
+                v_metric.table_name,
+                p_metric_name,
+                p_device_id,
+                p_from,
+                p_to
+            ) AS s;
+        WHEN 'numeric' THEN
+            RAISE EXCEPTION 'metric % is numeric; use telemetry.metric_segments(...)', p_metric_name;
+        WHEN 'state' THEN
+            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
+        ELSE
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
+    END CASE;
+END;
+$$;
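+
+-- Usage sketch (hypothetical names):
+--
+--   SELECT * FROM telemetry.boolean_segments_between(
+--       'pump_running', 'pump-07', now() - interval '1 hour', now());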
+
+CREATE OR REPLACE FUNCTION telemetry.samples_from_table(
+    p_table_name text,
+    p_metric_name text,
+    p_device_id text,
+    p_from timestamptz,
+    p_to timestamptz,
+    p_points integer
+)
+RETURNS TABLE (
+    sample_at timestamptz,
+    value_json jsonb
+)
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_table_sql text := telemetry.metric_table_sql(p_table_name);
+BEGIN
+    RETURN QUERY EXECUTE format(
+        'WITH watermark AS (
+             SELECT w.last_observed_at
+             FROM telemetry.metric_device_watermarks AS w
+             WHERE w.metric_name = $1
+               AND w.device_id = $2
+         ),
+         sample_points AS (
+             SELECT $3 + (($4 - $3) * gs.i::double precision / ($5 - 1)::double precision) AS sample_at
+             FROM generate_series(0, $5 - 1) AS gs(i)
+         )
+         SELECT
+             sp.sample_at,
+             to_jsonb(seg.value) AS value_json
+         FROM sample_points AS sp
+         LEFT JOIN watermark AS w ON true
+         LEFT JOIN LATERAL (
+             SELECT s.value
+             FROM %s AS s
+             JOIN telemetry.metric_policies AS mp
+               ON mp.policy_id = s.policy_id
+             WHERE s.device_id = $2
+               AND tstzrange(
+                     s.start_time,
+                     CASE
+                         WHEN s.end_time IS NOT NULL THEN s.end_time
+                         WHEN s.value IS NULL THEN ''infinity''::timestamptz
+                         ELSE COALESCE(
+                             GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+                             + mp.max_sampling_interval,
+                             ''infinity''::timestamptz
+                         )
+                     END,
+                     ''[)''
+                   ) @> sp.sample_at
+             LIMIT 1
+         ) AS seg ON true
+         ORDER BY sp.sample_at',
+        v_table_sql
+    )
+    USING p_metric_name, p_device_id, p_from, p_to, p_points;
+END;
+$$;
1776
+
1777
+CREATE OR REPLACE FUNCTION telemetry.sample_metric(
+    p_metric_name text,
+    p_device_id text,
+    p_from timestamptz,
+    p_to timestamptz,
+    p_points integer
+)
+RETURNS TABLE (
+    sample_at timestamptz,
+    value double precision
+)
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+BEGIN
+    IF p_points IS NULL OR p_points < 2 THEN
+        RAISE EXCEPTION 'p_points must be at least 2';
+    END IF;
+
+    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
+        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
+    END IF;
+
+    v_metric := telemetry.require_metric(p_metric_name);
+
+    -- 'state' is reserved for future implementation of discrete enumerated
+    -- machine states, likely stored as smallint. It will use the same segment
+    -- logic as boolean metrics, but with exact comparison instead of epsilon.
+    CASE v_metric.metric_type
+        WHEN 'numeric' THEN
+            RETURN QUERY
+            SELECT
+                s.sample_at,
+                CASE
+                    WHEN s.value_json IS NULL THEN NULL
+                    -- jsonb scalars are cast via text here: a direct cast
+                    -- such as (s.value_json)::double precision exists only
+                    -- on PostgreSQL 11+, while the text round-trip works on
+                    -- all supported versions.
+                    ELSE s.value_json::text::double precision
+                END AS value
+            FROM telemetry.samples_from_table(
+                v_metric.table_name,
+                p_metric_name,
+                p_device_id,
+                p_from,
+                p_to,
+                p_points
+            ) AS s;
+        WHEN 'boolean' THEN
+            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_samples(...)', p_metric_name;
+        WHEN 'state' THEN
+            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
+        ELSE
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
+    END CASE;
+END;
+$$;
+
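+-- Usage sketch (hypothetical device id; assumes the metric was registered
+-- with telemetry.register_numeric_metric and has ingested segments). Grid
+-- points with no covering segment come back with a NULL value:
+--
+--   SELECT sample_at, value
+--   FROM telemetry.sample_metric(
+--       p_metric_name => 'cpu_temperature',
+--       p_device_id   => 'device-42',
+--       p_from        => now() - interval '1 hour',
+--       p_to          => now(),
+--       p_points      => 13
+--   );
+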
+CREATE OR REPLACE FUNCTION telemetry.boolean_samples(
+    p_metric_name text,
+    p_device_id text,
+    p_from timestamptz,
+    p_to timestamptz,
+    p_points integer
+)
+RETURNS TABLE (
+    sample_at timestamptz,
+    value boolean
+)
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+BEGIN
+    IF p_points IS NULL OR p_points < 2 THEN
+        RAISE EXCEPTION 'p_points must be at least 2';
+    END IF;
+
+    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
+        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
+    END IF;
+
+    v_metric := telemetry.require_metric(p_metric_name);
+
+    -- 'state' is reserved for future implementation of discrete enumerated
+    -- machine states, likely stored as smallint. It will use the same segment
+    -- logic as boolean metrics, but with exact comparison instead of epsilon.
+    CASE v_metric.metric_type
+        WHEN 'boolean' THEN
+            RETURN QUERY
+            SELECT
+                s.sample_at,
+                CASE
+                    WHEN s.value_json IS NULL THEN NULL
+                    -- jsonb scalars are cast via text here: a direct cast
+                    -- such as (s.value_json)::boolean exists only on
+                    -- PostgreSQL 11+, while the text round-trip works on
+                    -- all supported versions.
+                    ELSE s.value_json::text::boolean
+                END AS value
+            FROM telemetry.samples_from_table(
+                v_metric.table_name,
+                p_metric_name,
+                p_device_id,
+                p_from,
+                p_to,
+                p_points
+            ) AS s;
+        WHEN 'numeric' THEN
+            RAISE EXCEPTION 'metric % is numeric; use telemetry.sample_metric(...)', p_metric_name;
+        WHEN 'state' THEN
+            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
+        ELSE
+            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
+    END CASE;
+END;
+$$;
+
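+-- Usage sketch (hypothetical boolean metric and device id); calling this on
+-- a numeric metric raises and points at telemetry.sample_metric(...):
+--
+--   SELECT sample_at, value
+--   FROM telemetry.boolean_samples(
+--       p_metric_name => 'door_open',
+--       p_device_id   => 'device-42',
+--       p_from        => now() - interval '1 day',
+--       p_to          => now(),
+--       p_points      => 25
+--   );
+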
+-- Maintenance utilities.
+
+CREATE OR REPLACE FUNCTION telemetry.verify_segments(p_metric_name text)
+RETURNS TABLE (
+    metric_name text,
+    device_id text,
+    issue text,
+    segment_id bigint,
+    related_segment_id bigint,
+    start_time timestamptz,
+    end_time timestamptz,
+    details text
+)
+LANGUAGE plpgsql
+STABLE
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+    v_table_sql text;
+BEGIN
+    v_metric := telemetry.require_metric(p_metric_name);
+    v_table_sql := telemetry.metric_table_sql(v_metric.table_name);
+
+    RETURN QUERY EXECUTE format(
+        'WITH ordered AS (
+             SELECT
+                 s.segment_id,
+                 s.device_id,
+                 s.policy_id,
+                 s.start_time,
+                 s.end_time,
+                 lag(s.segment_id) OVER (
+                     PARTITION BY s.device_id
+                     ORDER BY s.start_time, s.segment_id
+                 ) AS prev_segment_id,
+                 lag(s.end_time) OVER (
+                     PARTITION BY s.device_id
+                     ORDER BY s.start_time, s.segment_id
+                 ) AS prev_end_time,
+                 lag(s.policy_id) OVER (
+                     PARTITION BY s.device_id
+                     ORDER BY s.start_time, s.segment_id
+                 ) AS prev_policy_id
+             FROM %s AS s
+         ),
+         open_counts AS (
+             SELECT
+                 s.device_id,
+                 count(*)::integer AS open_count
+             FROM %s AS s
+             WHERE s.end_time IS NULL
+             GROUP BY s.device_id
+             HAVING count(*) > 1
+         )
+         SELECT
+             $1::text AS metric_name,
+             o.device_id,
+             ''invalid_interval''::text AS issue,
+             o.segment_id,
+             NULL::bigint AS related_segment_id,
+             o.start_time,
+             o.end_time,
+             ''end_time must be greater than start_time''::text AS details
+         FROM ordered AS o
+         WHERE o.end_time IS NOT NULL
+           AND o.end_time <= o.start_time
+
+         UNION ALL
+
+         SELECT
+             $1::text,
+             oc.device_id,
+             ''multiple_open_segments''::text,
+             NULL::bigint,
+             NULL::bigint,
+             NULL::timestamptz,
+             NULL::timestamptz,
+             format(''%%s open segments found'', oc.open_count)
+         FROM open_counts AS oc
+
+         UNION ALL
+
+         SELECT
+             $1::text,
+             o.device_id,
+             ''overlap''::text,
+             o.segment_id,
+             o.prev_segment_id,
+             o.start_time,
+             o.end_time,
+             format(''previous segment ends at %%s'', o.prev_end_time)
+         FROM ordered AS o
+         WHERE o.prev_end_time IS NOT NULL
+           AND o.prev_end_time > o.start_time
+
+         UNION ALL
+
+         SELECT
+             $1::text,
+             o.device_id,
+             ''unexpected_gap''::text,
+             o.segment_id,
+             o.prev_segment_id,
+             o.start_time,
+             o.end_time,
+             format(
+                 ''stored gap of %%s violates expected continuity (max_sampling_interval=%%s)'',
+                 o.start_time - o.prev_end_time,
+                 mp.max_sampling_interval
+             )
+         FROM ordered AS o
+         JOIN telemetry.metric_policies AS mp
+           ON mp.policy_id = o.prev_policy_id
+         WHERE o.prev_end_time IS NOT NULL
+           AND o.start_time > o.prev_end_time
+
+         ORDER BY device_id, start_time NULLS FIRST, segment_id NULLS FIRST',
+        v_table_sql,
+        v_table_sql
+    )
+    USING p_metric_name;
+END;
+$$;
+
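+-- Usage sketch: list integrity issues for one metric's segment table.
+-- Issue values are 'invalid_interval', 'multiple_open_segments',
+-- 'overlap' and 'unexpected_gap'; an empty result means the table is clean.
+--
+--   SELECT issue, device_id, segment_id, details
+--   FROM telemetry.verify_segments('ambient_temperature')
+--   ORDER BY issue, device_id;
+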
+CREATE OR REPLACE FUNCTION telemetry.compact_segments(
+    p_metric_name text,
+    p_before timestamptz
+)
+RETURNS TABLE (
+    device_id text,
+    kept_segment_id bigint,
+    merged_segments integer,
+    start_time timestamptz,
+    end_time timestamptz,
+    total_samples integer
+)
+LANGUAGE plpgsql
+AS $$
+DECLARE
+    v_metric telemetry.metrics%ROWTYPE;
+    v_table_sql text;
+    v_merge record;
+BEGIN
+    IF p_before IS NULL THEN
+        RAISE EXCEPTION 'p_before is required';
+    END IF;
+
+    v_metric := telemetry.require_metric(p_metric_name);
+    v_table_sql := telemetry.metric_table_sql(v_metric.table_name);
+
+    FOR v_merge IN EXECUTE format(
+        'WITH candidates AS (
+             SELECT
+                 s.segment_id,
+                 s.device_id,
+                 s.start_time,
+                 s.end_time,
+                 s.samples_count,
+                 s.policy_id,
+                 s.value,
+                 lag(s.end_time) OVER (
+                     PARTITION BY s.device_id
+                     ORDER BY s.start_time, s.segment_id
+                 ) AS prev_end_time,
+                 lag(s.policy_id) OVER (
+                     PARTITION BY s.device_id
+                     ORDER BY s.start_time, s.segment_id
+                 ) AS prev_policy_id,
+                 lag(s.value) OVER (
+                     PARTITION BY s.device_id
+                     ORDER BY s.start_time, s.segment_id
+                 ) AS prev_value
+             FROM %s AS s
+             WHERE s.end_time IS NOT NULL
+               AND s.end_time <= $1
+         ),
+         grouped AS (
+             SELECT
+                 c.*,
+                 sum(
+                     CASE
+                        WHEN c.prev_end_time IS NULL
+                          OR c.start_time <> c.prev_end_time
+                          OR c.policy_id IS DISTINCT FROM c.prev_policy_id
+                          OR c.value IS DISTINCT FROM c.prev_value
+                        THEN 1
+                        ELSE 0
+                     END
+                 ) OVER (
+                     PARTITION BY c.device_id
+                     ORDER BY c.start_time, c.segment_id
+                 ) AS grp
+             FROM candidates AS c
+         ),
+         aggregates AS (
+             SELECT
+                 g.device_id,
+                 (array_agg(g.segment_id ORDER BY g.start_time, g.segment_id))[1] AS keep_segment_id,
+                 array_remove(
+                     array_agg(g.segment_id ORDER BY g.start_time, g.segment_id),
+                     (array_agg(g.segment_id ORDER BY g.start_time, g.segment_id))[1]
+                 ) AS delete_segment_ids,
+                 min(g.start_time) AS merged_start_time,
+                 max(g.end_time) AS merged_end_time,
+                 sum(g.samples_count)::integer AS total_samples,
+                 count(*)::integer AS merged_segments
+             FROM grouped AS g
+             GROUP BY g.device_id, g.grp
+             HAVING count(*) > 1
+         )
+         SELECT
+             a.device_id,
+             a.keep_segment_id,
+             a.delete_segment_ids,
+             a.merged_segments,
+             a.merged_start_time,
+             a.merged_end_time,
+             a.total_samples
+         FROM aggregates AS a
+         ORDER BY a.device_id, a.merged_start_time',
+        v_table_sql
+    )
+    USING p_before
+    LOOP
+        EXECUTE format(
+            'DELETE FROM %s
+             WHERE segment_id = ANY($1)',
+            v_table_sql
+        )
+        USING v_merge.delete_segment_ids;
+
+        EXECUTE format(
+            'UPDATE %s
+             SET end_time = $1,
+                 samples_count = $2
+             WHERE segment_id = $3',
+            v_table_sql
+        )
+        USING v_merge.merged_end_time, v_merge.total_samples, v_merge.keep_segment_id;
+
+        device_id := v_merge.device_id;
+        kept_segment_id := v_merge.keep_segment_id;
+        merged_segments := v_merge.merged_segments;
+        start_time := v_merge.merged_start_time;
+        end_time := v_merge.merged_end_time;
+        total_samples := v_merge.total_samples;
+        RETURN NEXT;
+    END LOOP;
+
+    RETURN;
+END;
+$$;
+
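+-- Usage sketch: merge runs of adjacent closed segments that share the same
+-- value and policy and end before the cutoff; returns one row per kept
+-- segment. It deletes rows, so consider reviewing verify_segments output
+-- for the metric first.
+--
+--   SELECT *
+--   FROM telemetry.compact_segments(
+--       'ambient_temperature',
+--       now() - interval '30 days'
+--   );
+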
+CREATE OR REPLACE FUNCTION telemetry.inactive_devices(p_threshold interval)
+RETURNS TABLE (
+    device_pk bigint,
+    device_id text,
+    device_type text,
+    location text,
+    last_seen timestamptz,
+    inactive_for interval
+)
+LANGUAGE sql
+STABLE
+AS $$
+    SELECT
+        d.device_pk,
+        d.device_id,
+        d.device_type,
+        d.location,
+        d.last_seen,
+        now() - d.last_seen AS inactive_for
+    FROM telemetry.devices AS d
+    WHERE d.last_seen IS NOT NULL
+      AND now() - d.last_seen > $1
+    ORDER BY inactive_for DESC, d.device_id;
+$$;
+
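+-- Usage sketch: devices silent for more than a day, most stale first.
+-- Devices with last_seen still NULL (never reported) are not listed.
+--
+--   SELECT device_id, last_seen, inactive_for
+--   FROM telemetry.inactive_devices(interval '1 day');
+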
+-- Example metric registrations.
+
+SELECT telemetry.register_numeric_metric(
+    p_metric_name => 'ambient_temperature',
+    p_table_name => 'ambient_temperature_segments',
+    p_domain_name => 'environmental',
+    p_epsilon => 0.05,
+    p_min_value => -40,
+    p_max_value => 60,
+    p_rounding_precision => 0.1,
+    p_max_sampling_interval => '5 minutes',
+    p_allow_null => true
+);
+
+SELECT telemetry.register_numeric_metric(
+    p_metric_name => 'cpu_temperature',
+    p_table_name => 'cpu_temperature_segments',
+    p_domain_name => 'system',
+    p_epsilon => 0.5,
+    p_min_value => 0,
+    p_max_value => 120,
+    p_rounding_precision => 1,
+    p_max_sampling_interval => '10 seconds',
+    p_allow_null => true
+);
+
+SELECT telemetry.register_numeric_metric(
+    p_metric_name => 'humidity',
+    p_table_name => 'humidity_segments',
+    p_domain_name => 'environmental',
+    p_epsilon => 0.5,
+    p_min_value => 0,
+    p_max_value => 100,
+    p_rounding_precision => 0.1,
+    p_max_sampling_interval => '5 minutes',
+    p_allow_null => true
+);
+COMMIT;
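+
+-- Sanity-check sketch after the registrations commit (the metric_name column
+-- name is assumed from the registration helpers; metric_type and table_name
+-- are used by the wrappers above):
+--
+--   SELECT metric_name, metric_type, table_name
+--   FROM telemetry.metrics
+--   ORDER BY metric_name;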