-- telemetrydatabase/schema/telemetry_schema.sql
-- 1 contributor
-- 2212 lines | 66.699 KB
BEGIN;

-- Schema and extensions.

-- Plain CREATE SCHEMA fails fast on a second run; the CREATE TYPE and
-- CREATE TABLE statements further down are not re-runnable either.
CREATE SCHEMA telemetry;

-- btree_gist provides the GiST operator classes needed for the per-metric
-- exclusion constraints (device_id WITH =, segment_period WITH &&).
-- Extensions are database-wide, so tolerate one that is already installed.
CREATE EXTENSION IF NOT EXISTS btree_gist;

-- Enums.

-- Kind of value a metric carries. The registration functions in this file
-- create 'double precision' segment tables for 'numeric' metrics and
-- 'boolean' segment tables for 'boolean' metrics.
--
-- 'state' is reserved for future implementation of discrete enumerated
-- machine states such as thermostat_mode, pump_state, hvac_mode, or
-- door_state. Values will likely be stored as smallint and use the
-- same segment engine as boolean metrics, but with exact comparison
-- over enumerated values.
CREATE TYPE telemetry.metric_type_enum AS ENUM ('numeric', 'boolean', 'state');

-- How an incoming value is compared with the open segment's value:
--   'epsilon' - equal when abs(current - incoming) <= the policy's epsilon
--   'exact'   - equal when current IS NOT DISTINCT FROM incoming
CREATE TYPE telemetry.comparison_mode_enum AS ENUM ('epsilon', 'exact');

-- Registry utility functions.

-- Reduces a metric table reference to its bare table name.
--
-- Accepts either "name" or "telemetry.name" (parsed with parse_ident, so
-- unquoted identifiers are case-folded and quoted ones keep their spelling).
-- Raises when the input is NULL/blank, names a schema other than telemetry,
-- or has more than two identifier parts.
CREATE OR REPLACE FUNCTION telemetry.normalize_metric_table_name(p_table_name text)
RETURNS text
LANGUAGE plpgsql
IMMUTABLE
AS $$
DECLARE
    v_idents text[];
    v_depth integer;
BEGIN
    IF p_table_name IS NULL OR btrim(p_table_name) = '' THEN
        RAISE EXCEPTION 'table_name is required';
    END IF;

    v_idents := pg_catalog.parse_ident(p_table_name, false);
    v_depth := COALESCE(array_length(v_idents, 1), 0);

    -- Bare table name: nothing to validate.
    IF v_depth = 1 THEN
        RETURN v_idents[1];
    END IF;

    -- Anything other than schema.table is rejected outright.
    IF v_depth <> 2 THEN
        RAISE EXCEPTION 'invalid metric table reference: %', p_table_name;
    END IF;

    IF v_idents[1] <> 'telemetry' THEN
        RAISE EXCEPTION 'metric tables must live in schema telemetry: %', p_table_name;
    END IF;

    RETURN v_idents[2];
END;
$$;

-- Returns the schema-qualified, identifier-quoted SQL name of a metric table
-- (telemetry.<name>) for use in dynamic SQL. Raises if the relation does not
-- exist or if the reference fails normalization.
CREATE OR REPLACE FUNCTION telemetry.metric_table_sql(p_table_name text)
RETURNS text
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_qualified text;
BEGIN
    v_qualified := format(
        'telemetry.%I',
        telemetry.normalize_metric_table_name(p_table_name)
    );

    IF to_regclass(v_qualified) IS NOT NULL THEN
        RETURN v_qualified;
    END IF;

    RAISE EXCEPTION 'could not resolve metric table %', v_qualified;
END;
$$;

-- Core registry tables.

-- Registry of metrics: one row per metric, naming the segment table that
-- stores its data and carrying a copy of its policy settings. Rows are
-- upserted by the register_*_metric functions; add_metric_policy overwrites
-- the policy columns when the policy it adds has no later-dated sibling.
CREATE TABLE telemetry.metrics (
    metric_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- Natural key; referenced (with cascading update/delete) by the policy,
    -- retention, and watermark tables.
    metric_name text NOT NULL UNIQUE,
    -- Bare name of the segment table in schema telemetry; must already be in
    -- normalized form (see metrics_table_name_check).
    table_name text NOT NULL UNIQUE,
    domain_name text NOT NULL DEFAULT 'generic',
    metric_type telemetry.metric_type_enum NOT NULL,
    comparison_mode telemetry.comparison_mode_enum NOT NULL,
    -- Absolute tolerance used by 'epsilon' comparison.
    epsilon double precision,
    min_value double precision,
    max_value double precision,
    rounding_precision double precision,
    allow_null boolean NOT NULL DEFAULT true,
    -- A silence longer than this between two observations is treated as a
    -- data gap by ingest_segment.
    max_sampling_interval interval NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    updated_at timestamptz NOT NULL DEFAULT now(),
    CHECK (min_value IS NULL OR max_value IS NULL OR min_value <= max_value),
    -- numeric metrics: epsilon comparison with epsilon and rounding_precision set.
    -- boolean/state metrics: exact comparison with no numeric settings at all.
    CONSTRAINT metrics_policy_shape_check
        CHECK (
            (metric_type = 'numeric'::telemetry.metric_type_enum
             AND comparison_mode = 'epsilon'::telemetry.comparison_mode_enum
             AND epsilon IS NOT NULL
             AND rounding_precision IS NOT NULL)
            OR
            (metric_type IN (
                 'boolean'::telemetry.metric_type_enum,
                 'state'::telemetry.metric_type_enum
             )
             AND comparison_mode = 'exact'::telemetry.comparison_mode_enum
             AND epsilon IS NULL
             AND min_value IS NULL
             AND max_value IS NULL
             AND rounding_precision IS NULL)
        ),
    -- Rejects schema-qualified or otherwise non-canonical table references.
    CONSTRAINT metrics_table_name_check
        CHECK (table_name = telemetry.normalize_metric_table_name(table_name))
);

-- Policy tables.

-- Versioned policy settings per metric. require_metric_policy selects the row
-- with the greatest valid_from that is <= the observation time. Each
-- registered metric gets an initial row at valid_from = '-infinity'.
-- Segment tables reference policy_id.
CREATE TABLE telemetry.metric_policies (
    policy_id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    metric_name text NOT NULL
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    -- Instant from which this policy version applies.
    valid_from timestamptz NOT NULL,
    comparison_mode telemetry.comparison_mode_enum NOT NULL,
    epsilon double precision,
    min_value double precision,
    max_value double precision,
    rounding_precision double precision,
    allow_null boolean NOT NULL,
    max_sampling_interval interval NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    -- At most one policy version per metric and start instant.
    UNIQUE (metric_name, valid_from)
);

-- Optional per-metric retention settings (at most one row per metric).
-- NOTE(review): nothing in the visible part of this file reads these columns;
-- presumably raw_retention bounds how long segments are kept and
-- compact_after is the age at which they may be compacted - confirm against
-- the consumer.
CREATE TABLE telemetry.metric_retention_policies (
    metric_name text PRIMARY KEY
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    raw_retention interval NULL,
    compact_after interval NULL
);

-- Device registry tables.

-- Registry of devices that report measurements. ensure_device inserts a row
-- containing only device_id; both device_pk and device_id are referenced by
-- the per-metric segment tables.
CREATE TABLE telemetry.devices (
    device_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    -- External device identifier.
    device_id text NOT NULL UNIQUE,
    device_type text NULL,
    location text NULL,
    metadata jsonb NULL,
    -- Latest observation time recorded by touch_device_last_seen, which only
    -- ever moves it forward.
    last_seen timestamptz NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    updated_at timestamptz NOT NULL DEFAULT now()
);

-- Last accepted observation time per (metric, device). Read by
-- assert_append_only to reject out-of-order measurements and written by
-- bump_watermark.
CREATE TABLE telemetry.metric_device_watermarks (
    metric_name text NOT NULL
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    device_id text NOT NULL
        REFERENCES telemetry.devices(device_id)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    last_observed_at timestamptz NOT NULL,
    -- bump_watermark targets this constraint by its generated name
    -- (metric_device_watermarks_pkey).
    PRIMARY KEY (metric_name, device_id)
);

-- Runtime helpers.

-- Rounds p_value to the nearest multiple of p_increment.
-- A zero or negative increment disables rounding and returns p_value as is.
-- STRICT: returns NULL when either argument is NULL.
CREATE OR REPLACE FUNCTION telemetry.round_to_increment(
    p_value double precision,
    p_increment double precision
)
RETURNS double precision
LANGUAGE sql
IMMUTABLE
STRICT
AS $$
    SELECT CASE
        WHEN p_increment > 0 THEN p_increment * round(p_value / p_increment)
        ELSE p_value
    END;
$$;

-- Registers p_device_id in telemetry.devices if it is not there yet.
-- An already-known device is left untouched.
CREATE OR REPLACE FUNCTION telemetry.ensure_device(p_device_id text)
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO telemetry.devices (device_id)
    SELECT p_device_id
    ON CONFLICT (device_id) DO NOTHING;
$$;

