telemetrydatabase / schema / telemetry_schema.sql
Newer Older
2212 lines | 66.699kb
Bogdan Timofte authored 2 weeks ago
1
BEGIN;
2

            
3
-- Extensions.
4

            
5
CREATE SCHEMA telemetry;
6
CREATE EXTENSION btree_gist;
7

            
8
-- Enums.
9

            
10
-- 'state' is reserved for future implementation of discrete enumerated
11
-- machine states such as thermostat_mode, pump_state, hvac_mode, or
12
-- door_state. Values will likely be stored as smallint and use the
13
-- same segment engine as boolean metrics, but with exact comparison
14
-- over enumerated values.
15
CREATE TYPE telemetry.metric_type_enum AS ENUM ('numeric', 'boolean', 'state');
16

            
17
CREATE TYPE telemetry.comparison_mode_enum AS ENUM ('epsilon', 'exact');
18

            
19
-- Registry utility functions.
20

            
21
CREATE OR REPLACE FUNCTION telemetry.normalize_metric_table_name(p_table_name text)
22
RETURNS text
23
LANGUAGE plpgsql
24
IMMUTABLE
25
AS $$
26
DECLARE
27
    v_parts text[];
28
BEGIN
29
    IF p_table_name IS NULL OR btrim(p_table_name) = '' THEN
30
        RAISE EXCEPTION 'table_name is required';
31
    END IF;
32

            
33
    v_parts := pg_catalog.parse_ident(p_table_name, false);
34

            
35
    CASE COALESCE(array_length(v_parts, 1), 0)
36
        WHEN 1 THEN
37
            RETURN v_parts[1];
38
        WHEN 2 THEN
39
            IF v_parts[1] <> 'telemetry' THEN
40
                RAISE EXCEPTION 'metric tables must live in schema telemetry: %', p_table_name;
41
            END IF;
42

            
43
            RETURN v_parts[2];
44
        ELSE
45
            RAISE EXCEPTION 'invalid metric table reference: %', p_table_name;
46
    END CASE;
47
END;
48
$$;
49

            
50
CREATE OR REPLACE FUNCTION telemetry.metric_table_sql(p_table_name text)
51
RETURNS text
52
LANGUAGE plpgsql
53
STABLE
54
AS $$
55
DECLARE
56
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
57
    v_table_sql text := format('telemetry.%I', v_table_name);
58
BEGIN
59
    IF to_regclass(v_table_sql) IS NULL THEN
60
        RAISE EXCEPTION 'could not resolve metric table %', v_table_sql;
61
    END IF;
62

            
63
    RETURN v_table_sql;
64
END;
65
$$;
66

            
67
-- Core registry tables.
68

            
69
CREATE TABLE telemetry.metrics (
70
    metric_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
71
    metric_name text NOT NULL UNIQUE,
72
    table_name text NOT NULL UNIQUE,
73
    domain_name text NOT NULL DEFAULT 'generic',
74
    metric_type telemetry.metric_type_enum NOT NULL,
75
    comparison_mode telemetry.comparison_mode_enum NOT NULL,
76
    epsilon double precision,
77
    min_value double precision,
78
    max_value double precision,
79
    rounding_precision double precision,
80
    allow_null boolean NOT NULL DEFAULT true,
81
    max_sampling_interval interval NOT NULL,
82
    created_at timestamptz NOT NULL DEFAULT now(),
83
    updated_at timestamptz NOT NULL DEFAULT now(),
84
    CHECK (min_value IS NULL OR max_value IS NULL OR min_value <= max_value),
85
    CONSTRAINT metrics_policy_shape_check
86
        CHECK (
87
            (metric_type = 'numeric'::telemetry.metric_type_enum
88
             AND comparison_mode = 'epsilon'::telemetry.comparison_mode_enum
89
             AND epsilon IS NOT NULL
90
             AND rounding_precision IS NOT NULL)
91
            OR
92
            (metric_type IN (
93
                 'boolean'::telemetry.metric_type_enum,
94
                 'state'::telemetry.metric_type_enum
95
             )
96
             AND comparison_mode = 'exact'::telemetry.comparison_mode_enum
97
             AND epsilon IS NULL
98
             AND min_value IS NULL
99
             AND max_value IS NULL
100
             AND rounding_precision IS NULL)
101
        ),
102
    CONSTRAINT metrics_table_name_check
103
        CHECK (table_name = telemetry.normalize_metric_table_name(table_name))
104
);
105

            
106
-- Policy tables.
107

            
108
CREATE TABLE telemetry.metric_policies (
109
    policy_id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
110
    metric_name text NOT NULL
111
        REFERENCES telemetry.metrics(metric_name)
112
        ON UPDATE CASCADE
113
        ON DELETE CASCADE,
114
    valid_from timestamptz NOT NULL,
115
    comparison_mode telemetry.comparison_mode_enum NOT NULL,
116
    epsilon double precision,
117
    min_value double precision,
118
    max_value double precision,
119
    rounding_precision double precision,
120
    allow_null boolean NOT NULL,
121
    max_sampling_interval interval NOT NULL,
122
    created_at timestamptz NOT NULL DEFAULT now(),
123
    UNIQUE (metric_name, valid_from)
124
);
125

            
126
CREATE TABLE telemetry.metric_retention_policies (
127
    metric_name text PRIMARY KEY
128
        REFERENCES telemetry.metrics(metric_name)
129
        ON UPDATE CASCADE
130
        ON DELETE CASCADE,
131
    raw_retention interval NULL,
132
    compact_after interval NULL
133
);
134

            
135
-- Device registry tables.
136

            
137
CREATE TABLE telemetry.devices (
138
    device_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
139
    device_id text NOT NULL UNIQUE,
140
    device_type text NULL,
141
    location text NULL,
142
    metadata jsonb NULL,
143
    last_seen timestamptz NULL,
144
    created_at timestamptz NOT NULL DEFAULT now(),
145
    updated_at timestamptz NOT NULL DEFAULT now()
146
);
147

            
148
CREATE TABLE telemetry.metric_device_watermarks (
149
    metric_name text NOT NULL
150
        REFERENCES telemetry.metrics(metric_name)
151
        ON UPDATE CASCADE
152
        ON DELETE CASCADE,
153
    device_id text NOT NULL
154
        REFERENCES telemetry.devices(device_id)
155
        ON UPDATE CASCADE
156
        ON DELETE CASCADE,
157
    last_observed_at timestamptz NOT NULL,
158
    PRIMARY KEY (metric_name, device_id)
159
);
160

            
161
-- Runtime helpers.
162

            
163
CREATE OR REPLACE FUNCTION telemetry.round_to_increment(
164
    p_value double precision,
165
    p_increment double precision
166
)
167
RETURNS double precision
168
LANGUAGE sql
169
IMMUTABLE
170
STRICT
171
AS $$
172
    SELECT CASE
173
        WHEN p_increment <= 0 THEN p_value
174
        ELSE round(p_value / p_increment) * p_increment
175
    END;
176
$$;
177

            
178
CREATE OR REPLACE FUNCTION telemetry.ensure_device(p_device_id text)
179
RETURNS void
180
LANGUAGE sql
181
AS $$
182
    INSERT INTO telemetry.devices (device_id)
183
    VALUES ($1)
184
    ON CONFLICT (device_id) DO NOTHING;
185
$$;
186

            
187
CREATE OR REPLACE FUNCTION telemetry.touch_device_last_seen(
188
    p_device_id text,
189
    p_observed_at timestamptz
190
)
191
RETURNS void
192
LANGUAGE sql
193
AS $$
194
    UPDATE telemetry.devices
195
    SET last_seen = CASE
196
        WHEN last_seen IS NULL OR last_seen < $2 THEN $2
197
        ELSE last_seen
198
    END
199
    WHERE device_id = $1;
200
$$;
201

            
202
CREATE OR REPLACE FUNCTION telemetry.require_device(p_device_id text)
203
RETURNS telemetry.devices
204
LANGUAGE plpgsql
205
STABLE
206
AS $$
207
DECLARE
208
    v_device telemetry.devices%ROWTYPE;
209
BEGIN
210
    SELECT *
211
    INTO v_device
212
    FROM telemetry.devices AS d
213
    WHERE d.device_id = p_device_id;
214

            
215
    IF NOT FOUND THEN
216
        RAISE EXCEPTION 'unknown device: %', p_device_id;
217
    END IF;
218

            
219
    RETURN v_device;
220
END;
221
$$;
222

            
223
CREATE OR REPLACE FUNCTION telemetry.lock_key_from_text(p_value text)
224
RETURNS integer
225
LANGUAGE sql
226
IMMUTABLE
227
STRICT
228
AS $$
229
    SELECT (hashtextextended($1, 0) % 2147483647)::integer;
230
$$;
231

            
232
-- Segment template and table creation.
233

            
234
CREATE OR REPLACE FUNCTION telemetry.create_segment_table(
235
    p_table_name text,
236
    p_value_type text
237
)
238
RETURNS void
239
LANGUAGE plpgsql
240
AS $$
241
DECLARE
242
    v_base_table_name text := telemetry.normalize_metric_table_name(p_table_name);
243
    v_table_name text := format('telemetry.%I', v_base_table_name);
244
    v_metric_name text;
245
    v_initial_policy_id bigint;
246
    v_open_idx text := format('%s_open_segment_uidx', v_base_table_name);
247
    v_start_idx text := format('%s_device_start_cover_idx', v_base_table_name);
248
    v_device_period_gist_idx text := format('%s_device_period_gist', v_base_table_name);
249
    v_period_excl text := format('%s_device_period_excl', v_base_table_name);
