BEGIN;

-- Extensions.
CREATE SCHEMA telemetry;
CREATE EXTENSION btree_gist;

-- Enums.

-- 'state' is reserved for future implementation of discrete enumerated
-- machine states such as thermostat_mode, pump_state, hvac_mode, or
-- door_state. Values will likely be stored as smallint and use the
-- same segment engine as boolean metrics, but with exact comparison
-- over enumerated values.
CREATE TYPE telemetry.metric_type_enum AS ENUM ('numeric', 'boolean', 'state');
CREATE TYPE telemetry.comparison_mode_enum AS ENUM ('epsilon', 'exact');

-- Registry utility functions.

-- Reduces a metric table reference to its bare table name.
--   p_table_name: either "name" or "telemetry.name" (parsed with parse_ident,
--                 non-strict mode).
--   Returns the table-name part only.
--   Raises when the input is NULL/blank, names a schema other than telemetry,
--   or has more than two identifier parts.
CREATE OR REPLACE FUNCTION telemetry.normalize_metric_table_name(p_table_name text)
RETURNS text
LANGUAGE plpgsql
IMMUTABLE
AS $$
DECLARE
    v_idents text[];
    v_ident_count integer;
BEGIN
    IF p_table_name IS NULL OR btrim(p_table_name) = '' THEN
        RAISE EXCEPTION 'table_name is required';
    END IF;

    v_idents := pg_catalog.parse_ident(p_table_name, false);
    v_ident_count := COALESCE(array_length(v_idents, 1), 0);

    -- Bare table name: nothing to strip.
    IF v_ident_count = 1 THEN
        RETURN v_idents[1];
    END IF;

    IF v_ident_count <> 2 THEN
        RAISE EXCEPTION 'invalid metric table reference: %', p_table_name;
    END IF;

    -- Schema-qualified: only the telemetry schema is accepted.
    IF v_idents[1] <> 'telemetry' THEN
        RAISE EXCEPTION 'metric tables must live in schema telemetry: %', p_table_name;
    END IF;

    RETURN v_idents[2];
END;
$$;

-- Returns the quoted, schema-qualified SQL name (telemetry.<table>) for a
-- metric table reference and raises when no such relation exists.
CREATE OR REPLACE FUNCTION telemetry.metric_table_sql(p_table_name text)
RETURNS text
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_qualified text;
BEGIN
    v_qualified := format(
        'telemetry.%I',
        telemetry.normalize_metric_table_name(p_table_name)
    );

    IF to_regclass(v_qualified) IS NOT NULL THEN
        RETURN v_qualified;
    END IF;

    RAISE EXCEPTION 'could not resolve metric table %', v_qualified;
END;
$$;

-- Core registry tables.
-- Metric registry: one row per metric, keyed by metric_name, recording which
-- segment table stores its data and the comparison/validation settings.
-- Rows are written by the register_*_metric functions and refreshed by
-- add_metric_policy when the added policy has no later-dated sibling.
CREATE TABLE telemetry.metrics (
    metric_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    metric_name text NOT NULL UNIQUE,
    -- Bare (unqualified) segment table name; see metrics_table_name_check.
    table_name text NOT NULL UNIQUE,
    domain_name text NOT NULL DEFAULT 'generic',
    metric_type telemetry.metric_type_enum NOT NULL,
    comparison_mode telemetry.comparison_mode_enum NOT NULL,
    -- Numeric-only settings; metrics_policy_shape_check forces them to NULL
    -- for boolean/state metrics.
    epsilon double precision,
    min_value double precision,
    max_value double precision,
    rounding_precision double precision,
    allow_null boolean NOT NULL DEFAULT true,
    max_sampling_interval interval NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    updated_at timestamptz NOT NULL DEFAULT now(),
    -- Bounds are optional, but when both are present they must be ordered.
    CHECK (min_value IS NULL OR max_value IS NULL OR min_value <= max_value),
    -- numeric  => epsilon comparison with epsilon and rounding_precision set;
    -- boolean/state => exact comparison with no numeric settings at all.
    CONSTRAINT metrics_policy_shape_check CHECK (
        (metric_type = 'numeric'::telemetry.metric_type_enum
            AND comparison_mode = 'epsilon'::telemetry.comparison_mode_enum
            AND epsilon IS NOT NULL
            AND rounding_precision IS NOT NULL)
        OR
        (metric_type IN (
                'boolean'::telemetry.metric_type_enum,
                'state'::telemetry.metric_type_enum
            )
            AND comparison_mode = 'exact'::telemetry.comparison_mode_enum
            AND epsilon IS NULL
            AND min_value IS NULL
            AND max_value IS NULL
            AND rounding_precision IS NULL)
    ),
    -- Stored name must already be in the form normalize_metric_table_name returns.
    CONSTRAINT metrics_table_name_check
        CHECK (table_name = telemetry.normalize_metric_table_name(table_name))
);

-- Policy tables.
-- Time-versioned policy rows per metric. require_metric_policy selects the row
-- with the greatest valid_from that is <= the observation time; the
-- register_*_metric functions seed a row at valid_from = '-infinity'.
CREATE TABLE telemetry.metric_policies (
    policy_id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    metric_name text NOT NULL
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    valid_from timestamptz NOT NULL,
    comparison_mode telemetry.comparison_mode_enum NOT NULL,
    epsilon double precision,
    min_value double precision,
    max_value double precision,
    rounding_precision double precision,
    allow_null boolean NOT NULL,
    max_sampling_interval interval NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    -- At most one policy per metric per effective instant.
    UNIQUE (metric_name, valid_from)
);

-- Optional per-metric retention settings (at most one row per metric).
-- NOTE(review): nothing in the visible part of this file reads these columns;
-- presumably a retention/compaction job elsewhere consumes them - confirm.
CREATE TABLE telemetry.metric_retention_policies (
    metric_name text PRIMARY KEY
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    raw_retention interval NULL,
    compact_after interval NULL
);

-- Device registry tables.

-- Known devices. Rows are created on demand by ensure_device; last_seen is
-- advanced by touch_device_last_seen.
CREATE TABLE telemetry.devices (
    device_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    device_id text NOT NULL UNIQUE,
    device_type text NULL,
    location text NULL,
    metadata jsonb NULL,
    last_seen timestamptz NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    updated_at timestamptz NOT NULL DEFAULT now()
);

-- Latest accepted observation time per (metric, device). Written by
-- bump_watermark and checked by assert_append_only to reject out-of-order data.
CREATE TABLE telemetry.metric_device_watermarks (
    metric_name text NOT NULL
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    device_id text NOT NULL
        REFERENCES telemetry.devices(device_id)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    last_observed_at timestamptz NOT NULL,
    PRIMARY KEY (metric_name, device_id)
);

-- Runtime helpers.
-- Rounds p_value to the nearest multiple of p_increment.
-- A zero or negative increment disables rounding (p_value is returned as is).
-- STRICT: returns NULL when either argument is NULL.
CREATE OR REPLACE FUNCTION telemetry.round_to_increment(
    p_value double precision,
    p_increment double precision
)
RETURNS double precision
LANGUAGE sql
IMMUTABLE
STRICT
AS $$
    SELECT CASE
        WHEN p_increment <= 0 THEN p_value
        ELSE round(p_value / p_increment) * p_increment
    END;
$$;

-- Registers p_device_id in telemetry.devices if it is not there yet.
-- An existing row is left untouched.
CREATE OR REPLACE FUNCTION telemetry.ensure_device(p_device_id text)
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO telemetry.devices (device_id)
    VALUES ($1)
    ON CONFLICT (device_id) DO NOTHING;
$$;