-- Advances devices.last_seen to p_observed_at for the given device.
--
-- last_seen only ever moves forward: it is set when it is currently NULL or
-- older than p_observed_at. An unknown device or a NULL p_observed_at leaves
-- the table unchanged.
--
-- The advance condition lives in WHERE rather than in a CASE inside SET, so a
-- call that would not change last_seen does not rewrite the row at all.
CREATE OR REPLACE FUNCTION telemetry.touch_device_last_seen(
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS void
LANGUAGE sql
AS $$
    UPDATE telemetry.devices
    SET last_seen = p_observed_at
    WHERE device_id = p_device_id
      AND p_observed_at IS NOT NULL
      AND (last_seen IS NULL OR last_seen < p_observed_at);
$$;

-- Returns the telemetry.devices row for p_device_id.
-- Raises 'unknown device' when no such row exists.
CREATE OR REPLACE FUNCTION telemetry.require_device(p_device_id text)
RETURNS telemetry.devices
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_row telemetry.devices%ROWTYPE;
BEGIN
    SELECT d.*
    INTO v_row
    FROM telemetry.devices AS d
    WHERE d.device_id = p_device_id;

    IF FOUND THEN
        RETURN v_row;
    END IF;

    RAISE EXCEPTION 'unknown device: %', p_device_id;
END;
$$;

-- Maps a text value to a deterministic integer key by hashing it
-- (hashtextextended, seed 0) and taking the remainder modulo 2147483647.
-- The remainder keeps the sign of the hash, so results may be negative.
-- STRICT: NULL in, NULL out.
CREATE OR REPLACE FUNCTION telemetry.lock_key_from_text(p_value text)
RETURNS integer
LANGUAGE sql
IMMUTABLE
STRICT
AS $$
    SELECT CAST(
        mod(hashtextextended(p_value, 0), 2147483647::bigint) AS integer
    );
$$;

-- Segment template and table creation.

-- Creates the segment table telemetry.<p_table_name> for a metric, together
-- with its indexes and its no-overlap exclusion constraint. Every step uses
-- IF NOT EXISTS or swallows "already exists" errors, so the function can be
-- called again for an existing table.
--
-- Parameters:
--   p_table_name  metric table reference ("name" or "telemetry.name")
--   p_value_type  SQL type of the value column; must be one of
--                 'double precision', 'boolean', 'smallint'
--
-- Raises when p_value_type is unsupported, when telemetry.metrics has no row
-- with this table_name, or when that metric has no policy row at
-- valid_from = '-infinity'.
CREATE OR REPLACE FUNCTION telemetry.create_segment_table(
    p_table_name text,
    p_value_type text
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_base_table_name text := telemetry.normalize_metric_table_name(p_table_name);
    v_table_name text := format('telemetry.%I', v_base_table_name);
    v_metric_name text;
    -- Only used to verify that the initial policy row exists.
    v_initial_policy_id bigint;
    -- Names of the dependent objects, all derived from the base table name.
    v_open_idx text := format('%s_open_segment_uidx', v_base_table_name);
    v_start_idx text := format('%s_device_start_cover_idx', v_base_table_name);
    v_device_period_gist_idx text := format('%s_device_period_gist', v_base_table_name);
    v_period_excl text := format('%s_device_period_excl', v_base_table_name);
    v_device_pk_fk text := format('%s_device_pk_fkey', v_base_table_name);
    v_device_fk text := format('%s_device_id_fkey', v_base_table_name);
    v_policy_fk text := format('%s_policy_id_fkey', v_base_table_name);
    v_samples_check text := format('%s_samples_count_check', v_base_table_name);
    v_end_time_check text := format('%s_check', v_base_table_name);
BEGIN
    -- p_value_type is spliced into the DDL with %s, so it is restricted to a
    -- fixed whitelist first.
    IF p_value_type NOT IN ('double precision', 'boolean', 'smallint') THEN
        RAISE EXCEPTION 'unsupported segment value type: %', p_value_type;
    END IF;

    -- The metric must already be registered under this table name.
    SELECT m.metric_name
    INTO v_metric_name
    FROM telemetry.metrics AS m
    WHERE m.table_name = v_base_table_name;

    IF v_metric_name IS NULL THEN
        RAISE EXCEPTION
            'metric metadata must exist before creating segment table %',
            v_base_table_name;
    END IF;

    -- ...and it must have its initial ('-infinity') policy row.
    SELECT mp.policy_id
    INTO v_initial_policy_id
    FROM telemetry.metric_policies AS mp
    WHERE mp.metric_name = v_metric_name
      AND mp.valid_from = '-infinity'::timestamptz;

    IF v_initial_policy_id IS NULL THEN
        RAISE EXCEPTION
            'initial policy row is missing for metric %',
            v_metric_name;
    END IF;

    -- The segment table itself. end_time IS NULL marks the open segment;
    -- segment_period is the half-open range [start_time, end_time), with an
    -- open segment extending to infinity.
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %s (
            segment_id bigserial PRIMARY KEY,
            device_pk bigint NOT NULL,
            device_id text NOT NULL,
            start_time timestamptz NOT NULL,
            end_time timestamptz NULL,
            value %s NULL,
            -- samples_count = 0 is intentional for synthetic gap segments inserted
            -- during lazy gap detection; those intervals represent missing data, not samples.
            samples_count integer NOT NULL DEFAULT 1
                CONSTRAINT %I CHECK (samples_count >= 0),
            policy_id bigint NOT NULL,
            segment_period tstzrange GENERATED ALWAYS AS (
                tstzrange(start_time, COALESCE(end_time, ''infinity''::timestamptz), ''[)'')
            ) STORED,
            CONSTRAINT %I CHECK (end_time IS NULL OR end_time > start_time),
            CONSTRAINT %I
                FOREIGN KEY (device_pk)
                REFERENCES telemetry.devices(device_pk)
                ON UPDATE CASCADE
                ON DELETE CASCADE,
            CONSTRAINT %I
                FOREIGN KEY (device_id)
                REFERENCES telemetry.devices(device_id)
                ON UPDATE CASCADE
                ON DELETE CASCADE,
            CONSTRAINT %I
                FOREIGN KEY (policy_id)
                REFERENCES telemetry.metric_policies(policy_id)
        ) WITH (
            fillfactor = 90,
            autovacuum_vacuum_scale_factor = 0.02,
            autovacuum_analyze_scale_factor = 0.01
        )',
        v_table_name,
        p_value_type,
        v_samples_check,
        v_end_time_check,
        v_device_pk_fk,
        v_device_fk,
        v_policy_fk
    );

    -- At most one open segment (end_time IS NULL) per device.
    EXECUTE format(
        'CREATE UNIQUE INDEX IF NOT EXISTS %I
         ON %s (device_id)
         WHERE end_time IS NULL',
        v_open_idx,
        v_table_name
    );

    -- Covering index for "latest segments of a device" lookups.
    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %I
         ON %s (device_id, start_time DESC)
         INCLUDE (end_time, value, samples_count)',
        v_start_idx,
        v_table_name
    );

    -- NOTE(review): the exclusion constraint added below is itself backed by a
    -- GiST index over the same (device_id, segment_period) columns, so this
    -- explicit index looks redundant - confirm nothing depends on its name
    -- before dropping it.
    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %I
         ON %s
         USING gist (device_id, segment_period)',
        v_device_period_gist_idx,
        v_table_name
    );

    -- Segments of one device must never overlap in time. ADD CONSTRAINT has
    -- no IF NOT EXISTS form, so "already exists" errors are swallowed to keep
    -- the function re-runnable.
    BEGIN
        EXECUTE format(
            'ALTER TABLE %s
             ADD CONSTRAINT %I
             EXCLUDE USING gist (
                 device_id WITH =,
                 segment_period WITH &&
             )',
            v_table_name,
            v_period_excl
        );
    EXCEPTION
        WHEN duplicate_object THEN
            NULL;
        WHEN duplicate_table THEN
            NULL;
    END;
END;
$$;


-- Metric registration.