250
    v_device_pk_fk text := format('%s_device_pk_fkey', v_base_table_name);
251
    v_device_fk text := format('%s_device_id_fkey', v_base_table_name);
252
    v_policy_fk text := format('%s_policy_id_fkey', v_base_table_name);
253
    v_samples_check text := format('%s_samples_count_check', v_base_table_name);
254
    v_end_time_check text := format('%s_check', v_base_table_name);
255
BEGIN
256
    IF p_value_type NOT IN ('double precision', 'boolean', 'smallint') THEN
257
        RAISE EXCEPTION 'unsupported segment value type: %', p_value_type;
258
    END IF;
259

            
260
    SELECT m.metric_name
261
    INTO v_metric_name
262
    FROM telemetry.metrics AS m
263
    WHERE m.table_name = v_base_table_name;
264

            
265
    IF v_metric_name IS NULL THEN
266
        RAISE EXCEPTION
267
            'metric metadata must exist before creating segment table %',
268
            v_base_table_name;
269
    END IF;
270

            
271
    SELECT mp.policy_id
272
    INTO v_initial_policy_id
273
    FROM telemetry.metric_policies AS mp
274
    WHERE mp.metric_name = v_metric_name
275
      AND mp.valid_from = '-infinity'::timestamptz;
276

            
277
    IF v_initial_policy_id IS NULL THEN
278
        RAISE EXCEPTION
279
            'initial policy row is missing for metric %',
280
            v_metric_name;
281
    END IF;
282

            
283
    EXECUTE format(
284
        'CREATE TABLE IF NOT EXISTS %s (
285
            segment_id bigserial PRIMARY KEY,
286
            device_pk bigint NOT NULL,
287
            device_id text NOT NULL,
288
            start_time timestamptz NOT NULL,
289
            end_time timestamptz NULL,
290
            value %s NULL,
291
            -- samples_count = 0 is intentional for synthetic gap segments inserted
292
            -- during lazy gap detection; those intervals represent missing data, not samples.
293
            samples_count integer NOT NULL DEFAULT 1
294
                CONSTRAINT %I CHECK (samples_count >= 0),
295
            policy_id bigint NOT NULL,
296
            segment_period tstzrange GENERATED ALWAYS AS (
297
                tstzrange(start_time, COALESCE(end_time, ''infinity''::timestamptz), ''[)'')
298
            ) STORED,
299
            CONSTRAINT %I CHECK (end_time IS NULL OR end_time > start_time),
300
            CONSTRAINT %I
301
                FOREIGN KEY (device_pk)
302
                REFERENCES telemetry.devices(device_pk)
303
                ON UPDATE CASCADE
304
                ON DELETE CASCADE,
305
            CONSTRAINT %I
306
                FOREIGN KEY (device_id)
307
                REFERENCES telemetry.devices(device_id)
308
                ON UPDATE CASCADE
309
                ON DELETE CASCADE,
310
            CONSTRAINT %I
311
                FOREIGN KEY (policy_id)
312
                REFERENCES telemetry.metric_policies(policy_id)
313
        ) WITH (
314
            fillfactor = 90,
315
            autovacuum_vacuum_scale_factor = 0.02,
316
            autovacuum_analyze_scale_factor = 0.01
317
        )',
318
        v_table_name,
319
        p_value_type,
320
        v_samples_check,
321
        v_end_time_check,
322
        v_device_pk_fk,
323
        v_device_fk,
324
        v_policy_fk
325
    );
326

            
327
    EXECUTE format(
328
        'CREATE UNIQUE INDEX IF NOT EXISTS %I
329
         ON %s (device_id)
330
         WHERE end_time IS NULL',
331
        v_open_idx,
332
        v_table_name
333
    );
334

            
335
    EXECUTE format(
336
        'CREATE INDEX IF NOT EXISTS %I
337
         ON %s (device_id, start_time DESC)
338
         INCLUDE (end_time, value, samples_count)',
339
        v_start_idx,
340
        v_table_name
341
    );
342

            
343
    EXECUTE format(
344
        'CREATE INDEX IF NOT EXISTS %I
345
         ON %s
346
         USING gist (device_id, segment_period)',
347
        v_device_period_gist_idx,
348
        v_table_name
349
    );
350

            
351
    BEGIN
352
        EXECUTE format(
353
            'ALTER TABLE %s
354
             ADD CONSTRAINT %I
355
             EXCLUDE USING gist (
356
                 device_id WITH =,
357
                 segment_period WITH &&
358
             )',
359
            v_table_name,
360
            v_period_excl
361
        );
362
    EXCEPTION
363
        WHEN duplicate_object THEN
364
            NULL;
365
        WHEN duplicate_table THEN
366
            NULL;
367
    END;
368
END;
369
$$;
370

            
371

            
372
-- Metric registration.
373

            
374
CREATE OR REPLACE FUNCTION telemetry.register_numeric_metric(
375
    p_metric_name text,
376
    p_table_name text,
377
    p_domain_name text,
378
    p_epsilon double precision,
379
    p_min_value double precision,
380
    p_max_value double precision,
381
    p_rounding_precision double precision,
382
    p_max_sampling_interval interval,
383
    p_allow_null boolean DEFAULT true
384
)
385
RETURNS void
386
LANGUAGE plpgsql
387
AS $$
388
DECLARE
389
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
390
BEGIN
391
    INSERT INTO telemetry.metrics (
392
        metric_name,
393
        table_name,
394
        domain_name,
395
        metric_type,
396
        comparison_mode,
397
        epsilon,
398
        min_value,
399
        max_value,
400
        rounding_precision,
401
        max_sampling_interval,
402
        allow_null
403
    )
404
    VALUES (
405
        p_metric_name,
406
        v_table_name,
407
        p_domain_name,
408
        'numeric',
409
        'epsilon',
410
        p_epsilon,
411
        p_min_value,
412
        p_max_value,
413
        p_rounding_precision,
414
        p_max_sampling_interval,
415
        p_allow_null
416
    )
417
    ON CONFLICT (metric_name) DO UPDATE
418
    SET table_name = EXCLUDED.table_name,
419
        domain_name = EXCLUDED.domain_name,
420
        metric_type = EXCLUDED.metric_type,
421
        comparison_mode = EXCLUDED.comparison_mode,
422
        epsilon = EXCLUDED.epsilon,
423
        min_value = EXCLUDED.min_value,
424
        max_value = EXCLUDED.max_value,
425
        rounding_precision = EXCLUDED.rounding_precision,
426
        max_sampling_interval = EXCLUDED.max_sampling_interval,
427
        allow_null = EXCLUDED.allow_null,
428
        updated_at = now();
429

            
430
    INSERT INTO telemetry.metric_policies (
431
        metric_name,
432
        valid_from,
433
        comparison_mode,
434
        epsilon,
435
        min_value,
436
        max_value,
437
        rounding_precision,
438
        allow_null,
439
        max_sampling_interval
440
    )
441
    VALUES (
442
        p_metric_name,
443
        '-infinity'::timestamptz,
444
        'epsilon',
445
        p_epsilon,
446
        p_min_value,
447
        p_max_value,
448
        p_rounding_precision,
449
        p_allow_null,
450
        p_max_sampling_interval
451
    )
452
    ON CONFLICT (metric_name, valid_from) DO NOTHING;
453

            
454
    PERFORM telemetry.create_segment_table(v_table_name, 'double precision');
455
    PERFORM telemetry.metric_table_sql(v_table_name);
456
END;
457
$$;
458

            
459
CREATE OR REPLACE FUNCTION telemetry.register_boolean_metric(
460
    p_metric_name text,
461
    p_table_name text,
462
    p_domain_name text,
463
    p_max_sampling_interval interval,
464
    p_allow_null boolean DEFAULT true
465
)
466
RETURNS void
467
LANGUAGE plpgsql
468
AS $$
469
DECLARE
470
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
471
BEGIN
472
    INSERT INTO telemetry.metrics (
473
        metric_name,
474
        table_name,
475
        domain_name,
476
        metric_type,
477
        comparison_mode,
478
        epsilon,
479
        min_value,
480
        max_value,
481
        rounding_precision,
482
        max_sampling_interval,
483
        allow_null
484
    )
485
    VALUES (
486
        p_metric_name,
487
        v_table_name,
488
        p_domain_name,
489
        'boolean',
490
        'exact',
491
        NULL,
492
        NULL,
493
        NULL,
494
        NULL,
495
        p_max_sampling_interval,
496
        p_allow_null
497
    )
498
    ON CONFLICT (metric_name) DO UPDATE
499
    SET table_name = EXCLUDED.table_name,
500
        domain_name = EXCLUDED.domain_name,
501
        metric_type = EXCLUDED.metric_type,
502
        comparison_mode = EXCLUDED.comparison_mode,
503
        epsilon = EXCLUDED.epsilon,
504
        min_value = EXCLUDED.min_value,
505
        max_value = EXCLUDED.max_value,
506
        rounding_precision = EXCLUDED.rounding_precision,
507
        max_sampling_interval = EXCLUDED.max_sampling_interval,
508
        allow_null = EXCLUDED.allow_null,
509
        updated_at = now();
510

            
511
    INSERT INTO telemetry.metric_policies (
512
        metric_name,
513
        valid_from,
514
        comparison_mode,
515
        epsilon,
516
        min_value,
517
        max_value,
518
        rounding_precision,
519
        allow_null,
520
        max_sampling_interval
521
    )
522
    VALUES (
523
        p_metric_name,
524
        '-infinity'::timestamptz,
525
        'exact',
526
        NULL,
527
        NULL,
528
        NULL,
529
        NULL,
530
        p_allow_null,
531
        p_max_sampling_interval
532
    )