-- Advances devices.last_seen to p_observed_at when that is newer than the
-- stored value (or when nothing is stored yet); never moves it backwards.
-- The comparison lives in the WHERE clause so that a call which would not
-- change last_seen does not rewrite the row at all (no dead tuple, no WAL).
CREATE OR REPLACE FUNCTION telemetry.touch_device_last_seen(
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS void
LANGUAGE sql
AS $$
    UPDATE telemetry.devices
    SET last_seen = $2
    WHERE device_id = $1
      AND $2 IS NOT NULL
      AND (last_seen IS NULL OR last_seen < $2);
$$;

-- Returns the telemetry.devices row for p_device_id.
-- Raises 'unknown device: ...' when no such row exists.
CREATE OR REPLACE FUNCTION telemetry.require_device(p_device_id text)
RETURNS telemetry.devices
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_device telemetry.devices%ROWTYPE;
BEGIN
    SELECT *
    INTO v_device
    FROM telemetry.devices AS d
    WHERE d.device_id = p_device_id;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'unknown device: %', p_device_id;
    END IF;

    RETURN v_device;
END;
$$;

-- Maps a text value to an integer suitable for the two-key form of
-- pg_advisory_xact_lock. The 64-bit hash is reduced modulo 2147483647 so the
-- result always fits in integer (it may be negative).
-- STRICT: returns NULL for NULL input.
CREATE OR REPLACE FUNCTION telemetry.lock_key_from_text(p_value text)
RETURNS integer
LANGUAGE sql
IMMUTABLE
STRICT
AS $$
    SELECT (hashtextextended($1, 0) % 2147483647)::integer;
$$;

-- Segment template and table creation.
-- Creates (idempotently) the segment table for a registered metric, together
-- with its indexes and the per-device non-overlap exclusion constraint.
--   p_table_name: bare or telemetry-qualified table name.
--   p_value_type: SQL type of the value column; one of
--                 'double precision', 'boolean', 'smallint'.
-- Raises when the value type is unsupported (including NULL), when no
-- telemetry.metrics row points at the table, or when the metric has no
-- initial policy row (valid_from = '-infinity').
CREATE OR REPLACE FUNCTION telemetry.create_segment_table(
    p_table_name text,
    p_value_type text
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_base_table_name text := telemetry.normalize_metric_table_name(p_table_name);
    v_table_name text := format('telemetry.%I', v_base_table_name);
    v_metric_name text;
    v_initial_policy_id bigint;
    v_open_idx text := format('%s_open_segment_uidx', v_base_table_name);
    v_start_idx text := format('%s_device_start_cover_idx', v_base_table_name);
    v_device_period_gist_idx text := format('%s_device_period_gist', v_base_table_name);
    v_period_excl text := format('%s_device_period_excl', v_base_table_name);
    v_device_pk_fk text := format('%s_device_pk_fkey', v_base_table_name);
    v_device_fk text := format('%s_device_id_fkey', v_base_table_name);
    v_policy_fk text := format('%s_policy_id_fkey', v_base_table_name);
    v_samples_check text := format('%s_samples_count_check', v_base_table_name);
    v_end_time_check text := format('%s_check', v_base_table_name);
BEGIN
    -- p_value_type is interpolated with %s below, so it must be whitelisted.
    -- The explicit NULL test matters: NULL NOT IN (...) is NULL, which would
    -- skip the RAISE and produce an obscure syntax error from the CREATE TABLE.
    IF p_value_type IS NULL
       OR p_value_type NOT IN ('double precision', 'boolean', 'smallint') THEN
        RAISE EXCEPTION 'unsupported segment value type: %', p_value_type;
    END IF;

    SELECT m.metric_name
    INTO v_metric_name
    FROM telemetry.metrics AS m
    WHERE m.table_name = v_base_table_name;

    IF v_metric_name IS NULL THEN
        RAISE EXCEPTION 'metric metadata must exist before creating segment table %', v_base_table_name;
    END IF;

    SELECT mp.policy_id
    INTO v_initial_policy_id
    FROM telemetry.metric_policies AS mp
    WHERE mp.metric_name = v_metric_name
      AND mp.valid_from = '-infinity'::timestamptz;

    IF v_initial_policy_id IS NULL THEN
        RAISE EXCEPTION 'initial policy row is missing for metric %', v_metric_name;
    END IF;

    -- samples_count = 0 is intentional for synthetic gap segments inserted
    -- during lazy gap detection; those intervals represent missing data, not
    -- samples. (Kept here rather than as a "--" comment inside the SQL text:
    -- a line comment inside dynamic SQL silently swallows the rest of the
    -- statement if the string ever loses its line breaks.)
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %s (
            segment_id bigserial PRIMARY KEY,
            device_pk bigint NOT NULL,
            device_id text NOT NULL,
            start_time timestamptz NOT NULL,
            end_time timestamptz NULL,
            value %s NULL,
            samples_count integer NOT NULL DEFAULT 1
                CONSTRAINT %I CHECK (samples_count >= 0),
            policy_id bigint NOT NULL,
            segment_period tstzrange GENERATED ALWAYS AS (
                tstzrange(start_time, COALESCE(end_time, ''infinity''::timestamptz), ''[)'')
            ) STORED,
            CONSTRAINT %I CHECK (end_time IS NULL OR end_time > start_time),
            CONSTRAINT %I
                FOREIGN KEY (device_pk)
                REFERENCES telemetry.devices(device_pk)
                ON UPDATE CASCADE
                ON DELETE CASCADE,
            CONSTRAINT %I
                FOREIGN KEY (device_id)
                REFERENCES telemetry.devices(device_id)
                ON UPDATE CASCADE
                ON DELETE CASCADE,
            CONSTRAINT %I
                FOREIGN KEY (policy_id)
                REFERENCES telemetry.metric_policies(policy_id)
        ) WITH (
            fillfactor = 90,
            autovacuum_vacuum_scale_factor = 0.02,
            autovacuum_analyze_scale_factor = 0.01
        )',
        v_table_name,
        p_value_type,
        v_samples_check,
        v_end_time_check,
        v_device_pk_fk,
        v_device_fk,
        v_policy_fk
    );

    -- At most one open (end_time IS NULL) segment per device.
    EXECUTE format(
        'CREATE UNIQUE INDEX IF NOT EXISTS %I
         ON %s (device_id)
         WHERE end_time IS NULL',
        v_open_idx,
        v_table_name
    );

    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %I
         ON %s (device_id, start_time DESC)
         INCLUDE (end_time, value, samples_count)',
        v_start_idx,
        v_table_name
    );

    -- NOTE(review): the exclusion constraint added below is backed by its own
    -- GiST index over the same (device_id, segment_period) columns, so this
    -- index looks redundant. Left in place because code outside this function
    -- may refer to it by name - confirm before dropping.
    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %I
         ON %s
         USING gist (device_id, segment_period)',
        v_device_period_gist_idx,
        v_table_name
    );

    -- ADD CONSTRAINT has no IF NOT EXISTS form; treat "already there" as success
    -- so the function stays re-runnable.
    BEGIN
        EXECUTE format(
            'ALTER TABLE %s
             ADD CONSTRAINT %I
             EXCLUDE USING gist (
                 device_id WITH =,
                 segment_period WITH &&
             )',
            v_table_name,
            v_period_excl
        );
    EXCEPTION
        WHEN duplicate_object THEN
            NULL;
        WHEN duplicate_table THEN
            NULL;
    END;
END;
$$;