-- Registers (or re-registers) a numeric metric.
--
-- Upserts the telemetry.metrics row keyed by metric_name, inserts the initial
-- policy row at valid_from = '-infinity' unless one already exists, and makes
-- sure the 'double precision' segment table is present.
--
-- Numeric metrics always use 'epsilon' comparison. Re-registration overwrites
-- the metrics row but leaves an existing initial policy row untouched.
CREATE OR REPLACE FUNCTION telemetry.register_numeric_metric(
    p_metric_name text,
    p_table_name text,
    p_domain_name text,
    p_epsilon double precision,
    p_min_value double precision,
    p_max_value double precision,
    p_rounding_precision double precision,
    p_max_sampling_interval interval,
    p_allow_null boolean DEFAULT true
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_segment_table text;
BEGIN
    v_segment_table := telemetry.normalize_metric_table_name(p_table_name);

    -- Registry row: create it, or refresh every mutable column on conflict.
    INSERT INTO telemetry.metrics (
        metric_name, table_name, domain_name,
        metric_type, comparison_mode,
        epsilon, min_value, max_value, rounding_precision,
        max_sampling_interval, allow_null
    )
    VALUES (
        p_metric_name, v_segment_table, p_domain_name,
        'numeric', 'epsilon',
        p_epsilon, p_min_value, p_max_value, p_rounding_precision,
        p_max_sampling_interval, p_allow_null
    )
    ON CONFLICT (metric_name) DO UPDATE
    SET (
            table_name, domain_name,
            metric_type, comparison_mode,
            epsilon, min_value, max_value, rounding_precision,
            max_sampling_interval, allow_null,
            updated_at
        ) = (
            EXCLUDED.table_name, EXCLUDED.domain_name,
            EXCLUDED.metric_type, EXCLUDED.comparison_mode,
            EXCLUDED.epsilon, EXCLUDED.min_value, EXCLUDED.max_value, EXCLUDED.rounding_precision,
            EXCLUDED.max_sampling_interval, EXCLUDED.allow_null,
            now()
        );

    -- Initial policy: valid since the beginning of time; never overwritten.
    INSERT INTO telemetry.metric_policies (
        metric_name, valid_from, comparison_mode,
        epsilon, min_value, max_value, rounding_precision,
        allow_null, max_sampling_interval
    )
    VALUES (
        p_metric_name, '-infinity'::timestamptz, 'epsilon',
        p_epsilon, p_min_value, p_max_value, p_rounding_precision,
        p_allow_null, p_max_sampling_interval
    )
    ON CONFLICT (metric_name, valid_from) DO NOTHING;

    -- Create the storage, then confirm it resolves.
    PERFORM telemetry.create_segment_table(v_segment_table, 'double precision');
    PERFORM telemetry.metric_table_sql(v_segment_table);
END;
$$;

-- Registers (or re-registers) a boolean metric.
--
-- Upserts the telemetry.metrics row keyed by metric_name, inserts the initial
-- policy row at valid_from = '-infinity' unless one already exists, and makes
-- sure the 'boolean' segment table is present.
--
-- Boolean metrics always use 'exact' comparison and carry no epsilon, range,
-- or rounding settings. Re-registration overwrites the metrics row but leaves
-- an existing initial policy row untouched.
CREATE OR REPLACE FUNCTION telemetry.register_boolean_metric(
    p_metric_name text,
    p_table_name text,
    p_domain_name text,
    p_max_sampling_interval interval,
    p_allow_null boolean DEFAULT true
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_segment_table text;
BEGIN
    v_segment_table := telemetry.normalize_metric_table_name(p_table_name);

    -- Registry row: create it, or refresh every mutable column on conflict.
    INSERT INTO telemetry.metrics (
        metric_name, table_name, domain_name,
        metric_type, comparison_mode,
        epsilon, min_value, max_value, rounding_precision,
        max_sampling_interval, allow_null
    )
    VALUES (
        p_metric_name, v_segment_table, p_domain_name,
        'boolean', 'exact',
        NULL, NULL, NULL, NULL,
        p_max_sampling_interval, p_allow_null
    )
    ON CONFLICT (metric_name) DO UPDATE
    SET (
            table_name, domain_name,
            metric_type, comparison_mode,
            epsilon, min_value, max_value, rounding_precision,
            max_sampling_interval, allow_null,
            updated_at
        ) = (
            EXCLUDED.table_name, EXCLUDED.domain_name,
            EXCLUDED.metric_type, EXCLUDED.comparison_mode,
            EXCLUDED.epsilon, EXCLUDED.min_value, EXCLUDED.max_value, EXCLUDED.rounding_precision,
            EXCLUDED.max_sampling_interval, EXCLUDED.allow_null,
            now()
        );

    -- Initial policy: valid since the beginning of time; never overwritten.
    INSERT INTO telemetry.metric_policies (
        metric_name, valid_from, comparison_mode,
        epsilon, min_value, max_value, rounding_precision,
        allow_null, max_sampling_interval
    )
    VALUES (
        p_metric_name, '-infinity'::timestamptz, 'exact',
        NULL, NULL, NULL, NULL,
        p_allow_null, p_max_sampling_interval
    )
    ON CONFLICT (metric_name, valid_from) DO NOTHING;

    -- Create the storage, then confirm it resolves.
    PERFORM telemetry.create_segment_table(v_segment_table, 'boolean');
    PERFORM telemetry.metric_table_sql(v_segment_table);
END;
$$;

-- Metric and policy lookup.

-- Returns the telemetry.metrics row for p_metric_name.
-- Raises 'unknown metric' when the metric is not registered.
CREATE OR REPLACE FUNCTION telemetry.require_metric(p_metric_name text)
RETURNS telemetry.metrics
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_row telemetry.metrics%ROWTYPE;
BEGIN
    SELECT m.*
    INTO v_row
    FROM telemetry.metrics AS m
    WHERE m.metric_name = p_metric_name;

    IF FOUND THEN
        RETURN v_row;
    END IF;

    RAISE EXCEPTION 'unknown metric: %', p_metric_name;
END;
$$;

-- Returns the policy in force for a metric at p_observed_at: the
-- metric_policies row with the latest valid_from not after that instant.
-- Raises 'no active policy' when no such row exists.
CREATE OR REPLACE FUNCTION telemetry.require_metric_policy(
    p_metric_name text,
    p_observed_at timestamptz
)
RETURNS telemetry.metric_policies
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_active telemetry.metric_policies%ROWTYPE;
BEGIN
    SELECT mp.*
    INTO v_active
    FROM telemetry.metric_policies AS mp
    WHERE mp.metric_name = p_metric_name
      AND mp.valid_from <= p_observed_at
    ORDER BY mp.valid_from DESC
    FETCH FIRST 1 ROW ONLY;

    IF FOUND THEN
        RETURN v_active;
    END IF;

    RAISE EXCEPTION
        'no active policy for metric % at %',
        p_metric_name,
        p_observed_at;
END;
$$;

-- Adds a new policy version for a metric, effective from p_valid_from, and
-- returns its policy_id.
--
-- Validation, in order:
--   * p_valid_from and p_max_sampling_interval are required
--   * the metric must be registered
--   * min_value must not exceed max_value when both are given
--   * numeric metrics need comparison_mode 'epsilon' plus epsilon and
--     rounding_precision
--   * boolean/state metrics need comparison_mode 'exact' and accept none of
--     epsilon/min/max/rounding_precision
--   * no policy may already exist for the same (metric, valid_from)
--
-- When no policy with a later valid_from exists, the metric's row in
-- telemetry.metrics is updated to the new settings as well.
CREATE OR REPLACE FUNCTION telemetry.add_metric_policy(
    p_metric_name text,
    p_valid_from timestamptz,
    p_comparison_mode telemetry.comparison_mode_enum,
    p_epsilon double precision,
    p_min_value double precision,
    p_max_value double precision,
    p_rounding_precision double precision,
    p_max_sampling_interval interval,
    p_allow_null boolean
)
RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
    v_new_policy_id bigint;
BEGIN
    IF p_valid_from IS NULL THEN
        RAISE EXCEPTION 'valid_from is required';
    END IF;

    IF p_max_sampling_interval IS NULL THEN
        RAISE EXCEPTION 'max_sampling_interval is required';
    END IF;

    v_metric_row := telemetry.require_metric(p_metric_name);

    -- A NULL bound makes the comparison NULL, which does not trigger the IF.
    IF p_min_value > p_max_value THEN
        RAISE EXCEPTION 'min_value % exceeds max_value % for metric %',
            p_min_value,
            p_max_value,
            p_metric_name;
    END IF;

    -- Shape rules per metric type.
    IF v_metric_row.metric_type = 'numeric'::telemetry.metric_type_enum THEN
        IF p_comparison_mode <> 'epsilon'::telemetry.comparison_mode_enum THEN
            RAISE EXCEPTION
                'numeric metric % requires comparison_mode epsilon',
                p_metric_name;
        END IF;

        IF p_epsilon IS NULL OR p_rounding_precision IS NULL THEN
            RAISE EXCEPTION
                'numeric metric % requires epsilon and rounding_precision',
                p_metric_name;
        END IF;
    ELSIF v_metric_row.metric_type IN (
        'boolean'::telemetry.metric_type_enum,
        'state'::telemetry.metric_type_enum
    ) THEN
        IF p_comparison_mode <> 'exact'::telemetry.comparison_mode_enum THEN
            RAISE EXCEPTION
                'metric % with type % requires comparison_mode exact',
                p_metric_name,
                v_metric_row.metric_type;
        END IF;

        -- Any non-NULL numeric setting is rejected.
        IF COALESCE(p_epsilon, p_min_value, p_max_value, p_rounding_precision) IS NOT NULL THEN
            RAISE EXCEPTION
                'metric % with type % does not accept epsilon/min/max/rounding_precision in policies',
                p_metric_name,
                v_metric_row.metric_type;
        END IF;
    ELSE
        RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric_row.metric_type, p_metric_name;
    END IF;

    -- Friendly duplicate check ahead of the UNIQUE (metric_name, valid_from).
    PERFORM 1
    FROM telemetry.metric_policies AS existing
    WHERE existing.metric_name = p_metric_name
      AND existing.valid_from = p_valid_from;

    IF FOUND THEN
        RAISE EXCEPTION
            'policy already exists for metric % at %',
            p_metric_name,
            p_valid_from;
    END IF;

    INSERT INTO telemetry.metric_policies (
        metric_name, valid_from, comparison_mode,
        epsilon, min_value, max_value, rounding_precision,
        allow_null, max_sampling_interval
    )
    VALUES (
        p_metric_name, p_valid_from, p_comparison_mode,
        p_epsilon, p_min_value, p_max_value, p_rounding_precision,
        p_allow_null, p_max_sampling_interval
    )
    RETURNING policy_id
    INTO v_new_policy_id;

    -- Mirror the settings onto the registry row only when this is the
    -- latest-dated policy for the metric.
    UPDATE telemetry.metrics AS m
    SET (
            comparison_mode, epsilon, min_value, max_value,
            rounding_precision, allow_null, max_sampling_interval,
            updated_at
        ) = (
            p_comparison_mode, p_epsilon, p_min_value, p_max_value,
            p_rounding_precision, p_allow_null, p_max_sampling_interval,
            now()
        )
    WHERE m.metric_name = p_metric_name
      AND NOT EXISTS (
          SELECT 1
          FROM telemetry.metric_policies AS newer
          WHERE newer.metric_name = p_metric_name
            AND newer.valid_from > p_valid_from
      );

    RETURN v_new_policy_id;
END;
$$;

-- Ingestion functions.

-- Enforces append-only ingestion for one (metric, device) pair.
--
-- Returns the stored watermark (last_observed_at), or NULL when the pair has
-- no watermark yet. Raises 'out-of-order measurement' when a watermark exists
-- and p_observed_at is not strictly later than it.
CREATE OR REPLACE FUNCTION telemetry.assert_append_only(
    p_metric_name text,
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS timestamptz
LANGUAGE plpgsql
AS $$
DECLARE
    v_watermark timestamptz;
BEGIN
    SELECT w.last_observed_at
    INTO v_watermark
    FROM telemetry.metric_device_watermarks AS w
    WHERE w.metric_name = p_metric_name
      AND w.device_id = p_device_id;

    IF FOUND THEN
        IF p_observed_at <= v_watermark THEN
            RAISE EXCEPTION
                'out-of-order measurement for metric % device %: incoming % <= last observed %',
                p_metric_name,
                p_device_id,
                p_observed_at,
                v_watermark;
        END IF;
    END IF;

    -- Still NULL here when no watermark row was found.
    RETURN v_watermark;
END;
$$;

-- Records p_observed_at as the watermark for a (metric, device) pair,
-- inserting the row on first use and overwriting last_observed_at otherwise.
-- The new value is stored unconditionally; ordering is not checked here.
CREATE OR REPLACE FUNCTION telemetry.bump_watermark(
    p_metric_name text,
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO telemetry.metric_device_watermarks (metric_name, device_id, last_observed_at)
    VALUES (p_metric_name, p_device_id, p_observed_at)
    ON CONFLICT (metric_name, device_id) DO UPDATE
    SET last_observed_at = EXCLUDED.last_observed_at;
$$;

-- Applies one measurement to a metric's segment table for a single device and
-- returns a text code describing what was done.
--
-- The caller supplies the already-resolved policy (p_policy_id,
-- p_policy_valid_from, p_comparison_mode, p_epsilon, p_max_sampling_interval,
-- p_allow_null), both device keys, the value, its observation time, and the
-- previous observation time for this metric/device (p_last_observed_at, NULL
-- when there is none).
--
-- Return codes:
--   'opened' / 'opened_null'   no open segment existed; one was created
--   'gap_to_null'              silence exceeded max_sampling_interval and the
--                              incoming value is NULL
--   'gap_split'                silence exceeded max_sampling_interval; a gap
--                              segment and a new open segment were inserted
--   'extended' / 'extended_null'  the open segment absorbed the sample
--   'split'                    the open segment was closed and a new one opened
--   'null_to_value' / 'value_to_null'  transition between NULL and non-NULL
--
-- Raises when p_value IS NULL while p_allow_null is false, when the table
-- reference cannot be normalized or resolved, or when p_comparison_mode is
-- neither 'epsilon' nor 'exact'.
--
-- NOTE(review): p_metric_type is accepted but not referenced in the body.
CREATE OR REPLACE FUNCTION telemetry.ingest_segment(
    p_table_name text,
    p_metric_type telemetry.metric_type_enum,
    p_comparison_mode telemetry.comparison_mode_enum,
    p_policy_id bigint,
    p_policy_valid_from timestamptz,
    p_device_pk bigint,
    p_device_id text,
    p_value anyelement,
    p_observed_at timestamptz,
    p_last_observed_at timestamptz,
    p_epsilon double precision,
    p_max_sampling_interval interval,
    p_allow_null boolean
)
RETURNS text
LANGUAGE plpgsql
AS $$
DECLARE
    -- The device's open segment (end_time IS NULL), if any.
    v_current record;
    v_has_current boolean;
    v_gap_start timestamptz;
    v_rowcount bigint;
    v_equal boolean;
    v_policy_changed boolean;
    -- Reference point for gap detection; moved up to the policy boundary when
    -- the open segment is split at p_policy_valid_from.
    v_effective_last_observed_at timestamptz := p_last_observed_at;
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
BEGIN
    -- Fetch and row-lock the device's open segment.
    EXECUTE format(
        'SELECT segment_id, device_pk, device_id, start_time, end_time, value, samples_count, policy_id
         FROM %s
         WHERE device_id = $1
           AND end_time IS NULL
         FOR UPDATE',
        v_table_sql
    )
    INTO v_current
    USING p_device_id;

    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    v_has_current := v_rowcount > 0;

    IF p_value IS NULL AND NOT p_allow_null THEN
        RAISE EXCEPTION 'metric table % does not allow explicit NULL measurements', v_table_name;
    END IF;

    -- Case 1: no open segment yet - open one starting at this observation.
    IF NOT v_has_current THEN
        EXECUTE format(
            'INSERT INTO %s (
                device_pk,
                device_id,
                start_time,
                end_time,
                value,
                samples_count,
                policy_id
             )
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;

        RETURN CASE WHEN p_value IS NULL THEN 'opened_null' ELSE 'opened' END;
    END IF;

    v_policy_changed := v_current.policy_id IS DISTINCT FROM p_policy_id;

    -- Case 2: the policy changed after the open segment started. Close the
    -- segment at the policy boundary and carry its value forward in a
    -- synthetic (samples_count = 0) segment under the new policy; processing
    -- then continues against that synthetic segment.
    IF v_policy_changed
       AND p_policy_valid_from > v_current.start_time THEN
        EXECUTE format(
            'UPDATE %s
             SET end_time = $1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING p_policy_valid_from, v_current.segment_id;

        EXECUTE format(
            'INSERT INTO %s (
                device_pk,
                device_id,
                start_time,
                end_time,
                value,
                samples_count,
                policy_id
             )
             -- samples_count = 0 marks a synthetic continuation across a policy boundary.
             VALUES ($1, $2, $3, NULL, $4, 0, $5)
             RETURNING segment_id, device_pk, device_id, start_time, end_time, value, samples_count, policy_id',
            v_table_sql
        )
        INTO v_current
        USING v_current.device_pk, v_current.device_id, p_policy_valid_from, v_current.value, p_policy_id;

        v_policy_changed := false;
        -- Gap detection is measured from the later of the previous
        -- observation and the policy boundary.
        v_effective_last_observed_at := GREATEST(
            COALESCE(p_last_observed_at, p_policy_valid_from),
            p_policy_valid_from
        );
    END IF;

    -- Case 3: the device was silent for longer than max_sampling_interval
    -- while the open segment holds a non-NULL value. The segment is closed at
    -- (last observation + max_sampling_interval).
    IF v_effective_last_observed_at IS NOT NULL
       AND p_observed_at - v_effective_last_observed_at > p_max_sampling_interval
       AND v_current.value IS NOT NULL THEN
        v_gap_start := v_effective_last_observed_at + p_max_sampling_interval;

        EXECUTE format(
            'UPDATE %s
             SET end_time = $1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING v_gap_start, v_current.segment_id;

        -- Incoming NULL: a single open NULL segment starting at the gap start
        -- covers both the gap and the new sample.
        IF p_value IS NULL THEN
            EXECUTE format(
                'INSERT INTO %s (
                    device_pk,
                    device_id,
                    start_time,
                    end_time,
                    value,
                    samples_count,
                    policy_id
                 )
                 VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
                v_table_sql
            )
            USING p_device_pk, p_device_id, v_gap_start, p_policy_id;

            RETURN 'gap_to_null';
        END IF;

        -- Incoming value: closed NULL gap segment [gap start, observed_at)...
        EXECUTE format(
            'INSERT INTO %s (
                device_pk,
                device_id,
                start_time,
                end_time,
                value,
                samples_count,
                policy_id
             )
             -- samples_count = 0 marks a historian-generated gap interval.
             VALUES ($1, $2, $3, $4, NULL, 0, $5)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, v_gap_start, p_observed_at, p_policy_id;

        -- ...followed by a new open segment holding the incoming value.
        EXECUTE format(
            'INSERT INTO %s (
                device_pk,
                device_id,
                start_time,
                end_time,
                value,
                samples_count,
                policy_id
             )
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;

        RETURN 'gap_split';
    END IF;

    -- Case 4: the open segment is synthetic (samples_count = 0) and the
    -- observation falls exactly on its start. The synthetic row is overwritten
    -- in place with the real sample instead of being split.
    IF v_current.samples_count = 0
       AND p_observed_at = v_current.start_time THEN
        IF p_value IS NULL THEN
            EXECUTE format(
                'UPDATE %s
                 SET value = NULL,
                     samples_count = 1
                 WHERE segment_id = $1',
                v_table_sql
            )
            USING v_current.segment_id;

            RETURN CASE
                WHEN v_current.value IS NULL THEN 'extended_null'
                ELSE 'value_to_null'
            END;
        END IF;

        IF v_current.value IS NULL THEN
            EXECUTE format(
                'UPDATE %s
                 SET value = $1,
                     samples_count = 1
                 WHERE segment_id = $2',
                v_table_sql
            )
            USING p_value, v_current.segment_id;

            RETURN 'null_to_value';
        END IF;

        -- Both values are non-NULL: the comparison only decides which code is
        -- returned; the row is overwritten either way.
        CASE p_comparison_mode
            WHEN 'epsilon' THEN
                EXECUTE
                    'SELECT abs($1::double precision - $2::double precision) <= $3'
                INTO v_equal
                USING v_current.value, p_value, p_epsilon;
            WHEN 'exact' THEN
                EXECUTE
                    'SELECT $1 IS NOT DISTINCT FROM $2'
                INTO v_equal
                USING v_current.value, p_value;
            ELSE
                RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
        END CASE;

        EXECUTE format(
            'UPDATE %s
             SET value = $1,
                 samples_count = 1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING p_value, v_current.segment_id;

        RETURN CASE
            WHEN v_equal THEN 'extended'
            ELSE 'split'
        END;
    END IF;

    -- Case 5: the open segment is a NULL segment.
    IF v_current.value IS NULL THEN
        IF p_value IS NULL THEN
            -- NULL after NULL under a different policy: start a fresh NULL
            -- segment so each segment stays tied to one policy_id.
            IF v_policy_changed THEN
                EXECUTE format(
                    'UPDATE %s
                     SET end_time = $1
                     WHERE segment_id = $2',
                    v_table_sql
                )
                USING p_observed_at, v_current.segment_id;

                EXECUTE format(
                    'INSERT INTO %s (
                        device_pk,
                        device_id,
                        start_time,
                        end_time,
                        value,
                        samples_count,
                        policy_id
                     )
                     VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
                    v_table_sql
                )
                USING p_device_pk, p_device_id, p_observed_at, p_policy_id;

                RETURN 'split';
            END IF;

            -- NULL after NULL, same policy: just count the sample.
            EXECUTE format(
                'UPDATE %s
                 SET samples_count = samples_count + 1
                 WHERE segment_id = $1',
                v_table_sql
            )
            USING v_current.segment_id;

            RETURN 'extended_null';
        END IF;

        -- Value after NULL: close the NULL segment and open a value segment.
        EXECUTE format(
            'UPDATE %s
             SET end_time = $1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING p_observed_at, v_current.segment_id;

        EXECUTE format(
            'INSERT INTO %s (
                device_pk,
                device_id,
                start_time,
                end_time,
                value,
                samples_count,
                policy_id
             )
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;

        RETURN 'null_to_value';
    END IF;

    -- Case 6: NULL after a value - close the value segment and open a NULL one.
    IF p_value IS NULL THEN
        EXECUTE format(
            'UPDATE %s
             SET end_time = $1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING p_observed_at, v_current.segment_id;

        EXECUTE format(
            'INSERT INTO %s (
                device_pk,
                device_id,
                start_time,
                end_time,
                value,
                samples_count,
                policy_id
             )
             VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, p_observed_at, p_policy_id;

        RETURN 'value_to_null';
    END IF;

    -- Case 7: value after value - compare under the policy's comparison mode.
    CASE p_comparison_mode
        WHEN 'epsilon' THEN
            EXECUTE
                'SELECT abs($1::double precision - $2::double precision) <= $3'
            INTO v_equal
            USING v_current.value, p_value, p_epsilon;
        WHEN 'exact' THEN
            EXECUTE
                'SELECT $1 IS NOT DISTINCT FROM $2'
            INTO v_equal
            USING v_current.value, p_value;
        ELSE
            RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
    END CASE;

    -- Equal under an unchanged policy: the open segment absorbs the sample.
    IF v_equal AND NOT v_policy_changed THEN
        EXECUTE format(
            'UPDATE %s
             SET samples_count = samples_count + 1
             WHERE segment_id = $1',
            v_table_sql
        )
        USING v_current.segment_id;

        RETURN 'extended';
    END IF;

    -- Different value (or a different policy): close the open segment at the
    -- observation time and open a new one with the incoming value.
    EXECUTE format(
        'UPDATE %s
         SET end_time = $1
         WHERE segment_id = $2',
        v_table_sql
    )
    USING p_observed_at, v_current.segment_id;

    EXECUTE format(
        'INSERT INTO %s (
            device_pk,
            device_id,
            start_time,
            end_time,
            value,
            samples_count,
            policy_id
         )
         VALUES ($1, $2, $3, NULL, $4, 1, $5)',
        v_table_sql
    )
    USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;

    RETURN 'split';
END;
$$;

-- Numeric overload of ingest_measurement: records one double precision sample
-- for a (metric, device) pair.
--
-- Steps, in order:
--   1. Reject a NULL/blank metric_name or device_id and a NULL observed_at.
--   2. Resolve the metric via require_metric, call ensure_device, then load
--      the device row via require_device.
--   3. Refuse metrics whose metric_type is not 'numeric'.
--   4. Take a transaction-scoped advisory lock keyed on (metric, device),
--      run assert_append_only, and load the policy for observed_at.
--   5. Round a non-NULL value with the policy's rounding_precision and
--      enforce the policy's min_value / max_value bounds on the result.
--   6. Pass the sample to telemetry.ingest_segment, then call bump_watermark
--      and touch_device_last_seen.
--
-- Returns one row: metric name, device id, the metric's table_name, the value
-- after rounding (NULL when p_value is NULL), and the action text returned by
-- telemetry.ingest_segment.
CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
    p_metric_name text,
    p_device_id text,
    p_value double precision,
    p_observed_at timestamptz
)
RETURNS TABLE (
    metric_name text,
    device_id text,
    table_name text,
    normalized_value double precision,
    action text
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
    v_policy_row telemetry.metric_policies%ROWTYPE;
    v_device_row telemetry.devices%ROWTYPE;
    v_previous_observed_at timestamptz;
    v_rounded double precision;
    v_lock_metric integer;
    v_lock_device integer;
BEGIN
    -- Argument guards. COALESCE folds the NULL case into the blank case.
    IF COALESCE(btrim(p_metric_name), '') = '' THEN
        RAISE EXCEPTION 'metric_name is required';
    END IF;

    IF COALESCE(btrim(p_device_id), '') = '' THEN
        RAISE EXCEPTION 'device_id is required';
    END IF;

    IF p_observed_at IS NULL THEN
        RAISE EXCEPTION 'observed_at is required';
    END IF;

    v_metric_row := telemetry.require_metric(p_metric_name);
    PERFORM telemetry.ensure_device(p_device_id);
    v_device_row := telemetry.require_device(p_device_id);

    -- Only 'numeric' metrics pass. 'state' is reserved for future discrete
    -- enumerated machine states and has no ingestion path yet.
    IF v_metric_row.metric_type = 'boolean' THEN
        RAISE EXCEPTION 'metric % is boolean; use the boolean overload of ingest_measurement(...)', p_metric_name;
    ELSIF v_metric_row.metric_type = 'state' THEN
        RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
    ELSIF v_metric_row.metric_type IS DISTINCT FROM 'numeric' THEN
        RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric_row.metric_type, p_metric_name;
    END IF;

    -- The advisory lock is taken before the append-only check and the policy
    -- lookup; it is released automatically at transaction end.
    v_lock_metric := telemetry.lock_key_from_text(p_metric_name);
    v_lock_device := telemetry.lock_key_from_text(p_device_id);
    PERFORM pg_advisory_xact_lock(v_lock_metric, v_lock_device);

    v_previous_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
    v_policy_row := telemetry.require_metric_policy(p_metric_name, p_observed_at);

    -- A NULL sample skips rounding and bound checks entirely.
    v_rounded := NULL;

    IF p_value IS NOT NULL THEN
        v_rounded := telemetry.round_to_increment(p_value, v_policy_row.rounding_precision);

        -- A NULL bound makes the comparison NULL, so the branch is not taken.
        IF v_rounded < v_policy_row.min_value THEN
            RAISE EXCEPTION 'value % is below min_value % for metric %',
                v_rounded, v_policy_row.min_value, p_metric_name;
        END IF;

        IF v_rounded > v_policy_row.max_value THEN
            RAISE EXCEPTION 'value % is above max_value % for metric %',
                v_rounded, v_policy_row.max_value, p_metric_name;
        END IF;
    END IF;

    action := telemetry.ingest_segment(
        v_metric_row.table_name,
        v_metric_row.metric_type,
        v_policy_row.comparison_mode,
        v_policy_row.policy_id,
        v_policy_row.valid_from,
        v_device_row.device_pk,
        p_device_id,
        v_rounded,
        p_observed_at,
        v_previous_observed_at,
        v_policy_row.epsilon,
        v_policy_row.max_sampling_interval,
        v_policy_row.allow_null
    );

    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);

    -- Emit the single result row through the OUT columns.
    metric_name := p_metric_name;
    device_id := p_device_id;
    table_name := v_metric_row.table_name;
    normalized_value := v_rounded;
    RETURN NEXT;
END;
$$;

-- Boolean overload of ingest_measurement: records one boolean sample for a
-- (metric, device) pair.
--
-- Steps, in order:
--   1. Reject a NULL/blank metric_name or device_id and a NULL observed_at.
--   2. Resolve the metric via require_metric, call ensure_device, then load
--      the device row via require_device.
--   3. Refuse metrics whose metric_type is not 'boolean'.
--   4. Take a transaction-scoped advisory lock keyed on (metric, device),
--      run assert_append_only, and load the policy for observed_at.
--   5. Pass the value unchanged to telemetry.ingest_segment, then call
--      bump_watermark and touch_device_last_seen.
--
-- Returns one row: metric name, device id, the metric's table_name, the value
-- as supplied, and the action text returned by telemetry.ingest_segment.
CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
    p_metric_name text,
    p_device_id text,
    p_value boolean,
    p_observed_at timestamptz
)
RETURNS TABLE (
    metric_name text,
    device_id text,
    table_name text,
    normalized_value boolean,
    action text
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
    v_policy_row telemetry.metric_policies%ROWTYPE;
    v_device_row telemetry.devices%ROWTYPE;
    v_previous_observed_at timestamptz;
    v_lock_metric integer;
    v_lock_device integer;
BEGIN
    -- Argument guards. COALESCE folds the NULL case into the blank case.
    IF COALESCE(btrim(p_metric_name), '') = '' THEN
        RAISE EXCEPTION 'metric_name is required';
    END IF;

    IF COALESCE(btrim(p_device_id), '') = '' THEN
        RAISE EXCEPTION 'device_id is required';
    END IF;

    IF p_observed_at IS NULL THEN
        RAISE EXCEPTION 'observed_at is required';
    END IF;

    v_metric_row := telemetry.require_metric(p_metric_name);
    PERFORM telemetry.ensure_device(p_device_id);
    v_device_row := telemetry.require_device(p_device_id);

    -- Only 'boolean' metrics pass. 'state' is reserved for future discrete
    -- enumerated machine states and has no ingestion path yet.
    IF v_metric_row.metric_type = 'numeric' THEN
        RAISE EXCEPTION 'metric % is numeric; use the numeric overload of ingest_measurement(...)', p_metric_name;
    ELSIF v_metric_row.metric_type = 'state' THEN
        RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
    ELSIF v_metric_row.metric_type IS DISTINCT FROM 'boolean' THEN
        RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric_row.metric_type, p_metric_name;
    END IF;

    -- The advisory lock is taken before the append-only check and the policy
    -- lookup; it is released automatically at transaction end.
    v_lock_metric := telemetry.lock_key_from_text(p_metric_name);
    v_lock_device := telemetry.lock_key_from_text(p_device_id);
    PERFORM pg_advisory_xact_lock(v_lock_metric, v_lock_device);

    v_previous_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
    v_policy_row := telemetry.require_metric_policy(p_metric_name, p_observed_at);

    action := telemetry.ingest_segment(
        v_metric_row.table_name,
        v_metric_row.metric_type,
        v_policy_row.comparison_mode,
        v_policy_row.policy_id,
        v_policy_row.valid_from,
        v_device_row.device_pk,
        p_device_id,
        p_value,
        p_observed_at,
        v_previous_observed_at,
        v_policy_row.epsilon,
        v_policy_row.max_sampling_interval,
        v_policy_row.allow_null
    );

    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);

    -- Emit the single result row through the OUT columns.
    metric_name := p_metric_name;
    device_id := p_device_id;
    table_name := v_metric_row.table_name;
    normalized_value := p_value;
    RETURN NEXT;
END;
$$;

-- Query functions.

-- Type-agnostic point lookup: returns, as jsonb, the value of the segment in
-- p_table_name that covers instant p_at for the given device.
--
-- A segment's effective half-open range [start_time, end) ends at:
--   * its stored end_time when that is set;
--   * infinity when it is open and stores a NULL value;
--   * otherwise greatest(watermark, start_time) + max_sampling_interval of
--     the segment's policy, or infinity when that sum is NULL.
-- The watermark is metric_device_watermarks.last_observed_at for
-- (p_metric_name, p_device_id); a missing row falls back to start_time.
--
-- Returns NULL when no segment covers p_at or the covering segment's value is
-- NULL. Raises (via telemetry.metric_table_sql) when the table cannot be
-- resolved.
CREATE OR REPLACE FUNCTION telemetry.value_at_from_table(
    p_table_name text,
    p_metric_name text,
    p_device_id text,
    p_at timestamptz
)
RETURNS jsonb
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_source_sql text := telemetry.metric_table_sql(p_table_name);
    v_lookup_sql text;
    v_found jsonb;
BEGIN
    -- Only the validated, quoted table reference is interpolated; all values
    -- travel as bind parameters ($1 metric, $2 device, $3 instant).
    v_lookup_sql := format(
        'SELECT to_jsonb(seg.value)
         FROM %s AS seg
         INNER JOIN telemetry.metric_policies AS pol
            ON pol.policy_id = seg.policy_id
         LEFT JOIN telemetry.metric_device_watermarks AS wm
            ON wm.metric_name = $1
           AND wm.device_id = $2
         WHERE seg.device_id = $2
           AND tstzrange(
                 seg.start_time,
                 CASE
                     WHEN seg.end_time IS NOT NULL THEN seg.end_time
                     WHEN seg.value IS NULL THEN ''infinity''::timestamptz
                     ELSE COALESCE(
                         GREATEST(COALESCE(wm.last_observed_at, seg.start_time), seg.start_time)
                         + pol.max_sampling_interval,
                         ''infinity''::timestamptz
                     )
                 END,
                 ''[)''
               ) @> $3
         LIMIT 1',
        v_source_sql
    );

    EXECUTE v_lookup_sql
    INTO v_found
    USING p_metric_name, p_device_id, p_at;

    RETURN v_found;
END;
$$;

-- Returns the value of a numeric metric for a device at instant p_at.
--
-- Delegates the lookup to telemetry.value_at_from_table and converts the
-- jsonb scalar it returns to double precision.
--
-- Returns NULL when the lookup yields NULL (no covering segment, or a segment
-- whose value is NULL).
-- Raises when the metric is boolean, state, or of an unsupported type; any
-- error from telemetry.require_metric propagates.
CREATE OR REPLACE FUNCTION telemetry.value_at(
    p_metric_name text,
    p_device_id text,
    p_at timestamptz
)
RETURNS double precision
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
    v_value jsonb;
BEGIN
    v_metric := telemetry.require_metric(p_metric_name);

    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'numeric' THEN
            v_value := telemetry.value_at_from_table(
                v_metric.table_name,
                p_metric_name,
                p_device_id,
                p_at
            );

            -- Extract the scalar with #>> rather than casting jsonb to text:
            -- to_jsonb renders NaN and +/-Infinity as JSON strings, and the
            -- jsonb::text form of a string keeps its double quotes, which the
            -- double precision input function rejects. #>> with an empty path
            -- yields the unquoted text and returns NULL for NULL input.
            RETURN (v_value #>> '{}')::double precision;
        WHEN 'boolean' THEN
            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_value_at(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;
END;
$$;

-- Returns the value of a boolean metric for a device at instant p_at.
--
-- Delegates the lookup to telemetry.value_at_from_table and converts the
-- jsonb scalar it returns to boolean by way of its text form.
--
-- Returns NULL when the lookup yields NULL.
-- Raises when the metric is numeric, state, or of an unsupported type; any
-- error from telemetry.require_metric propagates.
CREATE OR REPLACE FUNCTION telemetry.boolean_value_at(
    p_metric_name text,
    p_device_id text,
    p_at timestamptz
)
RETURNS boolean
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
    v_found jsonb;
BEGIN
    v_metric_row := telemetry.require_metric(p_metric_name);

    -- Reject every metric type except 'boolean' up front. 'state' is reserved
    -- for future discrete enumerated machine states and has no query wrapper.
    IF v_metric_row.metric_type = 'numeric' THEN
        RAISE EXCEPTION 'metric % is numeric; use telemetry.value_at(...)', p_metric_name;
    ELSIF v_metric_row.metric_type = 'state' THEN
        RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
    ELSIF v_metric_row.metric_type IS DISTINCT FROM 'boolean' THEN
        RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric_row.metric_type, p_metric_name;
    END IF;

    v_found := telemetry.value_at_from_table(
        v_metric_row.table_name,
        p_metric_name,
        p_device_id,
        p_at
    );

    -- Casting NULL yields NULL, so no separate NULL branch is needed.
    RETURN CAST(CAST(v_found AS text) AS boolean);
END;
$$;

-- Type-agnostic range query: returns the segments of p_table_name for one
-- device, clipped to the half-open window [p_from, p_to), with values
-- rendered as jsonb.
--
-- Two row sources are combined and ordered by start_time:
--   * stored_segments: every stored segment whose effective range overlaps
--     the window. The effective end is the stored end_time; for an open
--     segment with a NULL value it is unbounded; for an open segment with a
--     value it is greatest(watermark, start_time) + max_sampling_interval
--     (unbounded when that sum is NULL). Start and end are clipped to the
--     window.
--   * synthetic_tail: a NULL-valued row with samples_count 0 covering the
--     part of the window after the open, non-NULL segment's effective end.
--
-- The watermark is metric_device_watermarks.last_observed_at for
-- (p_metric_name, p_device_id). Both row sources LEFT JOIN it and fall back
-- to the segment's start_time when no watermark row exists, so a segment that
-- is clipped at its effective end always gets a matching tail. (Previously
-- the tail used an inner join and vanished when the watermark row was
-- missing.)
--
-- Rows that are empty after clipping (end_time <= start_time) are dropped.
-- This function does not validate p_from / p_to.
CREATE OR REPLACE FUNCTION telemetry.segments_between_from_table(
    p_table_name text,
    p_metric_name text,
    p_device_id text,
    p_from timestamptz,
    p_to timestamptz
)
RETURNS TABLE (
    device_id text,
    start_time timestamptz,
    end_time timestamptz,
    value_json jsonb,
    samples_count integer
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
BEGIN
    -- Bind parameters: $1 metric name, $2 device id, $3 window start,
    -- $4 window end. Only the validated table reference is interpolated.
    RETURN QUERY EXECUTE format(
        'WITH watermark AS (
             SELECT w.last_observed_at
             FROM telemetry.metric_device_watermarks AS w
             WHERE w.metric_name = $1
               AND w.device_id = $2
         ),
         open_segment AS (
             SELECT
                 s.start_time,
                 s.value,
                 s.policy_id
             FROM %s AS s
             WHERE s.device_id = $2
               AND s.end_time IS NULL
             ORDER BY s.start_time DESC
             LIMIT 1
         ),
         stored_segments AS (
             SELECT
                 s.device_id,
                 GREATEST(s.start_time, $3) AS start_time,
                 LEAST(
                     CASE
                         WHEN s.end_time IS NOT NULL THEN s.end_time
                         WHEN s.value IS NULL THEN $4
                         ELSE COALESCE(
                             GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
                             + mp.max_sampling_interval,
                             $4
                         )
                     END,
                     $4
                 ) AS end_time,
                 to_jsonb(s.value) AS value_json,
                 s.samples_count
             FROM %s AS s
             INNER JOIN telemetry.metric_policies AS mp
                ON mp.policy_id = s.policy_id
             LEFT JOIN watermark AS w ON true
             WHERE s.device_id = $2
               AND tstzrange(
                     s.start_time,
                     CASE
                         WHEN s.end_time IS NOT NULL THEN s.end_time
                         WHEN s.value IS NULL THEN ''infinity''::timestamptz
                         ELSE COALESCE(
                             GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
                             + mp.max_sampling_interval,
                             ''infinity''::timestamptz
                         )
                     END,
                     ''[)''
                   ) && tstzrange($3, $4, ''[)'')
         ),
         synthetic_tail AS (
             SELECT
                 $2::text AS device_id,
                 GREATEST(
                     GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
                     + mp.max_sampling_interval,
                     $3
                 ) AS start_time,
                 $4 AS end_time,
                 NULL::jsonb AS value_json,
                 0 AS samples_count
             FROM open_segment AS s
             INNER JOIN telemetry.metric_policies AS mp
                ON mp.policy_id = s.policy_id
             LEFT JOIN watermark AS w ON true
             WHERE s.value IS NOT NULL
               AND GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
                   + mp.max_sampling_interval < $4
         )
         SELECT
             combined.device_id,
             combined.start_time,
             combined.end_time,
             combined.value_json,
             combined.samples_count
         FROM (
             SELECT
                 ss.device_id,
                 ss.start_time,
                 ss.end_time,
                 ss.value_json,
                 ss.samples_count
             FROM stored_segments AS ss
             WHERE ss.end_time > ss.start_time

             UNION ALL

             SELECT
                 st.device_id,
                 st.start_time,
                 st.end_time,
                 st.value_json,
                 st.samples_count
             FROM synthetic_tail AS st
             WHERE st.end_time > st.start_time
         ) AS combined
         ORDER BY combined.start_time',
        v_table_sql,
        v_table_sql
    )
    USING p_metric_name, p_device_id, p_from, p_to;
END;
$$;

-- Returns the segments of a numeric metric for one device, clipped to the
-- half-open window [p_from, p_to), with values as double precision.
--
-- Delegates to telemetry.segments_between_from_table and converts each jsonb
-- value to double precision; a NULL value_json stays NULL.
--
-- Raises when p_from or p_to is NULL or p_to <= p_from, and when the metric
-- is boolean, state, or of an unsupported type; any error from
-- telemetry.require_metric propagates.
CREATE OR REPLACE FUNCTION telemetry.metric_segments(
    p_metric_name text,
    p_device_id text,
    p_from timestamptz,
    p_to timestamptz
)
RETURNS TABLE (
    device_id text,
    start_time timestamptz,
    end_time timestamptz,
    value double precision,
    samples_count integer
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
BEGIN
    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
    END IF;

    v_metric := telemetry.require_metric(p_metric_name);

    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'numeric' THEN
            RETURN QUERY
            SELECT
                s.device_id,
                s.start_time,
                s.end_time,
                -- Extract the scalar with #>> rather than casting jsonb to
                -- text: to_jsonb renders NaN and +/-Infinity as JSON strings,
                -- whose jsonb::text form keeps the double quotes and fails
                -- the double precision cast. #>> yields unquoted text and
                -- returns NULL for NULL input.
                (s.value_json #>> '{}')::double precision AS value,
                s.samples_count
            FROM telemetry.segments_between_from_table(
                v_metric.table_name,
                p_metric_name,
                p_device_id,
                p_from,
                p_to
            ) AS s;
        WHEN 'boolean' THEN
            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_segments_between(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;
END;
$$;

-- Returns the segments of a boolean metric for one device, clipped to the
-- half-open window [p_from, p_to), with values as boolean.
--
-- Delegates to telemetry.segments_between_from_table and converts each jsonb
-- value to boolean by way of its text form; a NULL value_json stays NULL.
--
-- Raises when p_from or p_to is NULL or p_to <= p_from, and when the metric
-- is numeric, state, or of an unsupported type; any error from
-- telemetry.require_metric propagates.
CREATE OR REPLACE FUNCTION telemetry.boolean_segments_between(
    p_metric_name text,
    p_device_id text,
    p_from timestamptz,
    p_to timestamptz
)
RETURNS TABLE (
    device_id text,
    start_time timestamptz,
    end_time timestamptz,
    value boolean,
    samples_count integer
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
BEGIN
    -- IS NOT TRUE also catches the NULL result produced by a NULL bound.
    IF (p_to > p_from) IS NOT TRUE THEN
        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
    END IF;

    v_metric_row := telemetry.require_metric(p_metric_name);

    -- Reject every metric type except 'boolean' up front. 'state' is reserved
    -- for future discrete enumerated machine states and has no query wrapper.
    IF v_metric_row.metric_type = 'numeric' THEN
        RAISE EXCEPTION 'metric % is numeric; use telemetry.metric_segments(...)', p_metric_name;
    ELSIF v_metric_row.metric_type = 'state' THEN
        RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
    ELSIF v_metric_row.metric_type IS DISTINCT FROM 'boolean' THEN
        RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric_row.metric_type, p_metric_name;
    END IF;

    RETURN QUERY
    SELECT
        seg.device_id,
        seg.start_time,
        seg.end_time,
        -- Casting NULL yields NULL, so no separate NULL branch is needed.
        CAST(CAST(seg.value_json AS text) AS boolean) AS value,
        seg.samples_count
    FROM telemetry.segments_between_from_table(
        v_metric_row.table_name,
        p_metric_name,
        p_device_id,
        p_from,
        p_to
    ) AS seg;
END;
$$;

-- Type-agnostic resampler: evaluates the segments of p_table_name for one
-- device at p_points instants spread evenly from p_from to p_to inclusive
-- (sample i sits at p_from + (p_to - p_from) * i / (p_points - 1)).
--
-- For each instant the value of the segment whose effective half-open range
-- covers it is returned as jsonb, or NULL when none does. The effective end
-- of a segment is its stored end_time; for an open segment with a NULL value
-- it is infinity; for an open segment with a value it is
-- greatest(watermark, start_time) + max_sampling_interval (infinity when that
-- sum is NULL). The watermark is metric_device_watermarks.last_observed_at
-- for (p_metric_name, p_device_id), falling back to start_time when absent.
--
-- This function does not validate its arguments: p_points = 1 divides by
-- zero inside the query.
CREATE OR REPLACE FUNCTION telemetry.samples_from_table(
    p_table_name text,
    p_metric_name text,
    p_device_id text,
    p_from timestamptz,
    p_to timestamptz,
    p_points integer
)
RETURNS TABLE (
    sample_at timestamptz,
    value_json jsonb
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_source_sql text := telemetry.metric_table_sql(p_table_name);
    v_sampling_sql text;
BEGIN
    -- Bind parameters: $1 metric name, $2 device id, $3 window start,
    -- $4 window end, $5 number of points. Only the validated table reference
    -- is interpolated.
    v_sampling_sql := format(
        'WITH sample_points AS (
             SELECT $3 + (($4 - $3) * gs.i::double precision / ($5 - 1)::double precision) AS sample_at
             FROM generate_series(0, $5 - 1) AS gs(i)
         )
         SELECT
             sp.sample_at,
             to_jsonb(hit.value) AS value_json
         FROM sample_points AS sp
         LEFT JOIN telemetry.metric_device_watermarks AS wm
            ON wm.metric_name = $1
           AND wm.device_id = $2
         LEFT JOIN LATERAL (
             SELECT seg.value
             FROM %s AS seg
             INNER JOIN telemetry.metric_policies AS pol
                ON pol.policy_id = seg.policy_id
             WHERE seg.device_id = $2
               AND tstzrange(
                     seg.start_time,
                     CASE
                         WHEN seg.end_time IS NOT NULL THEN seg.end_time
                         WHEN seg.value IS NULL THEN ''infinity''::timestamptz
                         ELSE COALESCE(
                             GREATEST(COALESCE(wm.last_observed_at, seg.start_time), seg.start_time)
                             + pol.max_sampling_interval,
                             ''infinity''::timestamptz
                         )
                     END,
                     ''[)''
                   ) @> sp.sample_at
             LIMIT 1
         ) AS hit ON true
         ORDER BY sp.sample_at',
        v_source_sql
    );

    RETURN QUERY EXECUTE v_sampling_sql
    USING p_metric_name, p_device_id, p_from, p_to, p_points;
END;
$$;

-- Samples a numeric metric for one device at p_points evenly spaced instants
-- between p_from and p_to, returning values as double precision.
--
-- Delegates to telemetry.samples_from_table and converts each jsonb value to
-- double precision; a NULL value_json stays NULL.
--
-- Raises when p_points is NULL or below 2, when p_from or p_to is NULL or
-- p_to <= p_from, and when the metric is boolean, state, or of an unsupported
-- type; any error from telemetry.require_metric propagates.
CREATE OR REPLACE FUNCTION telemetry.sample_metric(
    p_metric_name text,
    p_device_id text,
    p_from timestamptz,
    p_to timestamptz,
    p_points integer
)
RETURNS TABLE (
    sample_at timestamptz,
    value double precision
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
BEGIN
    -- At least two points are needed: the sampler divides by (p_points - 1).
    IF p_points IS NULL OR p_points < 2 THEN
        RAISE EXCEPTION 'p_points must be at least 2';
    END IF;

    IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
    END IF;

    v_metric := telemetry.require_metric(p_metric_name);

    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'numeric' THEN
            RETURN QUERY
            SELECT
                s.sample_at,
                -- Extract the scalar with #>> rather than casting jsonb to
                -- text: to_jsonb renders NaN and +/-Infinity as JSON strings,
                -- whose jsonb::text form keeps the double quotes and fails
                -- the double precision cast. #>> yields unquoted text and
                -- returns NULL for NULL input.
                (s.value_json #>> '{}')::double precision AS value
            FROM telemetry.samples_from_table(
                v_metric.table_name,
                p_metric_name,
                p_device_id,
                p_from,
                p_to,
                p_points
            ) AS s;
        WHEN 'boolean' THEN
            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_samples(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;
END;
$$;

-- Samples a boolean metric for one device at p_points evenly spaced instants
-- between p_from and p_to, returning values as boolean.
--
-- Delegates to telemetry.samples_from_table and converts each jsonb value to
-- boolean by way of its text form; a NULL value_json stays NULL.
--
-- Raises when p_points is NULL or below 2, when p_from or p_to is NULL or
-- p_to <= p_from, and when the metric is numeric, state, or of an unsupported
-- type; any error from telemetry.require_metric propagates.
CREATE OR REPLACE FUNCTION telemetry.boolean_samples(
    p_metric_name text,
    p_device_id text,
    p_from timestamptz,
    p_to timestamptz,
    p_points integer
)
RETURNS TABLE (
    sample_at timestamptz,
    value boolean
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
BEGIN
    -- IS NOT TRUE also catches the NULL result produced by a NULL argument.
    IF (p_points >= 2) IS NOT TRUE THEN
        RAISE EXCEPTION 'p_points must be at least 2';
    END IF;

    IF (p_to > p_from) IS NOT TRUE THEN
        RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
    END IF;

    v_metric_row := telemetry.require_metric(p_metric_name);

    -- Reject every metric type except 'boolean' up front. 'state' is reserved
    -- for future discrete enumerated machine states and has no query wrapper.
    IF v_metric_row.metric_type = 'numeric' THEN
        RAISE EXCEPTION 'metric % is numeric; use telemetry.sample_metric(...)', p_metric_name;
    ELSIF v_metric_row.metric_type = 'state' THEN
        RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
    ELSIF v_metric_row.metric_type IS DISTINCT FROM 'boolean' THEN
        RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric_row.metric_type, p_metric_name;
    END IF;

    RETURN QUERY
    SELECT
        pt.sample_at,
        -- Casting NULL yields NULL, so no separate NULL branch is needed.
        CAST(CAST(pt.value_json AS text) AS boolean) AS value
    FROM telemetry.samples_from_table(
        v_metric_row.table_name,
        p_metric_name,
        p_device_id,
        p_from,
        p_to,
        p_points
    ) AS pt;
END;
$$;

-- Maintenance utilities.

-- Read-only consistency report for the segment table behind a metric.
--
-- Segments are ordered per device by (start_time, segment_id) and each is
-- compared with its predecessor. One row is returned per finding:
--   * invalid_interval       - a closed segment whose end_time <= start_time;
--   * multiple_open_segments - a device with more than one segment whose
--                              end_time is NULL (segment and time columns are
--                              NULL for this finding);
--   * overlap                - the predecessor's end_time is later than this
--                              segment's start_time;
--   * unexpected_gap         - this segment starts after the predecessor's
--                              end_time; details quote the gap and the
--                              max_sampling_interval of the predecessor's
--                              policy.
-- The overlap and gap checks only consider predecessors with a non-NULL
-- end_time, and the gap check only reports rows whose predecessor policy is
-- found in telemetry.metric_policies.
--
-- Rows are ordered by device_id, start_time, segment_id with NULLs first.
CREATE OR REPLACE FUNCTION telemetry.verify_segments(p_metric_name text)
RETURNS TABLE (
    metric_name text,
    device_id text,
    issue text,
    segment_id bigint,
    related_segment_id bigint,
    start_time timestamptz,
    end_time timestamptz,
    details text
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
    v_source_sql text;
    v_report_sql text;
BEGIN
    v_metric_row := telemetry.require_metric(p_metric_name);
    v_source_sql := telemetry.metric_table_sql(v_metric_row.table_name);

    -- %%s survives the outer format() as %s for the inner format() calls that
    -- run inside the generated query. $1 is the metric name.
    v_report_sql := format(
        'WITH ordered AS (
             SELECT
                 seg.segment_id,
                 seg.device_id,
                 seg.policy_id,
                 seg.start_time,
                 seg.end_time,
                 lag(seg.segment_id) OVER per_device AS prev_segment_id,
                 lag(seg.end_time) OVER per_device AS prev_end_time,
                 lag(seg.policy_id) OVER per_device AS prev_policy_id
             FROM %s AS seg
             WINDOW per_device AS (
                 PARTITION BY seg.device_id
                 ORDER BY seg.start_time, seg.segment_id
             )
         ),
         open_counts AS (
             SELECT
                 seg.device_id,
                 count(*)::integer AS open_count
             FROM %s AS seg
             WHERE seg.end_time IS NULL
             GROUP BY seg.device_id
             HAVING count(*) > 1
         )
         SELECT
             $1::text AS metric_name,
             cur.device_id,
             ''invalid_interval''::text AS issue,
             cur.segment_id,
             NULL::bigint AS related_segment_id,
             cur.start_time,
             cur.end_time,
             ''end_time must be greater than start_time''::text AS details
         FROM ordered AS cur
         WHERE cur.end_time IS NOT NULL
           AND cur.end_time <= cur.start_time

         UNION ALL

         SELECT
             $1::text,
             dup.device_id,
             ''multiple_open_segments''::text,
             NULL::bigint,
             NULL::bigint,
             NULL::timestamptz,
             NULL::timestamptz,
             format(''%%s open segments found'', dup.open_count)
         FROM open_counts AS dup

         UNION ALL

         SELECT
             $1::text,
             cur.device_id,
             ''overlap''::text,
             cur.segment_id,
             cur.prev_segment_id,
             cur.start_time,
             cur.end_time,
             format(''previous segment ends at %%s'', cur.prev_end_time)
         FROM ordered AS cur
         WHERE cur.prev_end_time IS NOT NULL
           AND cur.prev_end_time > cur.start_time

         UNION ALL

         SELECT
             $1::text,
             cur.device_id,
             ''unexpected_gap''::text,
             cur.segment_id,
             cur.prev_segment_id,
             cur.start_time,
             cur.end_time,
             format(
                 ''stored gap of %%s violates expected continuity (max_sampling_interval=%%s)'',
                 cur.start_time - cur.prev_end_time,
                 pol.max_sampling_interval
             )
         FROM ordered AS cur
         INNER JOIN telemetry.metric_policies AS pol
            ON pol.policy_id = cur.prev_policy_id
         WHERE cur.prev_end_time IS NOT NULL
           AND cur.start_time > cur.prev_end_time

         ORDER BY device_id, start_time NULLS FIRST, segment_id NULLS FIRST',
        v_source_sql,
        v_source_sql
    );

    RETURN QUERY EXECUTE v_report_sql
    USING p_metric_name;
END;
$$;

-- Merges runs of adjacent, equivalent closed segments of a metric.
--
-- Only segments with a non-NULL end_time at or before p_before are
-- considered. Within a device, ordered by (start_time, segment_id), a segment
-- continues the current run when it starts exactly at its predecessor's
-- end_time and has the same policy_id and the same value (NULL-safe
-- comparison); anything else starts a new run. Runs of a single segment are
-- left alone.
--
-- For each run of two or more segments the first segment is kept: the others
-- are deleted, then the kept row's end_time is moved to the run's latest
-- end_time and its samples_count is set to the run's total.
--
-- Returns one row per merged run: device, kept segment id, number of segments
-- in the run, the run's start and end, and the summed sample count.
-- Raises when p_before is NULL; errors from telemetry.require_metric and
-- telemetry.metric_table_sql propagate.
CREATE OR REPLACE FUNCTION telemetry.compact_segments(
    p_metric_name text,
    p_before timestamptz
)
RETURNS TABLE (
    device_id text,
    kept_segment_id bigint,
    merged_segments integer,
    start_time timestamptz,
    end_time timestamptz,
    total_samples integer
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
    v_source_sql text;
    v_plan_sql text;
    v_run record;
BEGIN
    IF p_before IS NULL THEN
        RAISE EXCEPTION 'p_before is required';
    END IF;

    v_metric_row := telemetry.require_metric(p_metric_name);
    v_source_sql := telemetry.metric_table_sql(v_metric_row.table_name);

    -- Plan query: one row per run of two or more mergeable segments.
    -- $1 is the cutoff timestamp.
    v_plan_sql := format(
        'WITH candidates AS (
             SELECT
                 seg.segment_id,
                 seg.device_id,
                 seg.start_time,
                 seg.end_time,
                 seg.samples_count,
                 seg.policy_id,
                 seg.value,
                 lag(seg.end_time) OVER per_device AS prev_end_time,
                 lag(seg.policy_id) OVER per_device AS prev_policy_id,
                 lag(seg.value) OVER per_device AS prev_value
             FROM %s AS seg
             WHERE seg.end_time IS NOT NULL
               AND seg.end_time <= $1
             WINDOW per_device AS (
                 PARTITION BY seg.device_id
                 ORDER BY seg.start_time, seg.segment_id
             )
         ),
         grouped AS (
             SELECT
                 c.*,
                 count(*) FILTER (
                     WHERE c.prev_end_time IS NULL
                        OR c.start_time <> c.prev_end_time
                        OR c.policy_id IS DISTINCT FROM c.prev_policy_id
                        OR c.value IS DISTINCT FROM c.prev_value
                 ) OVER (
                     PARTITION BY c.device_id
                     ORDER BY c.start_time, c.segment_id
                 ) AS grp
             FROM candidates AS c
         ),
         runs AS (
             SELECT
                 g.device_id,
                 array_agg(g.segment_id ORDER BY g.start_time, g.segment_id) AS ordered_ids,
                 min(g.start_time) AS merged_start_time,
                 max(g.end_time) AS merged_end_time,
                 sum(g.samples_count)::integer AS total_samples,
                 count(*)::integer AS merged_segments
             FROM grouped AS g
             GROUP BY g.device_id, g.grp
             HAVING count(*) > 1
         )
         SELECT
             r.device_id,
             r.ordered_ids[1] AS keep_segment_id,
             array_remove(r.ordered_ids, r.ordered_ids[1]) AS delete_segment_ids,
             r.merged_segments,
             r.merged_start_time,
             r.merged_end_time,
             r.total_samples
         FROM runs AS r
         ORDER BY r.device_id, r.merged_start_time',
        v_source_sql
    );

    FOR v_run IN EXECUTE v_plan_sql USING p_before
    LOOP
        -- Delete the absorbed rows before widening the kept row.
        EXECUTE format(
            'DELETE FROM %s
             WHERE segment_id = ANY($1)',
            v_source_sql
        )
        USING v_run.delete_segment_ids;

        EXECUTE format(
            'UPDATE %s
             SET end_time = $1,
                 samples_count = $2
             WHERE segment_id = $3',
            v_source_sql
        )
        USING v_run.merged_end_time, v_run.total_samples, v_run.keep_segment_id;

        -- Report the merge through the OUT columns.
        device_id := v_run.device_id;
        kept_segment_id := v_run.keep_segment_id;
        merged_segments := v_run.merged_segments;
        start_time := v_run.merged_start_time;
        end_time := v_run.merged_end_time;
        total_samples := v_run.total_samples;
        RETURN NEXT;
    END LOOP;

    RETURN;
END;
$$;

-- Lists devices that have not been seen for longer than p_threshold.
--
-- Parameters:
--   p_threshold  Inactivity must strictly exceed this interval for a
--                device to be reported.
--
-- Returns one row per matching row of telemetry.devices, carrying the
-- device's identifying columns, its last_seen timestamp and inactive_for
-- (now() minus last_seen). Rows whose last_seen is NULL are never
-- returned. Output is sorted by inactive_for descending, with device_id
-- as the tie-breaker.
CREATE OR REPLACE FUNCTION telemetry.inactive_devices(p_threshold interval)
RETURNS TABLE (
    device_pk bigint,
    device_id text,
    device_type text,
    location text,
    last_seen timestamptz,
    inactive_for interval
)
LANGUAGE sql
STABLE
AS $$
    -- Compute the inactivity interval once per device so that the filter
    -- and the sort key below share a single expression.
    WITH seen_devices AS (
        SELECT
            dev.device_pk,
            dev.device_id,
            dev.device_type,
            dev.location,
            dev.last_seen,
            now() - dev.last_seen AS inactive_for
        FROM telemetry.devices AS dev
        WHERE dev.last_seen IS NOT NULL
    )
    SELECT
        sd.device_pk,
        sd.device_id,
        sd.device_type,
        sd.location,
        sd.last_seen,
        sd.inactive_for
    FROM seen_devices AS sd
    WHERE sd.inactive_for > p_threshold
    ORDER BY
        sd.inactive_for DESC,
        sd.device_id;
$$;

-- Example metric registrations.
--
-- Each statement below calls telemetry.register_numeric_metric (defined
-- outside this section) using named-argument notation. The comments only
-- restate the arguments passed; what the function does with them (and the
-- measurement units of the numeric limits) is not visible from here.
-- NOTE(review): the units of p_epsilon / p_min_value / p_max_value /
-- p_rounding_precision are not stated -- confirm against the function.

-- 'ambient_temperature' in domain 'environmental', table name
-- 'ambient_temperature_segments'; bounds -40..60, epsilon 0.05,
-- rounding precision 0.1, max sampling interval 5 minutes, NULLs allowed.
SELECT telemetry.register_numeric_metric(
    p_metric_name => 'ambient_temperature',
    p_table_name => 'ambient_temperature_segments',
    p_domain_name => 'environmental',
    p_epsilon => 0.05,
    p_min_value => -40,
    p_max_value => 60,
    p_rounding_precision => 0.1,
    p_max_sampling_interval => '5 minutes',
    p_allow_null => true
);

-- 'cpu_temperature' in domain 'system', table name
-- 'cpu_temperature_segments'; bounds 0..120, epsilon 0.5,
-- rounding precision 1, max sampling interval 10 seconds, NULLs allowed.
SELECT telemetry.register_numeric_metric(
    p_metric_name => 'cpu_temperature',
    p_table_name => 'cpu_temperature_segments',
    p_domain_name => 'system',
    p_epsilon => 0.5,
    p_min_value => 0,
    p_max_value => 120,
    p_rounding_precision => 1,
    p_max_sampling_interval => '10 seconds',
    p_allow_null => true
);

-- 'humidity' in domain 'environmental', table name 'humidity_segments';
-- bounds 0..100, epsilon 0.5, rounding precision 0.1,
-- max sampling interval 5 minutes, NULLs allowed.
SELECT telemetry.register_numeric_metric(
    p_metric_name => 'humidity',
    p_table_name => 'humidity_segments',
    p_domain_name => 'environmental',
    p_epsilon => 0.5,
    p_min_value => 0,
    p_max_value => 100,
    p_rounding_precision => 0.1,
    p_max_sampling_interval => '5 minutes',
    p_allow_null => true
);
-- Ends the transaction; presumably pairs with the BEGIN at the top of
-- this script so the whole schema installs atomically -- confirm no
-- intermediate COMMIT exists.
COMMIT;