533
    ON CONFLICT (metric_name, valid_from) DO NOTHING;
534

            
535
    PERFORM telemetry.create_segment_table(v_table_name, 'boolean');
536
    PERFORM telemetry.metric_table_sql(v_table_name);
537
END;
538
$$;
539

            
540
-- Metric and policy lookup.
541

            
542
CREATE OR REPLACE FUNCTION telemetry.require_metric(p_metric_name text)
543
RETURNS telemetry.metrics
544
LANGUAGE plpgsql
545
STABLE
546
AS $$
547
DECLARE
548
    v_metric telemetry.metrics%ROWTYPE;
549
BEGIN
550
    SELECT *
551
    INTO v_metric
552
    FROM telemetry.metrics AS m
553
    WHERE m.metric_name = p_metric_name;
554

            
555
    IF NOT FOUND THEN
556
        RAISE EXCEPTION 'unknown metric: %', p_metric_name;
557
    END IF;
558

            
559
    RETURN v_metric;
560
END;
561
$$;
562

            
563
CREATE OR REPLACE FUNCTION telemetry.require_metric_policy(
564
    p_metric_name text,
565
    p_observed_at timestamptz
566
)
567
RETURNS telemetry.metric_policies
568
LANGUAGE plpgsql
569
STABLE
570
AS $$
571
DECLARE
572
    v_policy telemetry.metric_policies%ROWTYPE;
573
BEGIN
574
    SELECT *
575
    INTO v_policy
576
    FROM telemetry.metric_policies AS mp
577
    WHERE mp.metric_name = p_metric_name
578
      AND mp.valid_from <= p_observed_at
579
    ORDER BY mp.valid_from DESC
580
    LIMIT 1;
581

            
582
    IF NOT FOUND THEN
583
        RAISE EXCEPTION
584
            'no active policy for metric % at %',
585
            p_metric_name,
586
            p_observed_at;
587
    END IF;
588

            
589
    RETURN v_policy;
590
END;
591
$$;
592

            
593
CREATE OR REPLACE FUNCTION telemetry.add_metric_policy(
594
    p_metric_name text,
595
    p_valid_from timestamptz,
596
    p_comparison_mode telemetry.comparison_mode_enum,
597
    p_epsilon double precision,
598
    p_min_value double precision,
599
    p_max_value double precision,
600
    p_rounding_precision double precision,
601
    p_max_sampling_interval interval,
602
    p_allow_null boolean
603
)
604
RETURNS bigint
605
LANGUAGE plpgsql
606
AS $$
607
DECLARE
608
    v_metric telemetry.metrics%ROWTYPE;
609
    v_policy_id bigint;
610
BEGIN
611
    IF p_valid_from IS NULL THEN
612
        RAISE EXCEPTION 'valid_from is required';
613
    END IF;
614

            
615
    IF p_max_sampling_interval IS NULL THEN
616
        RAISE EXCEPTION 'max_sampling_interval is required';
617
    END IF;
618

            
619
    v_metric := telemetry.require_metric(p_metric_name);
620

            
621
    IF p_min_value IS NOT NULL
622
       AND p_max_value IS NOT NULL
623
       AND p_min_value > p_max_value THEN
624
        RAISE EXCEPTION 'min_value % exceeds max_value % for metric %',
625
            p_min_value,
626
            p_max_value,
627
            p_metric_name;
628
    END IF;
629

            
630
    CASE v_metric.metric_type
631
        WHEN 'numeric' THEN
632
            IF p_comparison_mode <> 'epsilon'::telemetry.comparison_mode_enum THEN
633
                RAISE EXCEPTION
634
                    'numeric metric % requires comparison_mode epsilon',
635
                    p_metric_name;
636
            END IF;
637

            
638
            IF p_epsilon IS NULL OR p_rounding_precision IS NULL THEN
639
                RAISE EXCEPTION
640
                    'numeric metric % requires epsilon and rounding_precision',
641
                    p_metric_name;
642
            END IF;
643
        WHEN 'boolean', 'state' THEN
644
            IF p_comparison_mode <> 'exact'::telemetry.comparison_mode_enum THEN
645
                RAISE EXCEPTION
646
                    'metric % with type % requires comparison_mode exact',
647
                    p_metric_name,
648
                    v_metric.metric_type;
649
            END IF;
650

            
651
            IF p_epsilon IS NOT NULL
652
               OR p_min_value IS NOT NULL
653
               OR p_max_value IS NOT NULL
654
               OR p_rounding_precision IS NOT NULL THEN
655
                RAISE EXCEPTION
656
                    'metric % with type % does not accept epsilon/min/max/rounding_precision in policies',
657
                    p_metric_name,
658
                    v_metric.metric_type;
659
            END IF;
660
        ELSE
661
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
662
    END CASE;
663

            
664
    IF EXISTS (
665
        SELECT 1
666
        FROM telemetry.metric_policies AS mp
667
        WHERE mp.metric_name = p_metric_name
668
          AND mp.valid_from = p_valid_from
669
    ) THEN
670
        RAISE EXCEPTION
671
            'policy already exists for metric % at %',
672
            p_metric_name,
673
            p_valid_from;
674
    END IF;
675

            
676
    INSERT INTO telemetry.metric_policies (
677
        metric_name,
678
        valid_from,
679
        comparison_mode,
680
        epsilon,
681
        min_value,
682
        max_value,
683
        rounding_precision,
684
        allow_null,
685
        max_sampling_interval
686
    )
687
    VALUES (
688
        p_metric_name,
689
        p_valid_from,
690
        p_comparison_mode,
691
        p_epsilon,
692
        p_min_value,
693
        p_max_value,
694
        p_rounding_precision,
695
        p_allow_null,
696
        p_max_sampling_interval
697
    )
698
    RETURNING policy_id
699
    INTO v_policy_id;
700

            
701
    UPDATE telemetry.metrics AS m
702
    SET comparison_mode = p_comparison_mode,
703
        epsilon = p_epsilon,
704
        min_value = p_min_value,
705
        max_value = p_max_value,
706
        rounding_precision = p_rounding_precision,
707
        allow_null = p_allow_null,
708
        max_sampling_interval = p_max_sampling_interval,
709
        updated_at = now()
710
    WHERE m.metric_name = p_metric_name
711
      AND NOT EXISTS (
712
          SELECT 1
713
          FROM telemetry.metric_policies AS later
714
          WHERE later.metric_name = p_metric_name
715
            AND later.valid_from > p_valid_from
716
      );
717

            
718
    RETURN v_policy_id;
719
END;
720
$$;
721

            
722
-- Ingestion functions.
723

            
724
CREATE OR REPLACE FUNCTION telemetry.assert_append_only(
725
    p_metric_name text,
726
    p_device_id text,
727
    p_observed_at timestamptz
728
)
729
RETURNS timestamptz
730
LANGUAGE plpgsql
731
AS $$
732
DECLARE
733
    v_last_observed_at timestamptz;
734
BEGIN
735
    SELECT w.last_observed_at
736
    INTO v_last_observed_at
737
    FROM telemetry.metric_device_watermarks AS w
738
    WHERE w.metric_name = p_metric_name
739
      AND w.device_id = p_device_id;
740

            
741
    IF FOUND AND p_observed_at <= v_last_observed_at THEN
742
        RAISE EXCEPTION
743
            'out-of-order measurement for metric % device %: incoming % <= last observed %',
744
            p_metric_name,
745
            p_device_id,
746
            p_observed_at,
747
            v_last_observed_at;
748
    END IF;
749

            
750
    RETURN v_last_observed_at;
751
END;
752
$$;
753

            
754
CREATE OR REPLACE FUNCTION telemetry.bump_watermark(
755
    p_metric_name text,
756
    p_device_id text,
757
    p_observed_at timestamptz
758
)
759
RETURNS void
760
LANGUAGE sql
761
AS $$
762
    INSERT INTO telemetry.metric_device_watermarks (
763
        metric_name,
764
        device_id,
765
        last_observed_at
766
    )
767
    VALUES ($1, $2, $3)
768
    ON CONFLICT ON CONSTRAINT metric_device_watermarks_pkey DO UPDATE
769
    SET last_observed_at = EXCLUDED.last_observed_at;
770
$$;
771

            
772
CREATE OR REPLACE FUNCTION telemetry.ingest_segment(
773
    p_table_name text,
774
    p_metric_type telemetry.metric_type_enum,
775
    p_comparison_mode telemetry.comparison_mode_enum,
776
    p_policy_id bigint,
777
    p_policy_valid_from timestamptz,
778
    p_device_pk bigint,
779
    p_device_id text,
780
    p_value anyelement,
781
    p_observed_at timestamptz,
782
    p_last_observed_at timestamptz,
783
    p_epsilon double precision,
784
    p_max_sampling_interval interval,
785
    p_allow_null boolean
786
)
787
RETURNS text
788
LANGUAGE plpgsql
789
AS $$
790
DECLARE
791
    v_current record;
792
    v_has_current boolean;
793
    v_gap_start timestamptz;
794
    v_rowcount bigint;
795
    v_equal boolean;
796
    v_policy_changed boolean;
797
    v_effective_last_observed_at timestamptz := p_last_observed_at;
798
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
799
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
800
BEGIN
801
    EXECUTE format(
802
        'SELECT segment_id, device_pk, device_id, start_time, end_time, value, samples_count, policy_id
803
         FROM %s
804
         WHERE device_id = $1
805
           AND end_time IS NULL
806
         FOR UPDATE',
807
        v_table_sql
808
    )
809
    INTO v_current
810
    USING p_device_id;
811

            
812
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
813
    v_has_current := v_rowcount > 0;