-- Metric registration.
-- Registers (or re-registers) a numeric metric.
-- Steps: upsert the telemetry.metrics row, seed the initial policy at
-- valid_from = '-infinity' if it does not exist yet, create the segment table
-- with a double precision value column, and verify the table resolves.
-- Re-registration refreshes the metrics row only; an existing initial policy
-- row is left unchanged.
-- Raises when the metric already exists with a different metric_type: the
-- existing policy rows and the value column of the segment table would no
-- longer match the registry, and ingestion would fail or misbehave later.
CREATE OR REPLACE FUNCTION telemetry.register_numeric_metric(
    p_metric_name text,
    p_table_name text,
    p_domain_name text,
    p_epsilon double precision,
    p_min_value double precision,
    p_max_value double precision,
    p_rounding_precision double precision,
    p_max_sampling_interval interval,
    p_allow_null boolean DEFAULT true
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
BEGIN
    INSERT INTO telemetry.metrics AS m (
        metric_name,
        table_name,
        domain_name,
        metric_type,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        max_sampling_interval,
        allow_null
    )
    VALUES (
        p_metric_name,
        v_table_name,
        p_domain_name,
        'numeric',
        'epsilon',
        p_epsilon,
        p_min_value,
        p_max_value,
        p_rounding_precision,
        p_max_sampling_interval,
        p_allow_null
    )
    ON CONFLICT (metric_name) DO UPDATE
    SET table_name = EXCLUDED.table_name,
        domain_name = EXCLUDED.domain_name,
        metric_type = EXCLUDED.metric_type,
        comparison_mode = EXCLUDED.comparison_mode,
        epsilon = EXCLUDED.epsilon,
        min_value = EXCLUDED.min_value,
        max_value = EXCLUDED.max_value,
        rounding_precision = EXCLUDED.rounding_precision,
        max_sampling_interval = EXCLUDED.max_sampling_interval,
        allow_null = EXCLUDED.allow_null,
        updated_at = now()
    -- Only refresh a row of the same type; a type change affects zero rows
    -- and is reported below.
    WHERE m.metric_type = EXCLUDED.metric_type;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'metric % is already registered with a different metric_type', p_metric_name;
    END IF;

    INSERT INTO telemetry.metric_policies (
        metric_name,
        valid_from,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        allow_null,
        max_sampling_interval
    )
    VALUES (
        p_metric_name,
        '-infinity'::timestamptz,
        'epsilon',
        p_epsilon,
        p_min_value,
        p_max_value,
        p_rounding_precision,
        p_allow_null,
        p_max_sampling_interval
    )
    ON CONFLICT (metric_name, valid_from) DO NOTHING;

    PERFORM telemetry.create_segment_table(v_table_name, 'double precision');
    -- Fails loudly if the segment table still cannot be resolved.
    PERFORM telemetry.metric_table_sql(v_table_name);
END;
$$;

-- Registers (or re-registers) a boolean metric.
-- Same steps as the numeric variant, with exact comparison, no numeric
-- settings, and a boolean value column in the segment table.
-- Raises when the metric already exists with a different metric_type.
CREATE OR REPLACE FUNCTION telemetry.register_boolean_metric(
    p_metric_name text,
    p_table_name text,
    p_domain_name text,
    p_max_sampling_interval interval,
    p_allow_null boolean DEFAULT true
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
BEGIN
    INSERT INTO telemetry.metrics AS m (
        metric_name,
        table_name,
        domain_name,
        metric_type,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        max_sampling_interval,
        allow_null
    )
    VALUES (
        p_metric_name,
        v_table_name,
        p_domain_name,
        'boolean',
        'exact',
        NULL,
        NULL,
        NULL,
        NULL,
        p_max_sampling_interval,
        p_allow_null
    )
    ON CONFLICT (metric_name) DO UPDATE
    SET table_name = EXCLUDED.table_name,
        domain_name = EXCLUDED.domain_name,
        metric_type = EXCLUDED.metric_type,
        comparison_mode = EXCLUDED.comparison_mode,
        epsilon = EXCLUDED.epsilon,
        min_value = EXCLUDED.min_value,
        max_value = EXCLUDED.max_value,
        rounding_precision = EXCLUDED.rounding_precision,
        max_sampling_interval = EXCLUDED.max_sampling_interval,
        allow_null = EXCLUDED.allow_null,
        updated_at = now()
    -- Only refresh a row of the same type; a type change affects zero rows
    -- and is reported below.
    WHERE m.metric_type = EXCLUDED.metric_type;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'metric % is already registered with a different metric_type', p_metric_name;
    END IF;

    INSERT INTO telemetry.metric_policies (
        metric_name,
        valid_from,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        allow_null,
        max_sampling_interval
    )
    VALUES (
        p_metric_name,
        '-infinity'::timestamptz,
        'exact',
        NULL,
        NULL,
        NULL,
        NULL,
        p_allow_null,
        p_max_sampling_interval
    )
    ON CONFLICT (metric_name, valid_from) DO NOTHING;

    PERFORM telemetry.create_segment_table(v_table_name, 'boolean');
    -- Fails loudly if the segment table still cannot be resolved.
    PERFORM telemetry.metric_table_sql(v_table_name);
END;
$$;

-- Metric and policy lookup.
-- Returns the telemetry.metrics row for p_metric_name.
-- Raises 'unknown metric: ...' when the metric is not registered.
CREATE OR REPLACE FUNCTION telemetry.require_metric(p_metric_name text)
RETURNS telemetry.metrics
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
BEGIN
    SELECT *
    INTO v_metric
    FROM telemetry.metrics AS m
    WHERE m.metric_name = p_metric_name;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'unknown metric: %', p_metric_name;
    END IF;

    RETURN v_metric;
END;
$$;

-- Returns the policy in force for a metric at p_observed_at: the row with the
-- greatest valid_from that is <= p_observed_at.
-- Raises when no such row exists.
CREATE OR REPLACE FUNCTION telemetry.require_metric_policy(
    p_metric_name text,
    p_observed_at timestamptz
)
RETURNS telemetry.metric_policies
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_policy telemetry.metric_policies%ROWTYPE;
BEGIN
    SELECT *
    INTO v_policy
    FROM telemetry.metric_policies AS mp
    WHERE mp.metric_name = p_metric_name
      AND mp.valid_from <= p_observed_at
    ORDER BY mp.valid_from DESC
    LIMIT 1;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'no active policy for metric % at %', p_metric_name, p_observed_at;
    END IF;

    RETURN v_policy;
END;
$$;

-- Adds a policy version for a metric, effective from p_valid_from.
-- Validates the arguments against the metric's type (numeric metrics need
-- epsilon comparison with epsilon and rounding_precision; boolean/state
-- metrics need exact comparison and no numeric settings), inserts the policy,
-- and - when no policy with a later valid_from exists - copies the settings
-- onto the telemetry.metrics row.
-- Returns the new policy_id.
-- Raises on missing valid_from / max_sampling_interval, unknown metric,
-- min_value > max_value, a shape mismatch, or an existing policy at the same
-- (metric_name, valid_from).
CREATE OR REPLACE FUNCTION telemetry.add_metric_policy(
    p_metric_name text,
    p_valid_from timestamptz,
    p_comparison_mode telemetry.comparison_mode_enum,
    p_epsilon double precision,
    p_min_value double precision,
    p_max_value double precision,
    p_rounding_precision double precision,
    p_max_sampling_interval interval,
    p_allow_null boolean
)
RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
    v_policy_id bigint;
BEGIN
    IF p_valid_from IS NULL THEN
        RAISE EXCEPTION 'valid_from is required';
    END IF;

    IF p_max_sampling_interval IS NULL THEN
        RAISE EXCEPTION 'max_sampling_interval is required';
    END IF;

    v_metric := telemetry.require_metric(p_metric_name);

    IF p_min_value IS NOT NULL
       AND p_max_value IS NOT NULL
       AND p_min_value > p_max_value THEN
        RAISE EXCEPTION 'min_value % exceeds max_value % for metric %',
            p_min_value,
            p_max_value,
            p_metric_name;
    END IF;

    CASE v_metric.metric_type
        WHEN 'numeric' THEN
            IF p_comparison_mode <> 'epsilon'::telemetry.comparison_mode_enum THEN
                RAISE EXCEPTION 'numeric metric % requires comparison_mode epsilon', p_metric_name;
            END IF;

            IF p_epsilon IS NULL OR p_rounding_precision IS NULL THEN
                RAISE EXCEPTION 'numeric metric % requires epsilon and rounding_precision', p_metric_name;
            END IF;
        WHEN 'boolean', 'state' THEN
            IF p_comparison_mode <> 'exact'::telemetry.comparison_mode_enum THEN
                RAISE EXCEPTION 'metric % with type % requires comparison_mode exact',
                    p_metric_name,
                    v_metric.metric_type;
            END IF;

            IF p_epsilon IS NOT NULL
               OR p_min_value IS NOT NULL
               OR p_max_value IS NOT NULL
               OR p_rounding_precision IS NOT NULL THEN
                RAISE EXCEPTION 'metric % with type % does not accept epsilon/min/max/rounding_precision in policies',
                    p_metric_name,
                    v_metric.metric_type;
            END IF;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %',
                v_metric.metric_type,
                p_metric_name;
    END CASE;

    -- Duplicate detection is done by the insert itself. A separate
    -- "IF EXISTS ... THEN RAISE" check followed by INSERT leaves a window in
    -- which a concurrent session can add the same (metric_name, valid_from),
    -- turning the friendly error into a raw unique violation.
    INSERT INTO telemetry.metric_policies (
        metric_name,
        valid_from,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        allow_null,
        max_sampling_interval
    )
    VALUES (
        p_metric_name,
        p_valid_from,
        p_comparison_mode,
        p_epsilon,
        p_min_value,
        p_max_value,
        p_rounding_precision,
        p_allow_null,
        p_max_sampling_interval
    )
    ON CONFLICT (metric_name, valid_from) DO NOTHING
    RETURNING policy_id INTO v_policy_id;

    -- No row returned means the conflict path was taken.
    IF v_policy_id IS NULL THEN
        RAISE EXCEPTION 'policy already exists for metric % at %', p_metric_name, p_valid_from;
    END IF;

    -- Mirror the settings onto the registry row only when this is the
    -- latest-dated policy for the metric.
    UPDATE telemetry.metrics AS m
    SET comparison_mode = p_comparison_mode,
        epsilon = p_epsilon,
        min_value = p_min_value,
        max_value = p_max_value,
        rounding_precision = p_rounding_precision,
        allow_null = p_allow_null,
        max_sampling_interval = p_max_sampling_interval,
        updated_at = now()
    WHERE m.metric_name = p_metric_name
      AND NOT EXISTS (
          SELECT 1
          FROM telemetry.metric_policies AS later
          WHERE later.metric_name = p_metric_name
            AND later.valid_from > p_valid_from
      );

    RETURN v_policy_id;
END;
$$;

-- Ingestion functions.
-- Enforces strictly increasing observation times per (metric, device).
-- Returns the stored watermark (NULL when the pair has none yet).
-- Raises when p_observed_at is not later than the stored watermark.
CREATE OR REPLACE FUNCTION telemetry.assert_append_only(
    p_metric_name text,
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS timestamptz
LANGUAGE plpgsql
AS $$
DECLARE
    v_last_observed_at timestamptz;
BEGIN
    SELECT w.last_observed_at
    INTO v_last_observed_at
    FROM telemetry.metric_device_watermarks AS w
    WHERE w.metric_name = p_metric_name
      AND w.device_id = p_device_id;

    IF FOUND AND p_observed_at <= v_last_observed_at THEN
        RAISE EXCEPTION 'out-of-order measurement for metric % device %: incoming % <= last observed %',
            p_metric_name,
            p_device_id,
            p_observed_at,
            v_last_observed_at;
    END IF;

    RETURN v_last_observed_at;
END;
$$;

-- Stores p_observed_at as the watermark for (metric, device), inserting the
-- row on first use and overwriting it afterwards.
CREATE OR REPLACE FUNCTION telemetry.bump_watermark(
    p_metric_name text,
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO telemetry.metric_device_watermarks (
        metric_name,
        device_id,
        last_observed_at
    )
    VALUES ($1, $2, $3)
    ON CONFLICT ON CONSTRAINT metric_device_watermarks_pkey DO UPDATE
    SET last_observed_at = EXCLUDED.last_observed_at;
$$;

-- Core segment engine: folds one observation into the segment table of a
-- metric for one device.
--
-- Parameters (all supplied by the ingest_measurement overloads):
--   p_table_name              segment table (bare or telemetry-qualified)
--   p_metric_type             metric type (not consulted by this function)
--   p_comparison_mode         'epsilon' or 'exact'
--   p_policy_id / p_policy_valid_from
--                             policy in force at p_observed_at
--   p_device_pk / p_device_id device identity
--   p_value                   normalized value, may be NULL
--   p_observed_at             observation time
--   p_last_observed_at        previous watermark, NULL if none
--   p_epsilon                 tolerance for 'epsilon' comparison
--   p_max_sampling_interval   longest silence before a gap is recorded
--   p_allow_null              whether explicit NULL observations are accepted
--
-- Returns the action taken, one of: 'opened', 'opened_null', 'gap_to_null',
-- 'gap_split', 'extended', 'extended_null', 'split', 'value_to_null',
-- 'null_to_value'.
--
-- Raises when p_value is NULL but NULLs are not allowed, when the comparison
-- mode is unsupported, or when the segment table cannot be resolved.
--
-- Dynamic SQL deliberately contains no "--" comments: a line comment inside
-- an EXECUTE string swallows the rest of the statement as soon as the string
-- loses its line breaks. Explanations live in PL/pgSQL comments instead.
CREATE OR REPLACE FUNCTION telemetry.ingest_segment(
    p_table_name text,
    p_metric_type telemetry.metric_type_enum,
    p_comparison_mode telemetry.comparison_mode_enum,
    p_policy_id bigint,
    p_policy_valid_from timestamptz,
    p_device_pk bigint,
    p_device_id text,
    p_value anyelement,
    p_observed_at timestamptz,
    p_last_observed_at timestamptz,
    p_epsilon double precision,
    p_max_sampling_interval interval,
    p_allow_null boolean
)
RETURNS text
LANGUAGE plpgsql
AS $$
DECLARE
    v_current record;
    v_has_current boolean;
    v_gap_start timestamptz;
    v_rowcount bigint;
    v_equal boolean;
    v_policy_changed boolean;
    v_effective_last_observed_at timestamptz := p_last_observed_at;
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
BEGIN
    -- Lock the device's open segment (if any) for the rest of the transaction.
    EXECUTE format(
        'SELECT
            segment_id,
            device_pk,
            device_id,
            start_time,
            end_time,
            value,
            samples_count,
            policy_id
         FROM %s
         WHERE device_id = $1
           AND end_time IS NULL
         FOR UPDATE',
        v_table_sql
    )
    INTO v_current
    USING p_device_id;

    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    v_has_current := v_rowcount > 0;

    IF p_value IS NULL AND NOT p_allow_null THEN
        RAISE EXCEPTION 'metric table % does not allow explicit NULL measurements', v_table_name;
    END IF;

    -- First observation for this device: open the first segment.
    IF NOT v_has_current THEN
        EXECUTE format(
            'INSERT INTO %s (
                device_pk, device_id, start_time, end_time, value, samples_count, policy_id
             )
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;

        RETURN CASE
            WHEN p_value IS NULL THEN 'opened_null'
            ELSE 'opened'
        END;
    END IF;

    v_policy_changed := v_current.policy_id IS DISTINCT FROM p_policy_id;

    -- The new policy starts inside the open segment: cut the segment at the
    -- policy boundary and carry its value forward under the new policy.
    IF v_policy_changed AND p_policy_valid_from > v_current.start_time THEN
        EXECUTE format(
            'UPDATE %s
             SET end_time = $1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING p_policy_valid_from, v_current.segment_id;

        -- samples_count = 0 marks a synthetic continuation across a policy boundary.
        EXECUTE format(
            'INSERT INTO %s (
                device_pk, device_id, start_time, end_time, value, samples_count, policy_id
             )
             VALUES ($1, $2, $3, NULL, $4, 0, $5)
             RETURNING
                segment_id,
                device_pk,
                device_id,
                start_time,
                end_time,
                value,
                samples_count,
                policy_id',
            v_table_sql
        )
        INTO v_current
        USING v_current.device_pk, v_current.device_id, p_policy_valid_from, v_current.value, p_policy_id;

        -- From here on the open segment already belongs to the new policy.
        v_policy_changed := false;
        v_effective_last_observed_at := GREATEST(
            COALESCE(p_last_observed_at, p_policy_valid_from),
            p_policy_valid_from
        );
    END IF;

    -- Lazy gap detection: the device was silent for longer than the policy
    -- allows while a non-NULL segment was open.
    IF v_effective_last_observed_at IS NOT NULL
       AND p_observed_at - v_effective_last_observed_at > p_max_sampling_interval
       AND v_current.value IS NOT NULL THEN
        v_gap_start := v_effective_last_observed_at + p_max_sampling_interval;

        EXECUTE format(
            'UPDATE %s
             SET end_time = $1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING v_gap_start, v_current.segment_id;

        -- A NULL observation after the gap: one open NULL segment starting at
        -- the gap start covers both.
        IF p_value IS NULL THEN
            EXECUTE format(
                'INSERT INTO %s (
                    device_pk, device_id, start_time, end_time, value, samples_count, policy_id
                 )
                 VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
                v_table_sql
            )
            USING p_device_pk, p_device_id, v_gap_start, p_policy_id;

            RETURN 'gap_to_null';
        END IF;

        -- samples_count = 0 marks a historian-generated gap interval.
        EXECUTE format(
            'INSERT INTO %s (
                device_pk, device_id, start_time, end_time, value, samples_count, policy_id
             )
             VALUES ($1, $2, $3, $4, NULL, 0, $5)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, v_gap_start, p_observed_at, p_policy_id;

        EXECUTE format(
            'INSERT INTO %s (
                device_pk, device_id, start_time, end_time, value, samples_count, policy_id
             )
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;

        RETURN 'gap_split';
    END IF;

    -- The observation lands exactly on the start of a synthetic
    -- (samples_count = 0) open segment: overwrite that segment in place
    -- instead of creating a zero-length one.
    IF v_current.samples_count = 0 AND p_observed_at = v_current.start_time THEN
        IF p_value IS NULL THEN
            EXECUTE format(
                'UPDATE %s
                 SET value = NULL,
                     samples_count = 1
                 WHERE segment_id = $1',
                v_table_sql
            )
            USING v_current.segment_id;

            RETURN CASE
                WHEN v_current.value IS NULL THEN 'extended_null'
                ELSE 'value_to_null'
            END;
        END IF;

        IF v_current.value IS NULL THEN
            EXECUTE format(
                'UPDATE %s
                 SET value = $1,
                     samples_count = 1
                 WHERE segment_id = $2',
                v_table_sql
            )
            USING p_value, v_current.segment_id;

            RETURN 'null_to_value';
        END IF;

        -- Comparison runs through EXECUTE because p_value is polymorphic.
        CASE p_comparison_mode
            WHEN 'epsilon' THEN
                EXECUTE 'SELECT abs($1::double precision - $2::double precision) <= $3'
                INTO v_equal
                USING v_current.value, p_value, p_epsilon;
            WHEN 'exact' THEN
                EXECUTE 'SELECT $1 IS NOT DISTINCT FROM $2'
                INTO v_equal
                USING v_current.value, p_value;
            ELSE
                RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
        END CASE;

        -- The stored row is updated the same way in both outcomes; only the
        -- reported action differs.
        EXECUTE format(
            'UPDATE %s
             SET value = $1,
                 samples_count = 1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING p_value, v_current.segment_id;

        RETURN CASE
            WHEN v_equal THEN 'extended'
            ELSE 'split'
        END;
    END IF;

    -- Open segment holds NULL.
    IF v_current.value IS NULL THEN
        IF p_value IS NULL THEN
            -- NULL follows NULL: split only if the policy changed.
            IF v_policy_changed THEN
                EXECUTE format(
                    'UPDATE %s
                     SET end_time = $1
                     WHERE segment_id = $2',
                    v_table_sql
                )
                USING p_observed_at, v_current.segment_id;

                EXECUTE format(
                    'INSERT INTO %s (
                        device_pk, device_id, start_time, end_time, value, samples_count, policy_id
                     )
                     VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
                    v_table_sql
                )
                USING p_device_pk, p_device_id, p_observed_at, p_policy_id;

                RETURN 'split';
            END IF;

            EXECUTE format(
                'UPDATE %s
                 SET samples_count = samples_count + 1
                 WHERE segment_id = $1',
                v_table_sql
            )
            USING v_current.segment_id;

            RETURN 'extended_null';
        END IF;

        -- NULL -> value: close the NULL segment and open a value segment.
        EXECUTE format(
            'UPDATE %s
             SET end_time = $1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING p_observed_at, v_current.segment_id;

        EXECUTE format(
            'INSERT INTO %s (
                device_pk, device_id, start_time, end_time, value, samples_count, policy_id
             )
             VALUES ($1, $2, $3, NULL, $4, 1, $5)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;

        RETURN 'null_to_value';
    END IF;

    -- value -> NULL: close the value segment and open a NULL segment.
    IF p_value IS NULL THEN
        EXECUTE format(
            'UPDATE %s
             SET end_time = $1
             WHERE segment_id = $2',
            v_table_sql
        )
        USING p_observed_at, v_current.segment_id;

        EXECUTE format(
            'INSERT INTO %s (
                device_pk, device_id, start_time, end_time, value, samples_count, policy_id
             )
             VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
            v_table_sql
        )
        USING p_device_pk, p_device_id, p_observed_at, p_policy_id;

        RETURN 'value_to_null';
    END IF;

    -- value -> value: compare against the value stored on the open segment.
    CASE p_comparison_mode
        WHEN 'epsilon' THEN
            EXECUTE 'SELECT abs($1::double precision - $2::double precision) <= $3'
            INTO v_equal
            USING v_current.value, p_value, p_epsilon;
        WHEN 'exact' THEN
            EXECUTE 'SELECT $1 IS NOT DISTINCT FROM $2'
            INTO v_equal
            USING v_current.value, p_value;
        ELSE
            RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
    END CASE;

    -- Same value under the same policy: just count the sample.
    IF v_equal AND NOT v_policy_changed THEN
        EXECUTE format(
            'UPDATE %s
             SET samples_count = samples_count + 1
             WHERE segment_id = $1',
            v_table_sql
        )
        USING v_current.segment_id;

        RETURN 'extended';
    END IF;

    -- Different value (or a policy change): close and start a new segment.
    EXECUTE format(
        'UPDATE %s
         SET end_time = $1
         WHERE segment_id = $2',
        v_table_sql
    )
    USING p_observed_at, v_current.segment_id;

    EXECUTE format(
        'INSERT INTO %s (
            device_pk, device_id, start_time, end_time, value, samples_count, policy_id
         )
         VALUES ($1, $2, $3, NULL, $4, 1, $5)',
        v_table_sql
    )
    USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;

    RETURN 'split';
END;
$$;

-- Ingests one numeric observation.
-- Validates the arguments, auto-registers the device, serializes on an
-- advisory lock keyed by (metric, device), enforces append-only ordering,
-- rounds the value to the policy's rounding_precision, range-checks it, feeds
-- it to ingest_segment, then advances the watermark and the device's
-- last_seen.
-- Returns one row: metric, device, segment table, normalized value, action.
-- Raises on missing arguments, unknown metric, non-numeric metric,
-- out-of-order data, missing policy, or a value outside [min_value, max_value].
CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
    p_metric_name text,
    p_device_id text,
    p_value double precision,
    p_observed_at timestamptz
)
RETURNS TABLE (
    metric_name text,
    device_id text,
    table_name text,
    normalized_value double precision,
    action text
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
    v_policy telemetry.metric_policies%ROWTYPE;
    v_device telemetry.devices%ROWTYPE;
    v_last_observed_at timestamptz;
    v_normalized double precision;
    v_action text;
    v_metric_lock_key integer;
    v_device_lock_key integer;
BEGIN
    IF p_metric_name IS NULL OR btrim(p_metric_name) = '' THEN
        RAISE EXCEPTION 'metric_name is required';
    END IF;

    IF p_device_id IS NULL OR btrim(p_device_id) = '' THEN
        RAISE EXCEPTION 'device_id is required';
    END IF;

    IF p_observed_at IS NULL THEN
        RAISE EXCEPTION 'observed_at is required';
    END IF;

    v_metric := telemetry.require_metric(p_metric_name);
    PERFORM telemetry.ensure_device(p_device_id);
    v_device := telemetry.require_device(p_device_id);

    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'numeric' THEN
            NULL;
        WHEN 'boolean' THEN
            RAISE EXCEPTION 'metric % is boolean; use the boolean overload of ingest_measurement(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;

    -- One writer at a time per (metric, device) until the transaction ends.
    v_metric_lock_key := telemetry.lock_key_from_text(p_metric_name);
    v_device_lock_key := telemetry.lock_key_from_text(p_device_id);
    PERFORM pg_advisory_xact_lock(v_metric_lock_key, v_device_lock_key);

    v_last_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
    v_policy := telemetry.require_metric_policy(p_metric_name, p_observed_at);

    IF p_value IS NULL THEN
        v_normalized := NULL;
    ELSE
        v_normalized := telemetry.round_to_increment(p_value, v_policy.rounding_precision);

        -- Bounds are checked against the rounded value.
        IF v_policy.min_value IS NOT NULL AND v_normalized < v_policy.min_value THEN
            RAISE EXCEPTION 'value % is below min_value % for metric %',
                v_normalized,
                v_policy.min_value,
                p_metric_name;
        END IF;

        IF v_policy.max_value IS NOT NULL AND v_normalized > v_policy.max_value THEN
            RAISE EXCEPTION 'value % is above max_value % for metric %',
                v_normalized,
                v_policy.max_value,
                p_metric_name;
        END IF;
    END IF;

    v_action := telemetry.ingest_segment(
        v_metric.table_name,
        v_metric.metric_type,
        v_policy.comparison_mode,
        v_policy.policy_id,
        v_policy.valid_from,
        v_device.device_pk,
        p_device_id,
        v_normalized,
        p_observed_at,
        v_last_observed_at,
        v_policy.epsilon,
        v_policy.max_sampling_interval,
        v_policy.allow_null
    );

    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);

    RETURN QUERY
    SELECT
        p_metric_name,
        p_device_id,
        v_metric.table_name,
        v_normalized,
        v_action;
END;
$$;

-- Ingests one boolean observation.
-- Same pipeline as the numeric overload, without rounding or range checks;
-- the value is passed to ingest_segment unchanged.
-- Returns one row: metric, device, segment table, value, action.
-- Raises on missing arguments, unknown metric, non-boolean metric,
-- out-of-order data, or missing policy.
CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
    p_metric_name text,
    p_device_id text,
    p_value boolean,
    p_observed_at timestamptz
)
RETURNS TABLE (
    metric_name text,
    device_id text,
    table_name text,
    normalized_value boolean,
    action text
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
    v_policy telemetry.metric_policies%ROWTYPE;
    v_device telemetry.devices%ROWTYPE;
    v_last_observed_at timestamptz;
    v_action text;
    v_metric_lock_key integer;
    v_device_lock_key integer;
BEGIN
    IF p_metric_name IS NULL OR btrim(p_metric_name) = '' THEN
        RAISE EXCEPTION 'metric_name is required';
    END IF;

    IF p_device_id IS NULL OR btrim(p_device_id) = '' THEN
        RAISE EXCEPTION 'device_id is required';
    END IF;

    IF p_observed_at IS NULL THEN
        RAISE EXCEPTION 'observed_at is required';
    END IF;

    v_metric := telemetry.require_metric(p_metric_name);
    PERFORM telemetry.ensure_device(p_device_id);
    v_device := telemetry.require_device(p_device_id);

    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'boolean' THEN
            NULL;
        WHEN 'numeric' THEN
            RAISE EXCEPTION 'metric % is numeric; use the numeric overload of ingest_measurement(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;

    -- One writer at a time per (metric, device) until the transaction ends.
    v_metric_lock_key := telemetry.lock_key_from_text(p_metric_name);
    v_device_lock_key := telemetry.lock_key_from_text(p_device_id);
    PERFORM pg_advisory_xact_lock(v_metric_lock_key, v_device_lock_key);

    v_last_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
    v_policy := telemetry.require_metric_policy(p_metric_name, p_observed_at);

    v_action := telemetry.ingest_segment(
        v_metric.table_name,
        v_metric.metric_type,
        v_policy.comparison_mode,
        v_policy.policy_id,
        v_policy.valid_from,
        v_device.device_pk,
        p_device_id,
        p_value,
        p_observed_at,
        v_last_observed_at,
        v_policy.epsilon,
        v_policy.max_sampling_interval,
        v_policy.allow_null
    );

    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);

    RETURN QUERY
    SELECT
        p_metric_name,
        p_device_id,
        v_metric.table_name,
        p_value,
        v_action;
END;
$$;

-- Query functions.
-- Returns, as jsonb, the value of the segment that covers p_at for a device,
-- or NULL when no segment covers it (or the covering segment holds NULL).
--   p_table_name:  segment table (bare or telemetry-qualified)
--   p_metric_name: used only to look up the (metric, device) watermark
--   p_device_id:   device to query
--   p_at:          instant of interest
-- Effective segment range used for the containment test, half-open [start, end):
--   * closed segment          -> its stored end_time
--   * open segment, NULL value -> infinity
--   * open segment, non-NULL   -> max(watermark, start_time) + the
--                                 max_sampling_interval of the segment's policy
--                                 (infinity if that sum is NULL)
-- The dynamic SQL literal is kept on one line, exactly as stored.
CREATE OR REPLACE FUNCTION telemetry.value_at_from_table(
    p_table_name text,
    p_metric_name text,
    p_device_id text,
    p_at timestamptz
)
RETURNS jsonb
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_value jsonb;
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
BEGIN
    -- $1 = metric name, $2 = device id, $3 = instant; %s = resolved table name.
    EXECUTE format(
        'WITH watermark AS ( SELECT w.last_observed_at FROM telemetry.metric_device_watermarks AS w WHERE w.metric_name = $1 AND w.device_id = $2 ) SELECT to_jsonb(s.value) FROM %s AS s JOIN telemetry.metric_policies AS mp ON mp.policy_id = s.policy_id LEFT JOIN watermark AS w ON true WHERE s.device_id = $2 AND tstzrange( s.start_time, CASE WHEN s.end_time IS NOT NULL THEN s.end_time WHEN s.value IS NULL THEN ''infinity''::timestamptz ELSE COALESCE( GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time) + mp.max_sampling_interval, ''infinity''::timestamptz ) END, ''[)'' ) @> $3 LIMIT 1',
        v_table_sql
    )
    INTO v_value
    USING p_metric_name, p_device_id, p_at;

    RETURN v_value;