814

            
815
    IF p_value IS NULL AND NOT p_allow_null THEN
816
        RAISE EXCEPTION 'metric table % does not allow explicit NULL measurements', v_table_name;
817
    END IF;
818

            
819
    IF NOT v_has_current THEN
820
        EXECUTE format(
821
            'INSERT INTO %s (
822
                device_pk,
823
                device_id,
824
                start_time,
825
                end_time,
826
                value,
827
                samples_count,
828
                policy_id
829
             )
830
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
831
            v_table_sql
832
        )
833
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
834

            
835
        RETURN CASE WHEN p_value IS NULL THEN 'opened_null' ELSE 'opened' END;
836
    END IF;
837

            
838
    v_policy_changed := v_current.policy_id IS DISTINCT FROM p_policy_id;
839

            
840
    IF v_policy_changed
841
       AND p_policy_valid_from > v_current.start_time THEN
842
        EXECUTE format(
843
            'UPDATE %s
844
             SET end_time = $1
845
             WHERE segment_id = $2',
846
            v_table_sql
847
        )
848
        USING p_policy_valid_from, v_current.segment_id;
849

            
850
        EXECUTE format(
851
            'INSERT INTO %s (
852
                device_pk,
853
                device_id,
854
                start_time,
855
                end_time,
856
                value,
857
                samples_count,
858
                policy_id
859
             )
860
             -- samples_count = 0 marks a synthetic continuation across a policy boundary.
861
             VALUES ($1, $2, $3, NULL, $4, 0, $5)
862
             RETURNING segment_id, device_pk, device_id, start_time, end_time, value, samples_count, policy_id',
863
            v_table_sql
864
        )
865
        INTO v_current
866
        USING v_current.device_pk, v_current.device_id, p_policy_valid_from, v_current.value, p_policy_id;
867

            
868
        v_policy_changed := false;
869
        v_effective_last_observed_at := GREATEST(
870
            COALESCE(p_last_observed_at, p_policy_valid_from),
871
            p_policy_valid_from
872
        );
873
    END IF;
874

            
875
    IF v_effective_last_observed_at IS NOT NULL
876
       AND p_observed_at - v_effective_last_observed_at > p_max_sampling_interval
877
       AND v_current.value IS NOT NULL THEN
878
        v_gap_start := v_effective_last_observed_at + p_max_sampling_interval;
879

            
880
        EXECUTE format(
881
            'UPDATE %s
882
             SET end_time = $1
883
             WHERE segment_id = $2',
884
            v_table_sql
885
        )
886
        USING v_gap_start, v_current.segment_id;
887

            
888
        IF p_value IS NULL THEN
889
            EXECUTE format(
890
                'INSERT INTO %s (
891
                    device_pk,
892
                    device_id,
893
                    start_time,
894
                    end_time,
895
                    value,
896
                    samples_count,
897
                    policy_id
898
                 )
899
                 VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
900
                v_table_sql
901
            )
902
            USING p_device_pk, p_device_id, v_gap_start, p_policy_id;
903

            
904
            RETURN 'gap_to_null';
905
        END IF;
906

            
907
        EXECUTE format(
908
            'INSERT INTO %s (
909
                device_pk,
910
                device_id,
911
                start_time,
912
                end_time,
913
                value,
914
                samples_count,
915
                policy_id
916
             )
917
             -- samples_count = 0 marks a historian-generated gap interval.
918
             VALUES ($1, $2, $3, $4, NULL, 0, $5)',
919
            v_table_sql
920
        )
921
        USING p_device_pk, p_device_id, v_gap_start, p_observed_at, p_policy_id;
922

            
923
        EXECUTE format(
924
            'INSERT INTO %s (
925
                device_pk,
926
                device_id,
927
                start_time,
928
                end_time,
929
                value,
930
                samples_count,
931
                policy_id
932
             )
933
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
934
            v_table_sql
935
        )
936
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
937

            
938
        RETURN 'gap_split';
939
    END IF;
940

            
941
    IF v_current.samples_count = 0
942
       AND p_observed_at = v_current.start_time THEN
943
        IF p_value IS NULL THEN
944
            EXECUTE format(
945
                'UPDATE %s
946
                 SET value = NULL,
947
                     samples_count = 1
948
                 WHERE segment_id = $1',
949
                v_table_sql
950
            )
951
            USING v_current.segment_id;
952

            
953
            RETURN CASE
954
                WHEN v_current.value IS NULL THEN 'extended_null'
955
                ELSE 'value_to_null'
956
            END;
957
        END IF;
958

            
959
        IF v_current.value IS NULL THEN
960
            EXECUTE format(
961
                'UPDATE %s
962
                 SET value = $1,
963
                     samples_count = 1
964
                 WHERE segment_id = $2',
965
                v_table_sql
966
            )
967
            USING p_value, v_current.segment_id;
968

            
969
            RETURN 'null_to_value';
970
        END IF;
971

            
972
        CASE p_comparison_mode
973
            WHEN 'epsilon' THEN
974
                EXECUTE
975
                    'SELECT abs($1::double precision - $2::double precision) <= $3'
976
                INTO v_equal
977
                USING v_current.value, p_value, p_epsilon;
978
            WHEN 'exact' THEN
979
                EXECUTE
980
                    'SELECT $1 IS NOT DISTINCT FROM $2'
981
                INTO v_equal
982
                USING v_current.value, p_value;
983
            ELSE
984
                RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
985
        END CASE;
986

            
987
        EXECUTE format(
988
            'UPDATE %s
989
             SET value = $1,
990
                 samples_count = 1
991
             WHERE segment_id = $2',
992
            v_table_sql
993
        )
994
        USING p_value, v_current.segment_id;
995

            
996
        RETURN CASE
997
            WHEN v_equal THEN 'extended'
998
            ELSE 'split'
999
        END;
1000
    END IF;
1001

            
1002
    IF v_current.value IS NULL THEN
1003
        IF p_value IS NULL THEN
1004
            IF v_policy_changed THEN
1005
                EXECUTE format(
1006
                    'UPDATE %s
1007
                     SET end_time = $1
1008
                     WHERE segment_id = $2',
1009
                    v_table_sql
1010
                )
1011
                USING p_observed_at, v_current.segment_id;
1012

            
1013
                EXECUTE format(
1014
                    'INSERT INTO %s (
1015
                        device_pk,
1016
                        device_id,
1017
                        start_time,
1018
                        end_time,
1019
                        value,
1020
                        samples_count,
1021
                        policy_id
1022
                     )
1023
                     VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
1024
                    v_table_sql
1025
                )
1026
                USING p_device_pk, p_device_id, p_observed_at, p_policy_id;
1027

            
1028
                RETURN 'split';
1029
            END IF;
1030

            
1031
            EXECUTE format(
1032
                'UPDATE %s
1033
                 SET samples_count = samples_count + 1
1034
                 WHERE segment_id = $1',
1035
                v_table_sql
1036
            )
1037
            USING v_current.segment_id;
1038

            
1039
            RETURN 'extended_null';
1040
        END IF;
1041

            
1042
        EXECUTE format(
1043
            'UPDATE %s
1044
             SET end_time = $1
1045
             WHERE segment_id = $2',
1046
            v_table_sql
1047
        )
1048
        USING p_observed_at, v_current.segment_id;
1049

            
1050
        EXECUTE format(
1051
            'INSERT INTO %s (
1052
                device_pk,
1053
                device_id,
1054
                start_time,
1055
                end_time,
1056
                value,
1057
                samples_count,
1058
                policy_id
1059
             )
1060
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
1061
            v_table_sql
1062
        )
1063
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
1064

            
1065
        RETURN 'null_to_value';
1066
    END IF;
1067

            
1068
    IF p_value IS NULL THEN
1069
        EXECUTE format(
1070
            'UPDATE %s
1071
             SET end_time = $1
1072
             WHERE segment_id = $2',
1073
            v_table_sql
1074
        )
1075
        USING p_observed_at, v_current.segment_id;
1076

            
1077
        EXECUTE format(
1078
            'INSERT INTO %s (
1079
                device_pk,
1080
                device_id,
1081
                start_time,
1082
                end_time,
1083
                value,
1084
                samples_count,
1085
                policy_id
1086
             )
1087
             VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
1088
            v_table_sql
1089
        )
1090
        USING p_device_pk, p_device_id, p_observed_at, p_policy_id;
1091

            
1092
        RETURN 'value_to_null';
1093
    END IF;
1094

            
1095
    CASE p_comparison_mode
1096
        WHEN 'epsilon' THEN
1097
            EXECUTE
1098
                'SELECT abs($1::double precision - $2::double precision) <= $3'
1099
            INTO v_equal
1100
            USING v_current.value, p_value, p_epsilon;
1101
        WHEN 'exact' THEN
1102
            EXECUTE
1103
                'SELECT $1 IS NOT DISTINCT FROM $2'
1104
            INTO v_equal
1105
            USING v_current.value, p_value;
1106
        ELSE
1107
            RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
1108
    END CASE;
1109

            
1110
    IF v_equal AND NOT v_policy_changed THEN
1111
        EXECUTE format(
1112
            'UPDATE %s
1113
             SET samples_count = samples_count + 1
1114
             WHERE segment_id = $1',
1115
            v_table_sql
1116
        )
1117
        USING v_current.segment_id;
1118

            
1119
        RETURN 'extended';
1120
    END IF;
1121

            
1122
    EXECUTE format(
1123
        'UPDATE %s
1124
         SET end_time = $1
1125
         WHERE segment_id = $2',
1126
        v_table_sql
1127
    )
1128
    USING p_observed_at, v_current.segment_id;