END;
$$;

-- Returns the value of a numeric metric for a device at p_at, or NULL when
-- value_at_from_table yields NULL.
-- Raises for unknown metrics and for boolean or state metrics.
CREATE OR REPLACE FUNCTION telemetry.value_at(
    p_metric_name text,
    p_device_id text,
    p_at timestamptz
)
RETURNS double precision
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
    v_value jsonb;
BEGIN
    v_metric := telemetry.require_metric(p_metric_name);

    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'numeric' THEN
            v_value := telemetry.value_at_from_table(
                v_metric.table_name,
                p_metric_name,
                p_device_id,
                p_at
            );
            RETURN CASE
                WHEN v_value IS NULL THEN NULL
                -- The jsonb scalar is converted through its text form.
                -- NOTE(review): PostgreSQL 11+ also supports a direct
                -- jsonb -> double precision cast for numeric scalars; the
                -- text hop is kept as written - confirm before simplifying.
                ELSE v_value::text::double precision
            END;
        WHEN 'boolean' THEN
            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_value_at(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;
END;
$$;

-- Returns the value of a boolean metric for a device at p_at, or NULL when
-- value_at_from_table yields NULL.
-- Raises for unknown metrics and for numeric or state metrics.
CREATE OR REPLACE FUNCTION telemetry.boolean_value_at(
    p_metric_name text,
    p_device_id text,
    p_at timestamptz
)
RETURNS boolean
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
    v_value jsonb;
BEGIN
    v_metric := telemetry.require_metric(p_metric_name);

    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'boolean' THEN
            v_value := telemetry.value_at_from_table(
                v_metric.table_name,
                p_metric_name,
                p_device_id,
                p_at
            );
            RETURN CASE
                WHEN v_value IS NULL THEN NULL
                -- The jsonb scalar is converted through its text form.
                -- NOTE(review): PostgreSQL 11+ also supports a direct
                -- jsonb -> boolean cast for boolean scalars; the text hop is
                -- kept as written - confirm before simplifying.
                ELSE v_value::text::boolean
            END;
        WHEN 'numeric' THEN
            RAISE EXCEPTION 'metric % is numeric; use telemetry.value_at(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;
END;
$$;

CREATE OR REPLACE FUNCTION telemetry.segments_between_from_table( p_table_name text, p_metric_name text, p_device_id text, p_from timestamptz, p_to timestamptz ) RETURNS TABLE ( device_id text, start_time timestamptz, end_time timestamptz, value_json jsonb, samples_count integer ) LANGUAGE plpgsql STABLE AS $$ DECLARE v_table_sql text := telemetry.metric_table_sql(p_table_name); BEGIN RETURN QUERY EXECUTE format( 'WITH watermark AS ( SELECT w.last_observed_at FROM telemetry.metric_device_watermarks AS w WHERE w.metric_name = $1 AND w.device_id = $2 ), open_segment AS ( SELECT s.* FROM %s AS s WHERE s.device_id = $2 AND s.end_time IS NULL LIMIT 1 ), stored_segments AS ( SELECT s.device_id, GREATEST(s.start_time, $3) AS start_time, LEAST( CASE WHEN s.end_time IS NOT NULL THEN s.end_time WHEN s.value IS NULL THEN $4 ELSE LEAST( COALESCE( GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time) + mp.max_sampling_interval, $4 ), $4 ) END, $4 ) AS end_time, to_jsonb(s.value) AS value_json, s.samples_count FROM %s AS s JOIN telemetry.metric_policies AS mp ON mp.policy_id = s.policy_id LEFT JOIN watermark AS w ON true WHERE s.device_id = $2 AND tstzrange( s.start_time, CASE WHEN s.end_time IS NOT NULL THEN s.end_time WHEN s.value IS NULL THEN ''infinity''::timestamptz ELSE COALESCE( GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time) + mp.max_sampling_interval, ''infinity''::timestamptz ) END, ''[)'' ) && tstzrange($3, $4, ''[)'') ), synthetic_tail AS ( SELECT $2::text AS device_id, GREATEST( GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time) + 
mp.max_sampling_interval, $3 ) AS start_time, $4 AS end_time, NULL::jsonb AS value_json, 0 AS samples_count FROM open_segment AS s JOIN telemetry.metric_policies AS mp ON mp.policy_id = s.policy_id JOIN watermark AS w ON true WHERE s.value IS NOT NULL AND GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time) + mp.max_sampling_interval < $4 ) SELECT * FROM ( SELECT * FROM stored_segments WHERE end_time > start_time UNION ALL SELECT * FROM synthetic_tail WHERE end_time > start_time ) AS combined_segments ORDER BY start_time', v_table_sql, v_table_sql ) USING p_metric_name, p_device_id, p_from, p_to; END; $$; CREATE OR REPLACE FUNCTION telemetry.metric_segments( p_metric_name text, p_device_id text, p_from timestamptz, p_to timestamptz ) RETURNS TABLE ( device_id text, start_time timestamptz, end_time timestamptz, value double precision, samples_count integer ) LANGUAGE plpgsql STABLE AS $$ DECLARE v_metric telemetry.metrics%ROWTYPE; BEGIN IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN RAISE EXCEPTION 'p_from and p_to must define a non-empty interval'; END IF; v_metric := telemetry.require_metric(p_metric_name); -- 'state' is reserved for future implementation of discrete enumerated -- machine states, likely stored as smallint. It will use the same segment -- logic as boolean metrics, but with exact comparison instead of epsilon. CASE v_metric.metric_type WHEN 'numeric' THEN RETURN QUERY SELECT s.device_id, s.start_time, s.end_time, CASE WHEN s.value_json IS NULL THEN NULL -- PostgreSQL must cast jsonb scalars via text first; -- direct casts such as (s.value_json)::double precision -- are not supported. 
ELSE s.value_json::text::double precision END AS value, s.samples_count FROM telemetry.segments_between_from_table( v_metric.table_name, p_metric_name, p_device_id, p_from, p_to ) AS s; WHEN 'boolean' THEN RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_segments_between(...)', p_metric_name; WHEN 'state' THEN RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name; ELSE RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name; END CASE; END; $$; CREATE OR REPLACE FUNCTION telemetry.boolean_segments_between( p_metric_name text, p_device_id text, p_from timestamptz, p_to timestamptz ) RETURNS TABLE ( device_id text, start_time timestamptz, end_time timestamptz, value boolean, samples_count integer ) LANGUAGE plpgsql STABLE AS $$ DECLARE v_metric telemetry.metrics%ROWTYPE; BEGIN IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN RAISE EXCEPTION 'p_from and p_to must define a non-empty interval'; END IF; v_metric := telemetry.require_metric(p_metric_name); -- 'state' is reserved for future implementation of discrete enumerated -- machine states, likely stored as smallint. It will use the same segment -- logic as boolean metrics, but with exact comparison instead of epsilon. CASE v_metric.metric_type WHEN 'boolean' THEN RETURN QUERY SELECT s.device_id, s.start_time, s.end_time, CASE WHEN s.value_json IS NULL THEN NULL -- PostgreSQL must cast jsonb scalars via text first; -- direct casts such as (s.value_json)::boolean are not -- supported. 
ELSE s.value_json::text::boolean END AS value, s.samples_count FROM telemetry.segments_between_from_table( v_metric.table_name, p_metric_name, p_device_id, p_from, p_to ) AS s; WHEN 'numeric' THEN RAISE EXCEPTION 'metric % is numeric; use telemetry.metric_segments(...)', p_metric_name; WHEN 'state' THEN RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name; ELSE RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name; END CASE; END; $$; CREATE OR REPLACE FUNCTION telemetry.samples_from_table( p_table_name text, p_metric_name text, p_device_id text, p_from timestamptz, p_to timestamptz, p_points integer ) RETURNS TABLE ( sample_at timestamptz, value_json jsonb ) LANGUAGE plpgsql STABLE AS $$ DECLARE v_table_sql text := telemetry.metric_table_sql(p_table_name); BEGIN RETURN QUERY EXECUTE format( 'WITH watermark AS ( SELECT w.last_observed_at FROM telemetry.metric_device_watermarks AS w WHERE w.metric_name = $1 AND w.device_id = $2 ), sample_points AS ( SELECT $3 + (($4 - $3) * gs.i::double precision / ($5 - 1)::double precision) AS sample_at FROM generate_series(0, $5 - 1) AS gs(i) ) SELECT sp.sample_at, to_jsonb(seg.value) AS value_json FROM sample_points AS sp LEFT JOIN watermark AS w ON true LEFT JOIN LATERAL ( SELECT s.value FROM %s AS s JOIN telemetry.metric_policies AS mp ON mp.policy_id = s.policy_id WHERE s.device_id = $2 AND tstzrange( s.start_time, CASE WHEN s.end_time IS NOT NULL THEN s.end_time WHEN s.value IS NULL THEN ''infinity''::timestamptz ELSE COALESCE( GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time) + mp.max_sampling_interval, ''infinity''::timestamptz ) END, ''[)'' ) @> sp.sample_at LIMIT 1 ) AS seg ON true ORDER BY sp.sample_at', v_table_sql ) USING p_metric_name, p_device_id, p_from, p_to, p_points; END; $$; CREATE OR REPLACE FUNCTION telemetry.sample_metric( p_metric_name text, p_device_id text, p_from timestamptz, p_to timestamptz, p_points 
integer ) RETURNS TABLE ( sample_at timestamptz, value double precision ) LANGUAGE plpgsql STABLE AS $$ DECLARE v_metric telemetry.metrics%ROWTYPE; BEGIN IF p_points IS NULL OR p_points < 2 THEN RAISE EXCEPTION 'p_points must be at least 2'; END IF; IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN RAISE EXCEPTION 'p_from and p_to must define a non-empty interval'; END IF; v_metric := telemetry.require_metric(p_metric_name); -- 'state' is reserved for future implementation of discrete enumerated -- machine states, likely stored as smallint. It will use the same segment -- logic as boolean metrics, but with exact comparison instead of epsilon. CASE v_metric.metric_type WHEN 'numeric' THEN RETURN QUERY SELECT s.sample_at, CASE WHEN s.value_json IS NULL THEN NULL -- PostgreSQL must cast jsonb scalars via text first; -- direct casts such as (s.value_json)::double precision -- are not supported. ELSE s.value_json::text::double precision END AS value FROM telemetry.samples_from_table( v_metric.table_name, p_metric_name, p_device_id, p_from, p_to, p_points ) AS s; WHEN 'boolean' THEN RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_samples(...)', p_metric_name; WHEN 'state' THEN RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name; ELSE RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name; END CASE; END; $$; CREATE OR REPLACE FUNCTION telemetry.boolean_samples( p_metric_name text, p_device_id text, p_from timestamptz, p_to timestamptz, p_points integer ) RETURNS TABLE ( sample_at timestamptz, value boolean ) LANGUAGE plpgsql STABLE AS $$ DECLARE v_metric telemetry.metrics%ROWTYPE; BEGIN IF p_points IS NULL OR p_points < 2 THEN RAISE EXCEPTION 'p_points must be at least 2'; END IF; IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN RAISE EXCEPTION 'p_from and p_to must define a non-empty interval'; END IF; v_metric := telemetry.require_metric(p_metric_name); -- 
'state' is reserved for future implementation of discrete enumerated -- machine states, likely stored as smallint. It will use the same segment -- logic as boolean metrics, but with exact comparison instead of epsilon. CASE v_metric.metric_type WHEN 'boolean' THEN RETURN QUERY SELECT s.sample_at, CASE WHEN s.value_json IS NULL THEN NULL -- PostgreSQL must cast jsonb scalars via text first; -- direct casts such as (s.value_json)::boolean are not -- supported. ELSE s.value_json::text::boolean END AS value FROM telemetry.samples_from_table( v_metric.table_name, p_metric_name, p_device_id, p_from, p_to, p_points ) AS s; WHEN 'numeric' THEN RAISE EXCEPTION 'metric % is numeric; use telemetry.sample_metric(...)', p_metric_name; WHEN 'state' THEN RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name; ELSE RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name; END CASE; END; $$; -- Maintenance utilities. CREATE OR REPLACE FUNCTION telemetry.verify_segments(p_metric_name text) RETURNS TABLE ( metric_name text, device_id text, issue text, segment_id bigint, related_segment_id bigint, start_time timestamptz, end_time timestamptz, details text ) LANGUAGE plpgsql STABLE AS $$ DECLARE v_metric telemetry.metrics%ROWTYPE; v_table_sql text; BEGIN v_metric := telemetry.require_metric(p_metric_name); v_table_sql := telemetry.metric_table_sql(v_metric.table_name); RETURN QUERY EXECUTE format( 'WITH ordered AS ( SELECT s.segment_id, s.device_id, s.policy_id, s.start_time, s.end_time, lag(s.segment_id) OVER ( PARTITION BY s.device_id ORDER BY s.start_time, s.segment_id ) AS prev_segment_id, lag(s.end_time) OVER ( PARTITION BY s.device_id ORDER BY s.start_time, s.segment_id ) AS prev_end_time, lag(s.policy_id) OVER ( PARTITION BY s.device_id ORDER BY s.start_time, s.segment_id ) AS prev_policy_id FROM %s AS s ), open_counts AS ( SELECT s.device_id, count(*)::integer AS open_count FROM %s AS s WHERE 
s.end_time IS NULL GROUP BY s.device_id HAVING count(*) > 1 ) SELECT $1::text AS metric_name, o.device_id, ''invalid_interval''::text AS issue, o.segment_id, NULL::bigint AS related_segment_id, o.start_time, o.end_time, ''end_time must be greater than start_time''::text AS details FROM ordered AS o WHERE o.end_time IS NOT NULL AND o.end_time <= o.start_time UNION ALL SELECT $1::text, oc.device_id, ''multiple_open_segments''::text, NULL::bigint, NULL::bigint, NULL::timestamptz, NULL::timestamptz, format(''%%s open segments found'', oc.open_count) FROM open_counts AS oc UNION ALL SELECT $1::text, o.device_id, ''overlap''::text, o.segment_id, o.prev_segment_id, o.start_time, o.end_time, format(''previous segment ends at %%s'', o.prev_end_time) FROM ordered AS o WHERE o.prev_end_time IS NOT NULL AND o.prev_end_time > o.start_time UNION ALL SELECT $1::text, o.device_id, ''unexpected_gap''::text, o.segment_id, o.prev_segment_id, o.start_time, o.end_time, format( ''stored gap of %%s violates expected continuity (max_sampling_interval=%%s)'', o.start_time - o.prev_end_time, mp.max_sampling_interval ) FROM ordered AS o JOIN telemetry.metric_policies AS mp ON mp.policy_id = o.prev_policy_id WHERE o.prev_end_time IS NOT NULL AND o.start_time > o.prev_end_time ORDER BY device_id, start_time NULLS FIRST, segment_id NULLS FIRST', v_table_sql, v_table_sql ) USING p_metric_name; END; $$; CREATE OR REPLACE FUNCTION telemetry.compact_segments( p_metric_name text, p_before timestamptz ) RETURNS TABLE ( device_id text, kept_segment_id bigint, merged_segments integer, start_time timestamptz, end_time timestamptz, total_samples integer ) LANGUAGE plpgsql AS $$ DECLARE v_metric telemetry.metrics%ROWTYPE; v_table_sql text; v_merge record; BEGIN IF p_before IS NULL THEN RAISE EXCEPTION 'p_before is required'; END IF; v_metric := telemetry.require_metric(p_metric_name); v_table_sql := telemetry.metric_table_sql(v_metric.table_name); FOR v_merge IN EXECUTE format( 'WITH candidates AS ( 
SELECT s.segment_id, s.device_id, s.start_time, s.end_time, s.samples_count, s.policy_id, s.value, lag(s.end_time) OVER ( PARTITION BY s.device_id ORDER BY s.start_time, s.segment_id ) AS prev_end_time, lag(s.policy_id) OVER ( PARTITION BY s.device_id ORDER BY s.start_time, s.segment_id ) AS prev_policy_id, lag(s.value) OVER ( PARTITION BY s.device_id ORDER BY s.start_time, s.segment_id ) AS prev_value FROM %s AS s WHERE s.end_time IS NOT NULL AND s.end_time <= $1 ), grouped AS ( SELECT c.*, sum( CASE WHEN c.prev_end_time IS NULL OR c.start_time <> c.prev_end_time OR c.policy_id IS DISTINCT FROM c.prev_policy_id OR c.value IS DISTINCT FROM c.prev_value THEN 1 ELSE 0 END ) OVER ( PARTITION BY c.device_id ORDER BY c.start_time, c.segment_id ) AS grp FROM candidates AS c ), aggregates AS ( SELECT g.device_id, (array_agg(g.segment_id ORDER BY g.start_time, g.segment_id))[1] AS keep_segment_id, array_remove( array_agg(g.segment_id ORDER BY g.start_time, g.segment_id), (array_agg(g.segment_id ORDER BY g.start_time, g.segment_id))[1] ) AS delete_segment_ids, min(g.start_time) AS merged_start_time, max(g.end_time) AS merged_end_time, sum(g.samples_count)::integer AS total_samples, count(*)::integer AS merged_segments FROM grouped AS g GROUP BY g.device_id, g.grp HAVING count(*) > 1 ) SELECT a.device_id, a.keep_segment_id, a.delete_segment_ids, a.merged_segments, a.merged_start_time, a.merged_end_time, a.total_samples FROM aggregates AS a ORDER BY a.device_id, a.merged_start_time', v_table_sql ) USING p_before LOOP EXECUTE format( 'DELETE FROM %s WHERE segment_id = ANY($1)', v_table_sql ) USING v_merge.delete_segment_ids; EXECUTE format( 'UPDATE %s SET end_time = $1, samples_count = $2 WHERE segment_id = $3', v_table_sql ) USING v_merge.merged_end_time, v_merge.total_samples, v_merge.keep_segment_id; device_id := v_merge.device_id; kept_segment_id := v_merge.keep_segment_id; merged_segments := v_merge.merged_segments; start_time := v_merge.merged_start_time; end_time := 
v_merge.merged_end_time; total_samples := v_merge.total_samples; RETURN NEXT; END LOOP; RETURN; END; $$; CREATE OR REPLACE FUNCTION telemetry.inactive_devices(p_threshold interval) RETURNS TABLE ( device_pk bigint, device_id text, device_type text, location text, last_seen timestamptz, inactive_for interval ) LANGUAGE sql STABLE AS $$ SELECT d.device_pk, d.device_id, d.device_type, d.location, d.last_seen, now() - d.last_seen AS inactive_for FROM telemetry.devices AS d WHERE d.last_seen IS NOT NULL AND now() - d.last_seen > $1 ORDER BY inactive_for DESC, d.device_id; $$; -- Example metric registrations. SELECT telemetry.register_numeric_metric( p_metric_name => 'ambient_temperature', p_table_name => 'ambient_temperature_segments', p_domain_name => 'environmental', p_epsilon => 0.05, p_min_value => -40, p_max_value => 60, p_rounding_precision => 0.1, p_max_sampling_interval => '5 minutes', p_allow_null => true ); SELECT telemetry.register_numeric_metric( p_metric_name => 'cpu_temperature', p_table_name => 'cpu_temperature_segments', p_domain_name => 'system', p_epsilon => 0.5, p_min_value => 0, p_max_value => 120, p_rounding_precision => 1, p_max_sampling_interval => '10 seconds', p_allow_null => true ); SELECT telemetry.register_numeric_metric( p_metric_name => 'humidity', p_table_name => 'humidity_segments', p_domain_name => 'environmental', p_epsilon => 0.5, p_min_value => 0, p_max_value => 100, p_rounding_precision => 0.1, p_max_sampling_interval => '5 minutes', p_allow_null => true ); COMMIT;