1129

            
1130
    EXECUTE format(
1131
        'INSERT INTO %s (
1132
            device_pk,
1133
            device_id,
1134
            start_time,
1135
            end_time,
1136
            value,
1137
            samples_count,
1138
            policy_id
1139
         )
1140
         VALUES ($1, $2, $3, NULL, $4, 1, $5)',
1141
        v_table_sql
1142
    )
1143
    USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
1144

            
1145
    RETURN 'split';
1146
END;
1147
$$;
1148

            
1149
CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
1150
    p_metric_name text,
1151
    p_device_id text,
1152
    p_value double precision,
1153
    p_observed_at timestamptz
1154
)
1155
RETURNS TABLE (
1156
    metric_name text,
1157
    device_id text,
1158
    table_name text,
1159
    normalized_value double precision,
1160
    action text
1161
)
1162
LANGUAGE plpgsql
1163
AS $$
1164
DECLARE
1165
    v_metric telemetry.metrics%ROWTYPE;
1166
    v_policy telemetry.metric_policies%ROWTYPE;
1167
    v_device telemetry.devices%ROWTYPE;
1168
    v_last_observed_at timestamptz;
1169
    v_normalized double precision;
1170
    v_action text;
1171
    v_metric_lock_key integer;
1172
    v_device_lock_key integer;
1173
BEGIN
1174
    IF p_metric_name IS NULL OR btrim(p_metric_name) = '' THEN
1175
        RAISE EXCEPTION 'metric_name is required';
1176
    END IF;
1177

            
1178
    IF p_device_id IS NULL OR btrim(p_device_id) = '' THEN
1179
        RAISE EXCEPTION 'device_id is required';
1180
    END IF;
1181

            
1182
    IF p_observed_at IS NULL THEN
1183
        RAISE EXCEPTION 'observed_at is required';
1184
    END IF;
1185

            
1186
    v_metric := telemetry.require_metric(p_metric_name);
1187
    PERFORM telemetry.ensure_device(p_device_id);
1188
    v_device := telemetry.require_device(p_device_id);
1189

            
1190
    -- 'state' is reserved for future implementation of discrete enumerated
1191
    -- machine states, likely stored as smallint. It will use the same segment
1192
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
1193
    CASE v_metric.metric_type
1194
        WHEN 'numeric' THEN
1195
            NULL;
1196
        WHEN 'boolean' THEN
1197
            RAISE EXCEPTION 'metric % is boolean; use the boolean overload of ingest_measurement(...)', p_metric_name;
1198
        WHEN 'state' THEN
1199
            RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
1200
        ELSE
1201
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
1202
    END CASE;
1203

            
1204
    v_metric_lock_key := telemetry.lock_key_from_text(p_metric_name);
1205
    v_device_lock_key := telemetry.lock_key_from_text(p_device_id);
1206
    PERFORM pg_advisory_xact_lock(v_metric_lock_key, v_device_lock_key);
1207
    v_last_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
1208
    v_policy := telemetry.require_metric_policy(p_metric_name, p_observed_at);
1209

            
1210
    IF p_value IS NULL THEN
1211
        v_normalized := NULL;
1212
    ELSE
1213
        v_normalized := telemetry.round_to_increment(p_value, v_policy.rounding_precision);
1214

            
1215
        IF v_policy.min_value IS NOT NULL AND v_normalized < v_policy.min_value THEN
1216
            RAISE EXCEPTION 'value % is below min_value % for metric %',
1217
                v_normalized, v_policy.min_value, p_metric_name;
1218
        END IF;
1219

            
1220
        IF v_policy.max_value IS NOT NULL AND v_normalized > v_policy.max_value THEN
1221
            RAISE EXCEPTION 'value % is above max_value % for metric %',
1222
                v_normalized, v_policy.max_value, p_metric_name;
1223
        END IF;
1224
    END IF;
1225

            
1226
    v_action := telemetry.ingest_segment(
1227
        v_metric.table_name,
1228
        v_metric.metric_type,
1229
        v_policy.comparison_mode,
1230
        v_policy.policy_id,
1231
        v_policy.valid_from,
1232
        v_device.device_pk,
1233
        p_device_id,
1234
        v_normalized,
1235
        p_observed_at,
1236
        v_last_observed_at,
1237
        v_policy.epsilon,
1238
        v_policy.max_sampling_interval,
1239
        v_policy.allow_null
1240
    );
1241

            
1242
    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
1243
    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);
1244

            
1245
    RETURN QUERY
1246
    SELECT
1247
        p_metric_name,
1248
        p_device_id,
1249
        v_metric.table_name,
1250
        v_normalized,
1251
        v_action;
1252
END;
1253
$$;
1254

            
1255
CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
1256
    p_metric_name text,
1257
    p_device_id text,
1258
    p_value boolean,
1259
    p_observed_at timestamptz
1260
)
1261
RETURNS TABLE (
1262
    metric_name text,
1263
    device_id text,
1264
    table_name text,
1265
    normalized_value boolean,
1266
    action text
1267
)
1268
LANGUAGE plpgsql
1269
AS $$
1270
DECLARE
1271
    v_metric telemetry.metrics%ROWTYPE;
1272
    v_policy telemetry.metric_policies%ROWTYPE;
1273
    v_device telemetry.devices%ROWTYPE;
1274
    v_last_observed_at timestamptz;
1275
    v_action text;
1276
    v_metric_lock_key integer;
1277
    v_device_lock_key integer;
1278
BEGIN
1279
    IF p_metric_name IS NULL OR btrim(p_metric_name) = '' THEN
1280
        RAISE EXCEPTION 'metric_name is required';
1281
    END IF;
1282

            
1283
    IF p_device_id IS NULL OR btrim(p_device_id) = '' THEN
1284
        RAISE EXCEPTION 'device_id is required';
1285
    END IF;
1286

            
1287
    IF p_observed_at IS NULL THEN
1288
        RAISE EXCEPTION 'observed_at is required';
1289
    END IF;
1290

            
1291
    v_metric := telemetry.require_metric(p_metric_name);
1292
    PERFORM telemetry.ensure_device(p_device_id);
1293
    v_device := telemetry.require_device(p_device_id);
1294

            
1295
    -- 'state' is reserved for future implementation of discrete enumerated
1296
    -- machine states, likely stored as smallint. It will use the same segment
1297
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
1298
    CASE v_metric.metric_type
1299
        WHEN 'boolean' THEN
1300
            NULL;
1301
        WHEN 'numeric' THEN
1302
            RAISE EXCEPTION 'metric % is numeric; use the numeric overload of ingest_measurement(...)', p_metric_name;
1303
        WHEN 'state' THEN
1304
            RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
1305
        ELSE
1306
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
1307
    END CASE;
1308

            
1309
    v_metric_lock_key := telemetry.lock_key_from_text(p_metric_name);
1310
    v_device_lock_key := telemetry.lock_key_from_text(p_device_id);
1311
    PERFORM pg_advisory_xact_lock(v_metric_lock_key, v_device_lock_key);
1312
    v_last_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
1313
    v_policy := telemetry.require_metric_policy(p_metric_name, p_observed_at);
1314

            
1315
    v_action := telemetry.ingest_segment(
1316
        v_metric.table_name,
1317
        v_metric.metric_type,
1318
        v_policy.comparison_mode,
1319
        v_policy.policy_id,
1320
        v_policy.valid_from,
1321
        v_device.device_pk,
1322
        p_device_id,
1323
        p_value,
1324
        p_observed_at,
1325
        v_last_observed_at,
1326
        v_policy.epsilon,
1327
        v_policy.max_sampling_interval,
1328
        v_policy.allow_null
1329
    );
1330

            
1331
    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
1332
    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);
1333

            
1334
    RETURN QUERY
1335
    SELECT
1336
        p_metric_name,
1337
        p_device_id,
1338
        v_metric.table_name,
1339
        p_value,
1340
        v_action;
1341
END;
1342
$$;
1343

            
1344
-- Query functions.
1345

            
1346
CREATE OR REPLACE FUNCTION telemetry.value_at_from_table(
1347
    p_table_name text,
1348
    p_metric_name text,
1349
    p_device_id text,
1350
    p_at timestamptz
1351
)
1352
RETURNS jsonb
1353
LANGUAGE plpgsql
1354
STABLE
1355
AS $$
1356
DECLARE
1357
    v_value jsonb;
1358
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
1359
BEGIN
1360
    EXECUTE format(
1361
        'WITH watermark AS (
1362
             SELECT w.last_observed_at
1363
             FROM telemetry.metric_device_watermarks AS w
1364
             WHERE w.metric_name = $1
1365
               AND w.device_id = $2
1366
         )
1367
         SELECT to_jsonb(s.value)
1368
         FROM %s AS s
1369
         JOIN telemetry.metric_policies AS mp
1370
           ON mp.policy_id = s.policy_id
1371
         LEFT JOIN watermark AS w ON true
1372
         WHERE s.device_id = $2
1373
           AND tstzrange(
1374
                 s.start_time,
1375
                 CASE
1376
                     WHEN s.end_time IS NOT NULL THEN s.end_time
1377
                     WHEN s.value IS NULL THEN ''infinity''::timestamptz
1378
                     ELSE COALESCE(
1379
                         GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
1380
                         + mp.max_sampling_interval,
1381
                         ''infinity''::timestamptz
1382
                     )
1383
                 END,
1384
                 ''[)''
1385
               ) @> $3
1386
         LIMIT 1',
1387
        v_table_sql
1388
    )
1389
    INTO v_value
1390
    USING p_metric_name, p_device_id, p_at;
1391

            
1392
    RETURN v_value;
1393
END;
1394
$$;
1395

            
1396
CREATE OR REPLACE FUNCTION telemetry.value_at(
1397
    p_metric_name text,
1398
    p_device_id text,
1399
    p_at timestamptz
1400
)
1401
RETURNS double precision
1402
LANGUAGE plpgsql
1403
STABLE
1404
AS $$
1405
DECLARE
1406
    v_metric telemetry.metrics%ROWTYPE;
1407
    v_value jsonb;
1408
BEGIN
1409
    v_metric := telemetry.require_metric(p_metric_name);
1410

            
1411
    -- 'state' is reserved for future implementation of discrete enumerated
1412
    -- machine states, likely stored as smallint. It will use the same segment
1413
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
1414
    CASE v_metric.metric_type
1415
        WHEN 'numeric' THEN
1416
            v_value := telemetry.value_at_from_table(
1417
                v_metric.table_name,
1418
                p_metric_name,
1419
                p_device_id,
1420
                p_at
1421
            );
1422

            
1423
            RETURN CASE
1424
                WHEN v_value IS NULL THEN NULL
1425
                -- PostgreSQL must cast jsonb scalars via text first; direct
1426
                -- casts such as (v_value)::double precision are not supported.
1427
                ELSE v_value::text::double precision
1428
            END;
1429
        WHEN 'boolean' THEN
1430
            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_value_at(...)', p_metric_name;
1431
        WHEN 'state' THEN
1432
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
1433
        ELSE
1434
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
1435
    END CASE;
1436
END;
1437
$$;
1438

            
1439
CREATE OR REPLACE FUNCTION telemetry.boolean_value_at(
1440
    p_metric_name text,
1441
    p_device_id text,
1442
    p_at timestamptz
1443
)
1444
RETURNS boolean
1445
LANGUAGE plpgsql
1446
STABLE
1447
AS $$
1448
DECLARE
1449
    v_metric telemetry.metrics%ROWTYPE;
1450
    v_value jsonb;
1451
BEGIN
1452
    v_metric := telemetry.require_metric(p_metric_name);
1453

            
1454
    -- 'state' is reserved for future implementation of discrete enumerated
1455
    -- machine states, likely stored as smallint. It will use the same segment
1456
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
1457
    CASE v_metric.metric_type
1458
        WHEN 'boolean' THEN
1459
            v_value := telemetry.value_at_from_table(
1460
                v_metric.table_name,
1461
                p_metric_name,
1462
                p_device_id,
1463
                p_at
1464
            );
1465

            
1466
            RETURN CASE
1467
                WHEN v_value IS NULL THEN NULL
1468
                -- PostgreSQL must cast jsonb scalars via text first; direct
1469
                -- casts such as (v_value)::boolean are not supported.
1470
                ELSE v_value::text::boolean
1471
            END;
1472
        WHEN 'numeric' THEN
1473
            RAISE EXCEPTION 'metric % is numeric; use telemetry.value_at(...)', p_metric_name;
1474
        WHEN 'state' THEN
1475
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
1476
        ELSE
1477
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
1478
    END CASE;
1479
END;
1480
$$;
1481

            
1482
CREATE OR REPLACE FUNCTION telemetry.segments_between_from_table(
1483
    p_table_name text,
1484
    p_metric_name text,
1485
    p_device_id text,
1486
    p_from timestamptz,
1487
    p_to timestamptz
1488
)
1489
RETURNS TABLE (
1490
    device_id text,
1491
    start_time timestamptz,
1492
    end_time timestamptz,
1493
    value_json jsonb,
1494
    samples_count integer
1495
)
1496
LANGUAGE plpgsql
1497
STABLE
1498
AS $$
1499
DECLARE
1500
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
1501
BEGIN
1502
    RETURN QUERY EXECUTE format(
1503
        'WITH watermark AS (
1504
             SELECT w.last_observed_at
1505
             FROM telemetry.metric_device_watermarks AS w
1506
             WHERE w.metric_name = $1
1507
               AND w.device_id = $2
1508
         ),
1509
         open_segment AS (
1510
             SELECT s.*
1511
             FROM %s AS s
1512
             WHERE s.device_id = $2
1513
               AND s.end_time IS NULL
1514
             LIMIT 1
1515
         ),
1516
         stored_segments AS (
1517
             SELECT
1518
                 s.device_id,
1519
                 GREATEST(s.start_time, $3) AS start_time,
1520
                 LEAST(
1521
                     CASE
1522
                         WHEN s.end_time IS NOT NULL THEN s.end_time
1523
                         WHEN s.value IS NULL THEN $4
1524
                         ELSE LEAST(
1525
                             COALESCE(
1526
                                 GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
1527
                                 + mp.max_sampling_interval,
1528
                                 $4
1529
                             ),
1530
                             $4
1531
                         )
1532
                     END,
1533
                     $4
1534
                 ) AS end_time,
1535
                 to_jsonb(s.value) AS value_json,
1536
                 s.samples_count
1537
             FROM %s AS s
1538
             JOIN telemetry.metric_policies AS mp
1539
               ON mp.policy_id = s.policy_id
1540
             LEFT JOIN watermark AS w ON true
1541
             WHERE s.device_id = $2
1542
               AND tstzrange(
1543
                     s.start_time,
1544
                     CASE
1545
                         WHEN s.end_time IS NOT NULL THEN s.end_time
1546
                         WHEN s.value IS NULL THEN ''infinity''::timestamptz
1547
                         ELSE COALESCE(
1548
                             GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
1549
                             + mp.max_sampling_interval,
1550
                             ''infinity''::timestamptz
1551
                         )
1552
                     END,
1553
                     ''[)''
1554
                   ) && tstzrange($3, $4, ''[)'')
1555
         ),
1556
         synthetic_tail AS (
1557
             SELECT
1558
                 $2::text AS device_id,
1559
                 GREATEST(
1560
                     GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
1561
                     + mp.max_sampling_interval,
1562
                     $3
1563
                 ) AS start_time,
1564
                 $4 AS end_time,
1565
                 NULL::jsonb AS value_json,
1566
                 0 AS samples_count
1567
             FROM open_segment AS s
1568
             JOIN telemetry.metric_policies AS mp
1569
               ON mp.policy_id = s.policy_id
1570
             JOIN watermark AS w ON true
1571
             WHERE s.value IS NOT NULL
1572
               AND GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
1573
                   + mp.max_sampling_interval < $4
1574
         )
1575
         SELECT *
1576
         FROM (
1577
             SELECT *
1578
             FROM stored_segments
1579
             WHERE end_time > start_time
1580

            
1581
             UNION ALL
1582

            
1583
             SELECT *
1584
             FROM synthetic_tail
1585
             WHERE end_time > start_time
1586
         ) AS combined_segments
1587
         ORDER BY start_time',
1588
        v_table_sql,
1589
        v_table_sql
1590
    )
1591
    USING p_metric_name, p_device_id, p_from, p_to;
1592
END;
1593
$$;
1594

            
1595
CREATE OR REPLACE FUNCTION telemetry.metric_segments(
1596
    p_metric_name text,
1597
    p_device_id text,
1598
    p_from timestamptz,
1599
    p_to timestamptz
1600
)
1601
RETURNS TABLE (
1602
    device_id text,
1603
    start_time timestamptz,
1604
    end_time timestamptz,
1605
    value double precision,
1606
    samples_count integer
1607
)
1608
LANGUAGE plpgsql
1609
STABLE
1610
AS $$
1611
DECLARE
1612
    v_metric telemetry.metrics%ROWTYPE;
1613
BEGIN
1614
    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
1615
        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
1616
    END IF;
1617

            
1618
    v_metric := telemetry.require_metric(p_metric_name);
1619

            
1620
    -- 'state' is reserved for future implementation of discrete enumerated
1621
    -- machine states, likely stored as smallint. It will use the same segment
1622
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
1623
    CASE v_metric.metric_type
1624
        WHEN 'numeric' THEN
1625
            RETURN QUERY
1626
            SELECT
1627
                s.device_id,
1628
                s.start_time,
1629
                s.end_time,
1630
                CASE
1631
                    WHEN s.value_json IS NULL THEN NULL
1632
                    -- PostgreSQL must cast jsonb scalars via text first;
1633
                    -- direct casts such as (s.value_json)::double precision
1634
                    -- are not supported.
1635
                    ELSE s.value_json::text::double precision
1636
                END AS value,
1637
                s.samples_count
1638
            FROM telemetry.segments_between_from_table(
1639
                v_metric.table_name,
1640
                p_metric_name,
1641
                p_device_id,
1642
                p_from,
1643
                p_to
1644
            ) AS s;
1645
        WHEN 'boolean' THEN
1646
            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_segments_between(...)', p_metric_name;
1647
        WHEN 'state' THEN
1648
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
1649
        ELSE
1650
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
1651
    END CASE;
1652
END;
1653
$$;
1654

            
1655
CREATE OR REPLACE FUNCTION telemetry.boolean_segments_between(
1656
    p_metric_name text,
1657
    p_device_id text,
1658
    p_from timestamptz,
1659
    p_to timestamptz
1660
)
1661
RETURNS TABLE (
1662
    device_id text,
1663
    start_time timestamptz,
1664
    end_time timestamptz,
1665
    value boolean,
1666
    samples_count integer
1667
)
1668
LANGUAGE plpgsql
1669
STABLE
1670
AS $$
1671
DECLARE
1672
    v_metric telemetry.metrics%ROWTYPE;
1673
BEGIN
1674
    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
1675
        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
1676
    END IF;
1677

            
1678
    v_metric := telemetry.require_metric(p_metric_name);
1679

            
1680
    -- 'state' is reserved for future implementation of discrete enumerated
1681
    -- machine states, likely stored as smallint. It will use the same segment
1682
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
1683
    CASE v_metric.metric_type
1684
        WHEN 'boolean' THEN
1685
            RETURN QUERY
1686
            SELECT
1687
                s.device_id,
1688
                s.start_time,
1689
                s.end_time,
1690
                CASE
1691
                    WHEN s.value_json IS NULL THEN NULL
1692
                    -- PostgreSQL must cast jsonb scalars via text first;
1693
                    -- direct casts such as (s.value_json)::boolean are not
1694
                    -- supported.
1695
                    ELSE s.value_json::text::boolean
1696
                END AS value,
1697
                s.samples_count
1698
            FROM telemetry.segments_between_from_table(
1699
                v_metric.table_name,
1700
                p_metric_name,
1701
                p_device_id,
1702
                p_from,
1703
                p_to
1704
            ) AS s;
1705
        WHEN 'numeric' THEN
1706
            RAISE EXCEPTION 'metric % is numeric; use telemetry.metric_segments(...)', p_metric_name;
1707
        WHEN 'state' THEN
1708
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
1709
        ELSE
1710
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
1711
    END CASE;
1712
END;
1713
$$;
1714

            
1715
CREATE OR REPLACE FUNCTION telemetry.samples_from_table(
1716
    p_table_name text,
1717
    p_metric_name text,
1718
    p_device_id text,
1719
    p_from timestamptz,
1720
    p_to timestamptz,
1721
    p_points integer
1722
)
1723
RETURNS TABLE (
1724
    sample_at timestamptz,
1725
    value_json jsonb
1726
)
1727
LANGUAGE plpgsql
1728
STABLE
1729
AS $$
1730
DECLARE
1731
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
1732
BEGIN
1733
    RETURN QUERY EXECUTE format(
1734
        'WITH watermark AS (
1735
             SELECT w.last_observed_at
1736
             FROM telemetry.metric_device_watermarks AS w
1737
             WHERE w.metric_name = $1
1738
               AND w.device_id = $2
1739
         ),
1740
         sample_points AS (
1741
             SELECT $3 + (($4 - $3) * gs.i::double precision / ($5 - 1)::double precision) AS sample_at
1742
             FROM generate_series(0, $5 - 1) AS gs(i)
1743
         )
1744
         SELECT
1745
             sp.sample_at,
1746
             to_jsonb(seg.value) AS value_json
1747
         FROM sample_points AS sp
1748
         LEFT JOIN watermark AS w ON true
1749
         LEFT JOIN LATERAL (
1750
             SELECT s.value
1751
             FROM %s AS s
1752
             JOIN telemetry.metric_policies AS mp
1753
               ON mp.policy_id = s.policy_id
1754
             WHERE s.device_id = $2
1755
               AND tstzrange(
1756
                     s.start_time,
1757
                     CASE
1758
                         WHEN s.end_time IS NOT NULL THEN s.end_time
1759
                         WHEN s.value IS NULL THEN ''infinity''::timestamptz
1760
                         ELSE COALESCE(
1761
                             GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
1762
                             + mp.max_sampling_interval,
1763
                             ''infinity''::timestamptz
1764
                         )
1765
                     END,
1766
                     ''[)''
1767
                   ) @> sp.sample_at
1768
             LIMIT 1
1769
         ) AS seg ON true
1770
         ORDER BY sp.sample_at',
1771
        v_table_sql
1772
    )
1773
    USING p_metric_name, p_device_id, p_from, p_to, p_points;
1774
END;
1775
$$;
1776

            
1777
CREATE OR REPLACE FUNCTION telemetry.sample_metric(
1778
    p_metric_name text,
1779
    p_device_id text,
1780
    p_from timestamptz,
1781
    p_to timestamptz,
1782
    p_points integer
1783
)
1784
RETURNS TABLE (
1785
    sample_at timestamptz,
1786
    value double precision
1787
)
1788
LANGUAGE plpgsql
1789
STABLE
1790
AS $$
1791
DECLARE
1792
    v_metric telemetry.metrics%ROWTYPE;
1793
BEGIN
1794
    IF p_points IS NULL OR p_points < 2 THEN
1795
        RAISE EXCEPTION 'p_points must be at least 2';
1796
    END IF;
1797

            
1798
    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
1799
        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
1800
    END IF;
1801

            
1802
    v_metric := telemetry.require_metric(p_metric_name);
1803

            
1804
    -- 'state' is reserved for future implementation of discrete enumerated
1805
    -- machine states, likely stored as smallint. It will use the same segment
1806
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
1807
    CASE v_metric.metric_type
1808
        WHEN 'numeric' THEN
1809
            RETURN QUERY
1810
            SELECT
1811
                s.sample_at,
1812
                CASE
1813
                    WHEN s.value_json IS NULL THEN NULL
1814
                    -- PostgreSQL must cast jsonb scalars via text first;
1815
                    -- direct casts such as (s.value_json)::double precision
1816
                    -- are not supported.
1817
                    ELSE s.value_json::text::double precision
1818
                END AS value
1819
            FROM telemetry.samples_from_table(
1820
                v_metric.table_name,
1821
                p_metric_name,
1822
                p_device_id,
1823
                p_from,
1824
                p_to,
1825
                p_points
1826
            ) AS s;
1827
        WHEN 'boolean' THEN
1828
            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_samples(...)', p_metric_name;
1829
        WHEN 'state' THEN
1830
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
1831
        ELSE
1832
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
1833
    END CASE;
1834
END;
1835
$$;
1836

            
1837
CREATE OR REPLACE FUNCTION telemetry.boolean_samples(
1838
    p_metric_name text,
1839
    p_device_id text,
1840
    p_from timestamptz,
1841
    p_to timestamptz,
1842
    p_points integer
1843
)
1844
RETURNS TABLE (
1845
    sample_at timestamptz,
1846
    value boolean
1847
)
1848
LANGUAGE plpgsql
1849
STABLE
1850
AS $$
1851
DECLARE
1852
    v_metric telemetry.metrics%ROWTYPE;
1853
BEGIN
1854
    IF p_points IS NULL OR p_points < 2 THEN
1855
        RAISE EXCEPTION 'p_points must be at least 2';
1856
    END IF;
1857

            
1858
    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
1859
        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
1860
    END IF;
1861

            
1862
    v_metric := telemetry.require_metric(p_metric_name);
1863

            
1864
    -- 'state' is reserved for future implementation of discrete enumerated
1865
    -- machine states, likely stored as smallint. It will use the same segment
1866
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
1867
    CASE v_metric.metric_type
1868
        WHEN 'boolean' THEN
1869
            RETURN QUERY
1870
            SELECT
1871
                s.sample_at,
1872
                CASE
1873
                    WHEN s.value_json IS NULL THEN NULL
1874
                    -- PostgreSQL must cast jsonb scalars via text first;
1875
                    -- direct casts such as (s.value_json)::boolean are not
1876
                    -- supported.
1877
                    ELSE s.value_json::text::boolean
1878
                END AS value
1879
            FROM telemetry.samples_from_table(
1880
                v_metric.table_name,
1881
                p_metric_name,
1882
                p_device_id,
1883
                p_from,
1884
                p_to,
1885
                p_points
1886
            ) AS s;
1887
        WHEN 'numeric' THEN
1888
            RAISE EXCEPTION 'metric % is numeric; use telemetry.sample_metric(...)', p_metric_name;
1889
        WHEN 'state' THEN
1890
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
1891
        ELSE
1892
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
1893
    END CASE;
1894
END;
1895
$$;
1896

            
1897
-- Maintenance utilities.
1898

            
1899
CREATE OR REPLACE FUNCTION telemetry.verify_segments(p_metric_name text)
1900
RETURNS TABLE (
1901
    metric_name text,
1902
    device_id text,
1903
    issue text,
1904
    segment_id bigint,
1905
    related_segment_id bigint,
1906
    start_time timestamptz,
1907
    end_time timestamptz,
1908
    details text
1909
)
1910
LANGUAGE plpgsql
1911
STABLE
1912
AS $$
1913
DECLARE
1914
    v_metric telemetry.metrics%ROWTYPE;
1915
    v_table_sql text;
1916
BEGIN
1917
    v_metric := telemetry.require_metric(p_metric_name);
1918
    v_table_sql := telemetry.metric_table_sql(v_metric.table_name);
1919

            
1920
    RETURN QUERY EXECUTE format(
1921
        'WITH ordered AS (
1922
             SELECT
1923
                 s.segment_id,
1924
                 s.device_id,
1925
                 s.policy_id,
1926
                 s.start_time,
1927
                 s.end_time,
1928
                 lag(s.segment_id) OVER (
1929
                     PARTITION BY s.device_id
1930
                     ORDER BY s.start_time, s.segment_id
1931
                 ) AS prev_segment_id,
1932
                 lag(s.end_time) OVER (
1933
                     PARTITION BY s.device_id
1934
                     ORDER BY s.start_time, s.segment_id
1935
                 ) AS prev_end_time,
1936
                 lag(s.policy_id) OVER (
1937
                     PARTITION BY s.device_id
1938
                     ORDER BY s.start_time, s.segment_id
1939
                 ) AS prev_policy_id
1940
             FROM %s AS s
1941
         ),
1942
         open_counts AS (
1943
             SELECT
1944
                 s.device_id,
1945
                 count(*)::integer AS open_count
1946
             FROM %s AS s
1947
             WHERE s.end_time IS NULL
1948
             GROUP BY s.device_id
1949
             HAVING count(*) > 1
1950
         )
1951
         SELECT
1952
             $1::text AS metric_name,
1953
             o.device_id,
1954
             ''invalid_interval''::text AS issue,
1955
             o.segment_id,
1956
             NULL::bigint AS related_segment_id,
1957
             o.start_time,
1958
             o.end_time,
1959
             ''end_time must be greater than start_time''::text AS details
1960
         FROM ordered AS o
1961
         WHERE o.end_time IS NOT NULL
1962
           AND o.end_time <= o.start_time
1963

            
1964
         UNION ALL
1965

            
1966
         SELECT
1967
             $1::text,
1968
             oc.device_id,
1969
             ''multiple_open_segments''::text,
1970
             NULL::bigint,
1971
             NULL::bigint,
1972
             NULL::timestamptz,
1973
             NULL::timestamptz,
1974
             format(''%%s open segments found'', oc.open_count)
1975
         FROM open_counts AS oc
1976

            
1977
         UNION ALL
1978

            
1979
         SELECT
1980
             $1::text,
1981
             o.device_id,
1982
             ''overlap''::text,
1983
             o.segment_id,
1984
             o.prev_segment_id,
1985
             o.start_time,
1986
             o.end_time,
1987
             format(''previous segment ends at %%s'', o.prev_end_time)
1988
         FROM ordered AS o
1989
         WHERE o.prev_end_time IS NOT NULL
1990
           AND o.prev_end_time > o.start_time
1991

            
1992
         UNION ALL
1993

            
1994
         SELECT
1995
             $1::text,
1996
             o.device_id,
1997
             ''unexpected_gap''::text,
1998
             o.segment_id,
1999
             o.prev_segment_id,
2000
             o.start_time,
2001
             o.end_time,
2002
             format(
2003
                 ''stored gap of %%s violates expected continuity (max_sampling_interval=%%s)'',
2004
                 o.start_time - o.prev_end_time,
2005
                 mp.max_sampling_interval
2006
             )
2007
         FROM ordered AS o
2008
         JOIN telemetry.metric_policies AS mp
2009
           ON mp.policy_id = o.prev_policy_id
2010
         WHERE o.prev_end_time IS NOT NULL
2011
           AND o.start_time > o.prev_end_time
2012

            
2013
         ORDER BY device_id, start_time NULLS FIRST, segment_id NULLS FIRST',
2014
        v_table_sql,
2015
        v_table_sql
2016
    )
2017
    USING p_metric_name;
2018
END;
2019
$$;
2020

            
2021
CREATE OR REPLACE FUNCTION telemetry.compact_segments(
2022
    p_metric_name text,
2023
    p_before timestamptz
2024
)
2025
RETURNS TABLE (
2026
    device_id text,
2027
    kept_segment_id bigint,
2028
    merged_segments integer,
2029
    start_time timestamptz,
2030
    end_time timestamptz,
2031
    total_samples integer
2032
)
2033
LANGUAGE plpgsql
2034
AS $$
2035
DECLARE
2036
    v_metric telemetry.metrics%ROWTYPE;
2037
    v_table_sql text;
2038
    v_merge record;
2039
BEGIN
2040
    IF p_before IS NULL THEN
2041
        RAISE EXCEPTION 'p_before is required';
2042
    END IF;
2043

            
2044
    v_metric := telemetry.require_metric(p_metric_name);
2045
    v_table_sql := telemetry.metric_table_sql(v_metric.table_name);
2046

            
2047
    FOR v_merge IN EXECUTE format(
2048
        'WITH candidates AS (
2049
             SELECT
2050
                 s.segment_id,
2051
                 s.device_id,
2052
                 s.start_time,
2053
                 s.end_time,
2054
                 s.samples_count,
2055
                 s.policy_id,
2056
                 s.value,
2057
                 lag(s.end_time) OVER (
2058
                     PARTITION BY s.device_id
2059
                     ORDER BY s.start_time, s.segment_id
2060
                 ) AS prev_end_time,
2061
                 lag(s.policy_id) OVER (
2062
                     PARTITION BY s.device_id
2063
                     ORDER BY s.start_time, s.segment_id
2064
                 ) AS prev_policy_id,
2065
                 lag(s.value) OVER (
2066
                     PARTITION BY s.device_id
2067
                     ORDER BY s.start_time, s.segment_id
2068
                 ) AS prev_value
2069
             FROM %s AS s
2070
             WHERE s.end_time IS NOT NULL
2071
               AND s.end_time <= $1
2072
         ),
2073
         grouped AS (
2074
             SELECT
2075
                 c.*,
2076
                 sum(
2077
                     CASE
2078
                        WHEN c.prev_end_time IS NULL
2079
                          OR c.start_time <> c.prev_end_time
2080
                          OR c.policy_id IS DISTINCT FROM c.prev_policy_id
2081
                          OR c.value IS DISTINCT FROM c.prev_value
2082
                        THEN 1
2083
                        ELSE 0
2084
                     END
2085
                 ) OVER (
2086
                     PARTITION BY c.device_id
2087
                     ORDER BY c.start_time, c.segment_id
2088
                 ) AS grp
2089
             FROM candidates AS c
2090
         ),
2091
         aggregates AS (
2092
             SELECT
2093
                 g.device_id,
2094
                 (array_agg(g.segment_id ORDER BY g.start_time, g.segment_id))[1] AS keep_segment_id,
2095
                 array_remove(
2096
                     array_agg(g.segment_id ORDER BY g.start_time, g.segment_id),
2097
                     (array_agg(g.segment_id ORDER BY g.start_time, g.segment_id))[1]
2098
                 ) AS delete_segment_ids,
2099
                 min(g.start_time) AS merged_start_time,
2100
                 max(g.end_time) AS merged_end_time,
2101
                 sum(g.samples_count)::integer AS total_samples,
2102
                 count(*)::integer AS merged_segments
2103
             FROM grouped AS g
2104
             GROUP BY g.device_id, g.grp
2105
             HAVING count(*) > 1
2106
         )
2107
         SELECT
2108
             a.device_id,
2109
             a.keep_segment_id,
2110
             a.delete_segment_ids,
2111
             a.merged_segments,
2112
             a.merged_start_time,
2113
             a.merged_end_time,
2114
             a.total_samples
2115
         FROM aggregates AS a
2116
         ORDER BY a.device_id, a.merged_start_time',
2117
        v_table_sql
2118
    )
2119
    USING p_before
2120
    LOOP
2121
        EXECUTE format(
2122
            'DELETE FROM %s
2123
             WHERE segment_id = ANY($1)',
2124
            v_table_sql
2125
        )
2126
        USING v_merge.delete_segment_ids;
2127

            
2128
        EXECUTE format(
2129
            'UPDATE %s
2130
             SET end_time = $1,
2131
                 samples_count = $2
2132
             WHERE segment_id = $3',
2133
            v_table_sql
2134
        )
2135
        USING v_merge.merged_end_time, v_merge.total_samples, v_merge.keep_segment_id;
2136

            
2137
        device_id := v_merge.device_id;
2138
        kept_segment_id := v_merge.keep_segment_id;
2139
        merged_segments := v_merge.merged_segments;
2140
        start_time := v_merge.merged_start_time;
2141
        end_time := v_merge.merged_end_time;
2142
        total_samples := v_merge.total_samples;
2143
        RETURN NEXT;
2144
    END LOOP;
2145

            
2146
    RETURN;
2147
END;
2148
$$;
2149

            
2150
CREATE OR REPLACE FUNCTION telemetry.inactive_devices(p_threshold interval)
2151
RETURNS TABLE (
2152
    device_pk bigint,
2153
    device_id text,
2154
    device_type text,
2155
    location text,
2156
    last_seen timestamptz,
2157
    inactive_for interval
2158
)
2159
LANGUAGE sql
2160
STABLE
2161
AS $$
2162
    SELECT
2163
        d.device_pk,
2164
        d.device_id,
2165
        d.device_type,
2166
        d.location,
2167
        d.last_seen,
2168
        now() - d.last_seen AS inactive_for
2169
    FROM telemetry.devices AS d
2170
    WHERE d.last_seen IS NOT NULL
2171
      AND now() - d.last_seen > $1
2172
    ORDER BY inactive_for DESC, d.device_id;
2173
$$;
2174

            
2175
-- Example metric registrations.
2176

            
2177
SELECT telemetry.register_numeric_metric(
2178
    p_metric_name => 'ambient_temperature',
2179
    p_table_name => 'ambient_temperature_segments',
2180
    p_domain_name => 'environmental',
2181
    p_epsilon => 0.05,
2182
    p_min_value => -40,
2183
    p_max_value => 60,
2184
    p_rounding_precision => 0.1,
2185
    p_max_sampling_interval => '5 minutes',
2186
    p_allow_null => true
2187
);
2188

            
2189
SELECT telemetry.register_numeric_metric(
2190
    p_metric_name => 'cpu_temperature',
2191
    p_table_name => 'cpu_temperature_segments',
2192
    p_domain_name => 'system',
2193
    p_epsilon => 0.5,
2194
    p_min_value => 0,
2195
    p_max_value => 120,
2196
    p_rounding_precision => 1,
2197
    p_max_sampling_interval => '10 seconds',
2198
    p_allow_null => true
2199
);
2200

            
2201
SELECT telemetry.register_numeric_metric(
2202
    p_metric_name => 'humidity',
2203
    p_table_name => 'humidity_segments',
2204
    p_domain_name => 'environmental',
2205
    p_epsilon => 0.5,
2206
    p_min_value => 0,
2207
    p_max_value => 100,
2208
    p_rounding_precision => 0.1,
2209
    p_max_sampling_interval => '5 minutes',
2210
    p_allow_null => true
2211
);
2212
COMMIT;