1 contributor
BEGIN;
-- Schema and extensions.
CREATE SCHEMA telemetry;
-- btree_gist supplies GiST operator classes for plain scalar types such as
-- text, which the per-metric exclusion constraints
-- (device_id WITH =, segment_period WITH &&) need. Extensions are
-- database-wide and may already be installed for another schema, so an
-- existing installation must not abort this migration.
CREATE EXTENSION IF NOT EXISTS btree_gist;
-- Enums.
-- 'state' is reserved for future implementation of discrete enumerated
-- machine states such as thermostat_mode, pump_state, hvac_mode, or
-- door_state. Values will likely be stored as smallint and reuse the
-- exact-comparison segment engine that boolean metrics use, comparing
-- enumerated values for equality.
CREATE TYPE telemetry.metric_type_enum AS ENUM ('numeric', 'boolean', 'state');
-- 'epsilon': two values are equal when abs(a - b) <= epsilon (numeric metrics).
-- 'exact':   two values are equal when a IS NOT DISTINCT FROM b (boolean/state).
CREATE TYPE telemetry.comparison_mode_enum AS ENUM ('epsilon', 'exact');
-- Registry utility functions.
-- Reduce a metric table reference to the bare table name inside schema
-- telemetry.
--
-- Accepts `name` or `telemetry.name` using ordinary identifier rules:
-- parse_ident strips quotes and folds unquoted names to lower case.
-- Raises on NULL/blank input, on a schema other than telemetry, on more than
-- two dotted parts, and (via parse_ident strict mode) on any characters
-- left over after the identifier.
--
-- Declared IMMUTABLE; it is also evaluated by the metrics_table_name_check
-- constraint on telemetry.metrics.
CREATE OR REPLACE FUNCTION telemetry.normalize_metric_table_name(p_table_name text)
RETURNS text
LANGUAGE plpgsql
IMMUTABLE
AS $$
DECLARE
    v_parts text[];
BEGIN
    IF p_table_name IS NULL OR btrim(p_table_name) = '' THEN
        RAISE EXCEPTION 'table_name is required';
    END IF;

    -- strict_mode = true: non-strict parsing silently ignores trailing text
    -- (parse_ident('temp readings', false) yields {temp}), which would map
    -- distinct, malformed inputs onto the same metric table.
    v_parts := pg_catalog.parse_ident(p_table_name, true);

    CASE COALESCE(array_length(v_parts, 1), 0)
        WHEN 1 THEN
            RETURN v_parts[1];
        WHEN 2 THEN
            IF v_parts[1] <> 'telemetry' THEN
                RAISE EXCEPTION 'metric tables must live in schema telemetry: %', p_table_name;
            END IF;
            RETURN v_parts[2];
        ELSE
            RAISE EXCEPTION 'invalid metric table reference: %', p_table_name;
    END CASE;
END;
$$;
-- Turn a metric table reference into a schema-qualified, identifier-quoted
-- SQL fragment (telemetry.<name>) that is safe to splice into dynamic SQL
-- with format('%s'). Raises when the table does not exist.
CREATE OR REPLACE FUNCTION telemetry.metric_table_sql(p_table_name text)
RETURNS text
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_qualified text;
BEGIN
    v_qualified := format(
        'telemetry.%I',
        telemetry.normalize_metric_table_name(p_table_name)
    );

    IF to_regclass(v_qualified) IS NOT NULL THEN
        RETURN v_qualified;
    END IF;

    RAISE EXCEPTION 'could not resolve metric table %', v_qualified;
END;
$$;
-- Core registry tables.
-- One row per registered metric. metric_type fixes the segment table's value
-- type; the policy columns (comparison_mode .. max_sampling_interval) are
-- the current settings, kept in step by the register_* functions and
-- add_metric_policy. Ingestion reads its rules from telemetry.metric_policies.
CREATE TABLE telemetry.metrics (
    metric_pk bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    metric_name text NOT NULL UNIQUE,
    -- Bare table name inside schema telemetry (see metrics_table_name_check).
    table_name text NOT NULL UNIQUE,
    domain_name text NOT NULL DEFAULT 'generic',
    metric_type telemetry.metric_type_enum NOT NULL,
    comparison_mode telemetry.comparison_mode_enum NOT NULL,
    epsilon double precision,
    min_value double precision,
    max_value double precision,
    rounding_precision double precision,
    allow_null boolean NOT NULL DEFAULT true,
    max_sampling_interval interval NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    updated_at timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT metrics_value_range_check
        CHECK (min_value IS NULL OR max_value IS NULL OR min_value <= max_value),
    -- A negative tolerance would make every epsilon comparison unequal.
    CONSTRAINT metrics_epsilon_check
        CHECK (epsilon IS NULL OR epsilon >= 0),
    -- Gap detection closes a segment at last observation + max_sampling_interval;
    -- a zero or negative interval produces end_time <= start_time, which the
    -- segment tables reject at ingest time.
    CONSTRAINT metrics_max_sampling_interval_check
        CHECK (max_sampling_interval > interval '0'),
    CONSTRAINT metrics_policy_shape_check
        CHECK (
            (metric_type = 'numeric'::telemetry.metric_type_enum
                AND comparison_mode = 'epsilon'::telemetry.comparison_mode_enum
                AND epsilon IS NOT NULL
                AND rounding_precision IS NOT NULL)
            OR
            (metric_type IN (
                    'boolean'::telemetry.metric_type_enum,
                    'state'::telemetry.metric_type_enum
                )
                AND comparison_mode = 'exact'::telemetry.comparison_mode_enum
                AND epsilon IS NULL
                AND min_value IS NULL
                AND max_value IS NULL
                AND rounding_precision IS NULL)
        ),
    CONSTRAINT metrics_table_name_check
        CHECK (table_name = telemetry.normalize_metric_table_name(table_name))
);
-- Policy tables.
-- Versioned ingest policies per metric. The row with the greatest
-- valid_from <= observed_at applies to a measurement
-- (telemetry.require_metric_policy). The register_* functions create the
-- initial '-infinity' row; add_metric_policy appends later versions.
-- The CHECKs mirror telemetry.metrics and add_metric_policy's validation so
-- that rows written by any path obey the same shape.
CREATE TABLE telemetry.metric_policies (
    policy_id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    metric_name text NOT NULL
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    valid_from timestamptz NOT NULL,
    comparison_mode telemetry.comparison_mode_enum NOT NULL,
    epsilon double precision,
    min_value double precision,
    max_value double precision,
    rounding_precision double precision,
    allow_null boolean NOT NULL,
    max_sampling_interval interval NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    -- Same name PostgreSQL would generate; targeted by ON CONFLICT (metric_name, valid_from).
    CONSTRAINT metric_policies_metric_name_valid_from_key
        UNIQUE (metric_name, valid_from),
    CONSTRAINT metric_policies_value_range_check
        CHECK (min_value IS NULL OR max_value IS NULL OR min_value <= max_value),
    CONSTRAINT metric_policies_epsilon_check
        CHECK (epsilon IS NULL OR epsilon >= 0),
    CONSTRAINT metric_policies_max_sampling_interval_check
        CHECK (max_sampling_interval > interval '0'),
    CONSTRAINT metric_policies_shape_check
        CHECK (
            (comparison_mode = 'epsilon'::telemetry.comparison_mode_enum
                AND epsilon IS NOT NULL
                AND rounding_precision IS NOT NULL)
            OR
            (comparison_mode = 'exact'::telemetry.comparison_mode_enum
                AND epsilon IS NULL
                AND min_value IS NULL
                AND max_value IS NULL
                AND rounding_precision IS NULL)
        )
);
-- Optional per-metric retention settings, at most one row per metric.
-- Nothing in this part of the file reads these columns; presumably a NULL
-- interval means "no limit" — TODO confirm against the retention job.
CREATE TABLE telemetry.metric_retention_policies (
    metric_name text NOT NULL,
    raw_retention interval,
    compact_after interval,
    CONSTRAINT metric_retention_policies_pkey
        PRIMARY KEY (metric_name),
    CONSTRAINT metric_retention_policies_metric_name_fkey
        FOREIGN KEY (metric_name)
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE
);
-- Device registry tables.
-- Devices that have reported measurements. Rows are created on demand by
-- telemetry.ensure_device; device_id is the external identifier and
-- device_pk the surrogate key that segment tables also reference.
CREATE TABLE telemetry.devices (
    device_pk bigint GENERATED ALWAYS AS IDENTITY,
    device_id text NOT NULL,
    device_type text,
    location text,
    metadata jsonb,
    last_seen timestamptz,
    created_at timestamptz NOT NULL DEFAULT now(),
    updated_at timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT devices_pkey PRIMARY KEY (device_pk),
    CONSTRAINT devices_device_id_key UNIQUE (device_id)
);
-- Latest accepted observation time per (metric, device). Read by
-- assert_append_only to reject out-of-order data and by the gap/staleness
-- logic; advanced by bump_watermark.
CREATE TABLE telemetry.metric_device_watermarks (
    metric_name text NOT NULL,
    device_id text NOT NULL,
    last_observed_at timestamptz NOT NULL,
    -- bump_watermark upserts via ON CONFLICT ON CONSTRAINT with this exact name.
    CONSTRAINT metric_device_watermarks_pkey
        PRIMARY KEY (metric_name, device_id),
    CONSTRAINT metric_device_watermarks_metric_name_fkey
        FOREIGN KEY (metric_name)
        REFERENCES telemetry.metrics(metric_name)
        ON UPDATE CASCADE
        ON DELETE CASCADE,
    CONSTRAINT metric_device_watermarks_device_id_fkey
        FOREIGN KEY (device_id)
        REFERENCES telemetry.devices(device_id)
        ON UPDATE CASCADE
        ON DELETE CASCADE
);
-- Runtime helpers.
-- Snap p_value to the nearest multiple of p_increment, e.g.
-- round_to_increment(21.37, 0.5) = 21.5. A non-positive increment disables
-- rounding and returns p_value unchanged. STRICT: a NULL argument yields NULL.
-- The arithmetic is double precision, so results can carry binary noise.
CREATE OR REPLACE FUNCTION telemetry.round_to_increment(
    p_value double precision,
    p_increment double precision
)
RETURNS double precision
LANGUAGE sql
IMMUTABLE
STRICT
AS $$
    SELECT CASE
        WHEN p_increment > 0 THEN round(p_value / p_increment) * p_increment
        ELSE p_value
    END;
$$;
-- Register p_device_id in telemetry.devices if it is not already there;
-- an existing row is left exactly as it is.
CREATE OR REPLACE FUNCTION telemetry.ensure_device(p_device_id text)
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO telemetry.devices (device_id)
    VALUES (p_device_id)
    ON CONFLICT (device_id) DO NOTHING;
$$;
-- Move devices.last_seen forward to p_observed_at, never backwards.
-- GREATEST ignores NULL inputs, so an unset last_seen simply takes
-- p_observed_at, and a NULL p_observed_at leaves last_seen unchanged.
CREATE OR REPLACE FUNCTION telemetry.touch_device_last_seen(
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS void
LANGUAGE sql
AS $$
    UPDATE telemetry.devices AS d
    SET last_seen = GREATEST(d.last_seen, p_observed_at)
    WHERE d.device_id = p_device_id;
$$;
-- Return the telemetry.devices row for p_device_id, or raise
-- 'unknown device: ...' when no such device is registered.
CREATE OR REPLACE FUNCTION telemetry.require_device(p_device_id text)
RETURNS telemetry.devices
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_row telemetry.devices%ROWTYPE;
BEGIN
    SELECT d.*
    INTO v_row
    FROM telemetry.devices AS d
    WHERE d.device_id = p_device_id;

    IF FOUND THEN
        RETURN v_row;
    END IF;

    RAISE EXCEPTION 'unknown device: %', p_device_id;
END;
$$;
-- Map text to a 32-bit advisory-lock key (result lies strictly between
-- -2147483647 and 2147483647). ingest_measurement combines two such keys in
-- pg_advisory_xact_lock. Distinct inputs may collide, which only causes
-- extra serialisation between unrelated writers.
CREATE OR REPLACE FUNCTION telemetry.lock_key_from_text(p_value text)
RETURNS integer
LANGUAGE sql
IMMUTABLE
STRICT
AS $$
    SELECT CAST(hashtextextended(p_value, 0) % 2147483647 AS integer);
$$;
-- Segment template and table creation.
-- Idempotently create the per-metric segment table telemetry.<table>, its
-- indexes, and its no-overlap exclusion constraint.
--
-- p_table_name  bare or telemetry-qualified name; the metric must already be
--               registered in telemetry.metrics with an initial
--               ('-infinity') row in telemetry.metric_policies.
-- p_value_type  SQL type of the value column: 'double precision' (numeric),
--               'boolean' (boolean) or 'smallint' (reserved for state).
--
-- Raises when the metric or initial policy is missing, when p_value_type does
-- not match the metric's metric_type, or when the table already exists with
-- a different value column type.
CREATE OR REPLACE FUNCTION telemetry.create_segment_table(
    p_table_name text,
    p_value_type text
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_base_table_name text := telemetry.normalize_metric_table_name(p_table_name);
    v_table_name text := format('telemetry.%I', v_base_table_name);
    v_metric_name text;
    v_metric_type telemetry.metric_type_enum;
    v_expected_value_type text;
    v_existing_value_type text;
    v_initial_policy_id bigint;
    v_open_idx text := format('%s_open_segment_uidx', v_base_table_name);
    v_start_idx text := format('%s_device_start_cover_idx', v_base_table_name);
    v_period_excl text := format('%s_device_period_excl', v_base_table_name);
    v_device_pk_fk text := format('%s_device_pk_fkey', v_base_table_name);
    v_device_fk text := format('%s_device_id_fkey', v_base_table_name);
    v_policy_fk text := format('%s_policy_id_fkey', v_base_table_name);
    v_samples_check text := format('%s_samples_count_check', v_base_table_name);
    v_end_time_check text := format('%s_check', v_base_table_name);
BEGIN
    IF p_value_type NOT IN ('double precision', 'boolean', 'smallint') THEN
        RAISE EXCEPTION 'unsupported segment value type: %', p_value_type;
    END IF;

    SELECT m.metric_name, m.metric_type
    INTO v_metric_name, v_metric_type
    FROM telemetry.metrics AS m
    WHERE m.table_name = v_base_table_name;

    IF v_metric_name IS NULL THEN
        RAISE EXCEPTION
            'metric metadata must exist before creating segment table %',
            v_base_table_name;
    END IF;

    -- The value column must match the registered metric_type; otherwise the
    -- typed ingest functions would write the wrong type into this table.
    v_expected_value_type := CASE v_metric_type
        WHEN 'numeric' THEN 'double precision'
        WHEN 'boolean' THEN 'boolean'
        WHEN 'state' THEN 'smallint'
    END;
    IF p_value_type IS DISTINCT FROM v_expected_value_type THEN
        RAISE EXCEPTION
            'metric % has metric_type % which requires value type %, not %',
            v_metric_name,
            v_metric_type,
            v_expected_value_type,
            p_value_type;
    END IF;

    SELECT mp.policy_id
    INTO v_initial_policy_id
    FROM telemetry.metric_policies AS mp
    WHERE mp.metric_name = v_metric_name
      AND mp.valid_from = '-infinity'::timestamptz;

    IF v_initial_policy_id IS NULL THEN
        RAISE EXCEPTION
            'initial policy row is missing for metric %',
            v_metric_name;
    END IF;

    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %s (
            segment_id bigserial PRIMARY KEY,
            device_pk bigint NOT NULL,
            device_id text NOT NULL,
            start_time timestamptz NOT NULL,
            end_time timestamptz NULL,
            value %s NULL,
            -- samples_count = 0 is intentional for synthetic gap segments inserted
            -- during lazy gap detection; those intervals represent missing data, not samples.
            samples_count integer NOT NULL DEFAULT 1
                CONSTRAINT %I CHECK (samples_count >= 0),
            policy_id bigint NOT NULL,
            segment_period tstzrange GENERATED ALWAYS AS (
                tstzrange(start_time, COALESCE(end_time, ''infinity''::timestamptz), ''[)'')
            ) STORED,
            CONSTRAINT %I CHECK (end_time IS NULL OR end_time > start_time),
            CONSTRAINT %I
                FOREIGN KEY (device_pk)
                REFERENCES telemetry.devices(device_pk)
                ON UPDATE CASCADE
                ON DELETE CASCADE,
            CONSTRAINT %I
                FOREIGN KEY (device_id)
                REFERENCES telemetry.devices(device_id)
                ON UPDATE CASCADE
                ON DELETE CASCADE,
            CONSTRAINT %I
                FOREIGN KEY (policy_id)
                REFERENCES telemetry.metric_policies(policy_id)
        ) WITH (
            fillfactor = 90,
            autovacuum_vacuum_scale_factor = 0.02,
            autovacuum_analyze_scale_factor = 0.01
        )',
        v_table_name,
        p_value_type,
        v_samples_check,
        v_end_time_check,
        v_device_pk_fk,
        v_device_fk,
        v_policy_fk
    );

    -- CREATE TABLE IF NOT EXISTS silently keeps a pre-existing table, so
    -- confirm its value column really has the requested type.
    SELECT format_type(a.atttypid, a.atttypmod)
    INTO v_existing_value_type
    FROM pg_catalog.pg_attribute AS a
    WHERE a.attrelid = to_regclass(v_table_name)
      AND a.attname = 'value'
      AND NOT a.attisdropped;

    IF v_existing_value_type IS DISTINCT FROM p_value_type THEN
        RAISE EXCEPTION
            'segment table % already exists with value type %, expected %',
            v_table_name,
            v_existing_value_type,
            p_value_type;
    END IF;

    -- At most one open (end_time IS NULL) segment per device.
    EXECUTE format(
        'CREATE UNIQUE INDEX IF NOT EXISTS %I
            ON %s (device_id)
            WHERE end_time IS NULL',
        v_open_idx,
        v_table_name
    );

    -- Latest-segment-first lookups per device.
    EXECUTE format(
        'CREATE INDEX IF NOT EXISTS %I
            ON %s (device_id, start_time DESC)
            INCLUDE (end_time, value, samples_count)',
        v_start_idx,
        v_table_name
    );

    -- Segments of one device must never overlap. The exclusion constraint is
    -- backed by its own GiST index on (device_id, segment_period), so no
    -- separate GiST index on those columns is created.
    BEGIN
        EXECUTE format(
            'ALTER TABLE %s
                ADD CONSTRAINT %I
                EXCLUDE USING gist (
                    device_id WITH =,
                    segment_period WITH &&
                )',
            v_table_name,
            v_period_excl
        );
    EXCEPTION
        -- Re-running on an existing table: the constraint (or its index
        -- name) is already present.
        WHEN duplicate_object THEN
            NULL;
        WHEN duplicate_table THEN
            NULL;
    END;
END;
$$;
-- Metric registration.
-- Register (or re-register) a numeric metric. This creates its metrics row,
-- its initial '-infinity' policy and its double precision segment table.
--
-- Re-registering an existing metric_name refreshes the descriptive and
-- current-policy columns on telemetry.metrics, with two caveats:
--   * the existing initial policy row is left untouched (ON CONFLICT DO
--     NOTHING), so use telemetry.add_metric_policy to change how new
--     measurements are ingested;
--   * changing metric_type or table_name is rejected, because the existing
--     segment table and policy rows were built for the original settings.
CREATE OR REPLACE FUNCTION telemetry.register_numeric_metric(
    p_metric_name text,
    p_table_name text,
    p_domain_name text,
    p_epsilon double precision,
    p_min_value double precision,
    p_max_value double precision,
    p_rounding_precision double precision,
    p_max_sampling_interval interval,
    p_allow_null boolean DEFAULT true
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
BEGIN
    INSERT INTO telemetry.metrics AS m (
        metric_name,
        table_name,
        domain_name,
        metric_type,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        max_sampling_interval,
        allow_null
    )
    VALUES (
        p_metric_name,
        v_table_name,
        p_domain_name,
        'numeric',
        'epsilon',
        p_epsilon,
        p_min_value,
        p_max_value,
        p_rounding_precision,
        p_max_sampling_interval,
        p_allow_null
    )
    ON CONFLICT (metric_name) DO UPDATE
    SET domain_name = EXCLUDED.domain_name,
        comparison_mode = EXCLUDED.comparison_mode,
        epsilon = EXCLUDED.epsilon,
        min_value = EXCLUDED.min_value,
        max_value = EXCLUDED.max_value,
        rounding_precision = EXCLUDED.rounding_precision,
        max_sampling_interval = EXCLUDED.max_sampling_interval,
        allow_null = EXCLUDED.allow_null,
        updated_at = now()
    -- Only refresh a registration of the same type that uses the same table.
    WHERE m.metric_type = EXCLUDED.metric_type
      AND m.table_name = EXCLUDED.table_name;

    -- A conflicting row that fails the WHERE above is neither inserted nor
    -- updated, which leaves FOUND false.
    IF NOT FOUND THEN
        RAISE EXCEPTION
            'metric % is already registered with a different metric_type or table_name',
            p_metric_name;
    END IF;

    INSERT INTO telemetry.metric_policies (
        metric_name,
        valid_from,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        allow_null,
        max_sampling_interval
    )
    VALUES (
        p_metric_name,
        '-infinity'::timestamptz,
        'epsilon',
        p_epsilon,
        p_min_value,
        p_max_value,
        p_rounding_precision,
        p_allow_null,
        p_max_sampling_interval
    )
    ON CONFLICT (metric_name, valid_from) DO NOTHING;

    PERFORM telemetry.create_segment_table(v_table_name, 'double precision');
    -- Fails loudly if the segment table could not be resolved.
    PERFORM telemetry.metric_table_sql(v_table_name);
END;
$$;
-- Register (or re-register) a boolean metric. This creates its metrics row,
-- its initial '-infinity' policy (exact comparison, no numeric settings) and
-- its boolean segment table.
--
-- Re-registering an existing metric_name refreshes the descriptive and
-- current-policy columns on telemetry.metrics, with two caveats:
--   * the existing initial policy row is left untouched (ON CONFLICT DO
--     NOTHING), so use telemetry.add_metric_policy to change how new
--     measurements are ingested;
--   * changing metric_type or table_name is rejected, because the existing
--     segment table and policy rows were built for the original settings.
CREATE OR REPLACE FUNCTION telemetry.register_boolean_metric(
    p_metric_name text,
    p_table_name text,
    p_domain_name text,
    p_max_sampling_interval interval,
    p_allow_null boolean DEFAULT true
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
BEGIN
    INSERT INTO telemetry.metrics AS m (
        metric_name,
        table_name,
        domain_name,
        metric_type,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        max_sampling_interval,
        allow_null
    )
    VALUES (
        p_metric_name,
        v_table_name,
        p_domain_name,
        'boolean',
        'exact',
        NULL,
        NULL,
        NULL,
        NULL,
        p_max_sampling_interval,
        p_allow_null
    )
    ON CONFLICT (metric_name) DO UPDATE
    SET domain_name = EXCLUDED.domain_name,
        comparison_mode = EXCLUDED.comparison_mode,
        epsilon = EXCLUDED.epsilon,
        min_value = EXCLUDED.min_value,
        max_value = EXCLUDED.max_value,
        rounding_precision = EXCLUDED.rounding_precision,
        max_sampling_interval = EXCLUDED.max_sampling_interval,
        allow_null = EXCLUDED.allow_null,
        updated_at = now()
    -- Only refresh a registration of the same type that uses the same table.
    WHERE m.metric_type = EXCLUDED.metric_type
      AND m.table_name = EXCLUDED.table_name;

    -- A conflicting row that fails the WHERE above is neither inserted nor
    -- updated, which leaves FOUND false.
    IF NOT FOUND THEN
        RAISE EXCEPTION
            'metric % is already registered with a different metric_type or table_name',
            p_metric_name;
    END IF;

    INSERT INTO telemetry.metric_policies (
        metric_name,
        valid_from,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        allow_null,
        max_sampling_interval
    )
    VALUES (
        p_metric_name,
        '-infinity'::timestamptz,
        'exact',
        NULL,
        NULL,
        NULL,
        NULL,
        p_allow_null,
        p_max_sampling_interval
    )
    ON CONFLICT (metric_name, valid_from) DO NOTHING;

    PERFORM telemetry.create_segment_table(v_table_name, 'boolean');
    -- Fails loudly if the segment table could not be resolved.
    PERFORM telemetry.metric_table_sql(v_table_name);
END;
$$;
-- Metric and policy lookup.
-- Return the telemetry.metrics row for p_metric_name, or raise
-- 'unknown metric: ...' when it is not registered.
CREATE OR REPLACE FUNCTION telemetry.require_metric(p_metric_name text)
RETURNS telemetry.metrics
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_row telemetry.metrics%ROWTYPE;
BEGIN
    SELECT m.*
    INTO v_row
    FROM telemetry.metrics AS m
    WHERE m.metric_name = p_metric_name;

    IF FOUND THEN
        RETURN v_row;
    END IF;

    RAISE EXCEPTION 'unknown metric: %', p_metric_name;
END;
$$;
-- Return the policy in force for p_metric_name at p_observed_at, i.e. the
-- row with the greatest valid_from that is <= p_observed_at. Raises when
-- no policy applies (including when p_observed_at is NULL).
CREATE OR REPLACE FUNCTION telemetry.require_metric_policy(
    p_metric_name text,
    p_observed_at timestamptz
)
RETURNS telemetry.metric_policies
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_active telemetry.metric_policies%ROWTYPE;
BEGIN
    SELECT mp.*
    INTO v_active
    FROM telemetry.metric_policies AS mp
    WHERE mp.metric_name = p_metric_name
      AND mp.valid_from <= p_observed_at
    ORDER BY mp.valid_from DESC
    FETCH FIRST 1 ROW ONLY;

    IF FOUND THEN
        RETURN v_active;
    END IF;

    RAISE EXCEPTION
        'no active policy for metric % at %',
        p_metric_name,
        p_observed_at;
END;
$$;
-- Append a new policy version for an existing metric, effective from
-- p_valid_from, and return its policy_id.
--
-- Validation follows the metric's type:
--   * numeric metrics need comparison_mode 'epsilon' plus epsilon and
--     rounding_precision;
--   * boolean/state metrics need 'exact' and accept no epsilon, min, max or
--     rounding_precision.
-- A second policy at the same (metric, valid_from) is rejected. When no
-- later policy exists, the new settings are also copied onto
-- telemetry.metrics as the metric's current settings.
CREATE OR REPLACE FUNCTION telemetry.add_metric_policy(
    p_metric_name text,
    p_valid_from timestamptz,
    p_comparison_mode telemetry.comparison_mode_enum,
    p_epsilon double precision,
    p_min_value double precision,
    p_max_value double precision,
    p_rounding_precision double precision,
    p_max_sampling_interval interval,
    p_allow_null boolean
)
RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE
    v_target telemetry.metrics%ROWTYPE;
    v_new_policy_id bigint;
BEGIN
    IF p_valid_from IS NULL THEN
        RAISE EXCEPTION 'valid_from is required';
    END IF;

    IF p_max_sampling_interval IS NULL THEN
        RAISE EXCEPTION 'max_sampling_interval is required';
    END IF;

    v_target := telemetry.require_metric(p_metric_name);

    -- A NULL bound on either side makes the comparison NULL, so no error.
    IF p_min_value > p_max_value THEN
        RAISE EXCEPTION 'min_value % exceeds max_value % for metric %',
            p_min_value, p_max_value, p_metric_name;
    END IF;

    IF v_target.metric_type = 'numeric' THEN
        IF p_comparison_mode <> 'epsilon'::telemetry.comparison_mode_enum THEN
            RAISE EXCEPTION 'numeric metric % requires comparison_mode epsilon', p_metric_name;
        END IF;
        IF p_epsilon IS NULL OR p_rounding_precision IS NULL THEN
            RAISE EXCEPTION 'numeric metric % requires epsilon and rounding_precision', p_metric_name;
        END IF;
    ELSIF v_target.metric_type IN ('boolean', 'state') THEN
        IF p_comparison_mode <> 'exact'::telemetry.comparison_mode_enum THEN
            RAISE EXCEPTION 'metric % with type % requires comparison_mode exact',
                p_metric_name, v_target.metric_type;
        END IF;
        IF NOT (p_epsilon IS NULL
                AND p_min_value IS NULL
                AND p_max_value IS NULL
                AND p_rounding_precision IS NULL) THEN
            RAISE EXCEPTION 'metric % with type % does not accept epsilon/min/max/rounding_precision in policies',
                p_metric_name, v_target.metric_type;
        END IF;
    ELSE
        RAISE EXCEPTION 'unsupported metric_type % for metric %', v_target.metric_type, p_metric_name;
    END IF;

    PERFORM 1
    FROM telemetry.metric_policies AS mp
    WHERE mp.metric_name = p_metric_name
      AND mp.valid_from = p_valid_from;
    IF FOUND THEN
        RAISE EXCEPTION 'policy already exists for metric % at %',
            p_metric_name, p_valid_from;
    END IF;

    INSERT INTO telemetry.metric_policies (
        metric_name,
        valid_from,
        comparison_mode,
        epsilon,
        min_value,
        max_value,
        rounding_precision,
        allow_null,
        max_sampling_interval
    )
    VALUES (
        p_metric_name,
        p_valid_from,
        p_comparison_mode,
        p_epsilon,
        p_min_value,
        p_max_value,
        p_rounding_precision,
        p_allow_null,
        p_max_sampling_interval
    )
    RETURNING policy_id INTO v_new_policy_id;

    -- Mirror onto metrics only when this is now the most recent policy.
    UPDATE telemetry.metrics AS m
    SET comparison_mode = p_comparison_mode,
        epsilon = p_epsilon,
        min_value = p_min_value,
        max_value = p_max_value,
        rounding_precision = p_rounding_precision,
        allow_null = p_allow_null,
        max_sampling_interval = p_max_sampling_interval,
        updated_at = now()
    WHERE m.metric_name = p_metric_name
      AND NOT EXISTS (
          SELECT 1
          FROM telemetry.metric_policies AS newer
          WHERE newer.metric_name = p_metric_name
            AND newer.valid_from > p_valid_from
      );

    RETURN v_new_policy_id;
END;
$$;
-- Ingestion functions.
-- Enforce strictly increasing observation times per (metric, device).
-- Returns the current watermark (NULL if this pair has never been seen) and
-- raises when p_observed_at is not later than it.
CREATE OR REPLACE FUNCTION telemetry.assert_append_only(
    p_metric_name text,
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS timestamptz
LANGUAGE plpgsql
AS $$
DECLARE
    v_watermark timestamptz;
BEGIN
    SELECT w.last_observed_at
    INTO v_watermark
    FROM telemetry.metric_device_watermarks AS w
    WHERE w.metric_name = p_metric_name
      AND w.device_id = p_device_id;

    IF NOT FOUND THEN
        RETURN NULL;
    END IF;

    IF p_observed_at <= v_watermark THEN
        RAISE EXCEPTION
            'out-of-order measurement for metric % device %: incoming % <= last observed %',
            p_metric_name,
            p_device_id,
            p_observed_at,
            v_watermark;
    END IF;

    RETURN v_watermark;
END;
$$;
-- Store p_observed_at as the latest observation for (metric, device),
-- inserting the watermark row or overwriting the existing one. Monotonicity
-- is not checked here; the ingest functions call assert_append_only first.
CREATE OR REPLACE FUNCTION telemetry.bump_watermark(
    p_metric_name text,
    p_device_id text,
    p_observed_at timestamptz
)
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO telemetry.metric_device_watermarks AS w (
        metric_name,
        device_id,
        last_observed_at
    )
    VALUES (p_metric_name, p_device_id, p_observed_at)
    ON CONFLICT (metric_name, device_id) DO UPDATE
    SET last_observed_at = EXCLUDED.last_observed_at;
$$;
-- Segment state machine shared by the typed ingest_measurement overloads.
-- Applies one observation (p_value at p_observed_at) for p_device_id to the
-- metric's segment table and returns a label for what happened:
-- 'opened', 'opened_null', 'extended', 'extended_null', 'split',
-- 'value_to_null', 'null_to_value', 'gap_split' or 'gap_to_null'.
--
-- Parameters (as supplied by the callers in this file):
--   p_table_name                       metric table, bare or telemetry-qualified.
--   p_metric_type                      not referenced by the body.
--   p_comparison_mode                  'epsilon' (abs difference <= p_epsilon)
--                                      or 'exact' (IS NOT DISTINCT FROM).
--   p_policy_id, p_policy_valid_from   policy in force at p_observed_at.
--   p_device_pk, p_device_id           owning device.
--   p_value                            normalised value; anyelement so one body
--                                      serves double precision and boolean tables.
--   p_last_observed_at                 previous watermark for (metric, device),
--                                      or NULL.
--   p_epsilon, p_max_sampling_interval, p_allow_null   policy settings.
--
-- The ingest_measurement callers take a per-(metric, device) advisory lock
-- and run assert_append_only before calling this. The open segment row is
-- also locked here with FOR UPDATE.
CREATE OR REPLACE FUNCTION telemetry.ingest_segment(
p_table_name text,
p_metric_type telemetry.metric_type_enum,
p_comparison_mode telemetry.comparison_mode_enum,
p_policy_id bigint,
p_policy_valid_from timestamptz,
p_device_pk bigint,
p_device_id text,
p_value anyelement,
p_observed_at timestamptz,
p_last_observed_at timestamptz,
p_epsilon double precision,
p_max_sampling_interval interval,
p_allow_null boolean
)
RETURNS text
LANGUAGE plpgsql
AS $$
DECLARE
v_current record;
v_has_current boolean;
v_gap_start timestamptz;
v_rowcount bigint;
v_equal boolean;
v_policy_changed boolean;
v_effective_last_observed_at timestamptz := p_last_observed_at;
v_table_name text := telemetry.normalize_metric_table_name(p_table_name);
v_table_sql text := telemetry.metric_table_sql(p_table_name);
BEGIN
-- Fetch and row-lock the device's open segment (end_time IS NULL), if any.
-- The partial unique index built by create_segment_table allows at most one.
EXECUTE format(
'SELECT segment_id, device_pk, device_id, start_time, end_time, value, samples_count, policy_id
FROM %s
WHERE device_id = $1
AND end_time IS NULL
FOR UPDATE',
v_table_sql
)
INTO v_current
USING p_device_id;
-- ROW_COUNT is 1 when an open segment was found, 0 otherwise.
GET DIAGNOSTICS v_rowcount = ROW_COUNT;
v_has_current := v_rowcount > 0;
-- Explicit NULL measurements are refused when the policy disallows them.
IF p_value IS NULL AND NOT p_allow_null THEN
RAISE EXCEPTION 'metric table % does not allow explicit NULL measurements', v_table_name;
END IF;
-- No open segment yet for this device: open the first one.
IF NOT v_has_current THEN
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
VALUES ($1, $2, $3, NULL, $4, 1, $5)',
v_table_sql
)
USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
RETURN CASE WHEN p_value IS NULL THEN 'opened_null' ELSE 'opened' END;
END IF;
-- Policy boundary: when a different policy became active after the open
-- segment started, close that segment at valid_from and carry the same value
-- into a synthetic (samples_count = 0) segment owned by the new policy. Gap
-- detection below then measures from the later of the previous observation
-- and valid_from.
-- NOTE(review): the continuation is created without checking whether the old
-- value had already gone stale (previous observation + the old policy's
-- max_sampling_interval) before valid_from. Because the gap check is then
-- re-anchored at valid_from, a stale value appears to be carried across the
-- boundary instead of a gap being recorded. Confirm this is intended.
v_policy_changed := v_current.policy_id IS DISTINCT FROM p_policy_id;
IF v_policy_changed
AND p_policy_valid_from > v_current.start_time THEN
EXECUTE format(
'UPDATE %s
SET end_time = $1
WHERE segment_id = $2',
v_table_sql
)
USING p_policy_valid_from, v_current.segment_id;
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
-- samples_count = 0 marks a synthetic continuation across a policy boundary.
VALUES ($1, $2, $3, NULL, $4, 0, $5)
RETURNING segment_id, device_pk, device_id, start_time, end_time, value, samples_count, policy_id',
v_table_sql
)
INTO v_current
USING v_current.device_pk, v_current.device_id, p_policy_valid_from, v_current.value, p_policy_id;
v_policy_changed := false;
v_effective_last_observed_at := GREATEST(
COALESCE(p_last_observed_at, p_policy_valid_from),
p_policy_valid_from
);
END IF;
-- Gap detection (only while the open segment holds a non-NULL value): if
-- more than max_sampling_interval elapsed since the last observation, the
-- value is treated as having ended at last + max_sampling_interval.
IF v_effective_last_observed_at IS NOT NULL
AND p_observed_at - v_effective_last_observed_at > p_max_sampling_interval
AND v_current.value IS NOT NULL THEN
v_gap_start := v_effective_last_observed_at + p_max_sampling_interval;
EXECUTE format(
'UPDATE %s
SET end_time = $1
WHERE segment_id = $2',
v_table_sql
)
USING v_gap_start, v_current.segment_id;
-- Explicit NULL after a gap: open the NULL segment at gap_start.
IF p_value IS NULL THEN
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
v_table_sql
)
USING p_device_pk, p_device_id, v_gap_start, p_policy_id;
RETURN 'gap_to_null';
END IF;
-- Otherwise record a closed, sample-less NULL gap [gap_start, p_observed_at)
-- and open a new segment at p_observed_at.
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
-- samples_count = 0 marks a historian-generated gap interval.
VALUES ($1, $2, $3, $4, NULL, 0, $5)',
v_table_sql
)
USING p_device_pk, p_device_id, v_gap_start, p_observed_at, p_policy_id;
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
VALUES ($1, $2, $3, NULL, $4, 1, $5)',
v_table_sql
)
USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
RETURN 'gap_split';
END IF;
-- The open segment is a synthetic continuation that starts exactly at this
-- observation: overwrite it in place with the real sample.
IF v_current.samples_count = 0
AND p_observed_at = v_current.start_time THEN
IF p_value IS NULL THEN
EXECUTE format(
'UPDATE %s
SET value = NULL,
samples_count = 1
WHERE segment_id = $1',
v_table_sql
)
USING v_current.segment_id;
RETURN CASE
WHEN v_current.value IS NULL THEN 'extended_null'
ELSE 'value_to_null'
END;
END IF;
IF v_current.value IS NULL THEN
EXECUTE format(
'UPDATE %s
SET value = $1,
samples_count = 1
WHERE segment_id = $2',
v_table_sql
)
USING p_value, v_current.segment_id;
RETURN 'null_to_value';
END IF;
-- v_equal only selects the returned label here; the row is updated in place
-- either way.
-- NOTE(review): 'split' is reported although no new segment is created.
CASE p_comparison_mode
WHEN 'epsilon' THEN
EXECUTE
'SELECT abs($1::double precision - $2::double precision) <= $3'
INTO v_equal
USING v_current.value, p_value, p_epsilon;
WHEN 'exact' THEN
EXECUTE
'SELECT $1 IS NOT DISTINCT FROM $2'
INTO v_equal
USING v_current.value, p_value;
ELSE
RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
END CASE;
EXECUTE format(
'UPDATE %s
SET value = $1,
samples_count = 1
WHERE segment_id = $2',
v_table_sql
)
USING p_value, v_current.segment_id;
RETURN CASE
WHEN v_equal THEN 'extended'
ELSE 'split'
END;
END IF;
-- The open segment holds NULL.
IF v_current.value IS NULL THEN
IF p_value IS NULL THEN
-- NULL again, but under a different policy: start a new NULL segment.
IF v_policy_changed THEN
EXECUTE format(
'UPDATE %s
SET end_time = $1
WHERE segment_id = $2',
v_table_sql
)
USING p_observed_at, v_current.segment_id;
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
v_table_sql
)
USING p_device_pk, p_device_id, p_observed_at, p_policy_id;
RETURN 'split';
END IF;
EXECUTE format(
'UPDATE %s
SET samples_count = samples_count + 1
WHERE segment_id = $1',
v_table_sql
)
USING v_current.segment_id;
RETURN 'extended_null';
END IF;
-- NULL -> value: close the NULL segment and open a valued one.
EXECUTE format(
'UPDATE %s
SET end_time = $1
WHERE segment_id = $2',
v_table_sql
)
USING p_observed_at, v_current.segment_id;
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
VALUES ($1, $2, $3, NULL, $4, 1, $5)',
v_table_sql
)
USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
RETURN 'null_to_value';
END IF;
-- value -> NULL: close the valued segment and open a NULL one.
IF p_value IS NULL THEN
EXECUTE format(
'UPDATE %s
SET end_time = $1
WHERE segment_id = $2',
v_table_sql
)
USING p_observed_at, v_current.segment_id;
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
VALUES ($1, $2, $3, NULL, NULL, 1, $4)',
v_table_sql
)
USING p_device_pk, p_device_id, p_observed_at, p_policy_id;
RETURN 'value_to_null';
END IF;
-- Both values non-NULL: compare under the policy's comparison mode.
CASE p_comparison_mode
WHEN 'epsilon' THEN
EXECUTE
'SELECT abs($1::double precision - $2::double precision) <= $3'
INTO v_equal
USING v_current.value, p_value, p_epsilon;
WHEN 'exact' THEN
EXECUTE
'SELECT $1 IS NOT DISTINCT FROM $2'
INTO v_equal
USING v_current.value, p_value;
ELSE
RAISE EXCEPTION 'unsupported comparison_mode % for table %', p_comparison_mode, v_table_name;
END CASE;
-- Same value under the same policy: just count the sample.
IF v_equal AND NOT v_policy_changed THEN
EXECUTE format(
'UPDATE %s
SET samples_count = samples_count + 1
WHERE segment_id = $1',
v_table_sql
)
USING v_current.segment_id;
RETURN 'extended';
END IF;
-- Value changed (or policy changed): close at p_observed_at and open a new segment.
EXECUTE format(
'UPDATE %s
SET end_time = $1
WHERE segment_id = $2',
v_table_sql
)
USING p_observed_at, v_current.segment_id;
EXECUTE format(
'INSERT INTO %s (
device_pk,
device_id,
start_time,
end_time,
value,
samples_count,
policy_id
)
VALUES ($1, $2, $3, NULL, $4, 1, $5)',
v_table_sql
)
USING p_device_pk, p_device_id, p_observed_at, p_value, p_policy_id;
RETURN 'split';
END;
$$;
-- Ingest one numeric (double precision) measurement for a metric/device pair.
--
-- Steps:
--   1. Validate arguments and auto-register the device.
--   2. Serialise concurrent writers for the same (metric, device) with a
--      transaction-scoped advisory lock.
--   3. Enforce append-only timestamps.
--   4. Apply the policy in force at p_observed_at (rounding to
--      rounding_precision, min/max bounds).
--   5. Hand off to telemetry.ingest_segment.
--   6. Advance the watermark and the device's last_seen.
--
-- Returns one row: metric, device, target table, the normalised value
-- actually stored (NULL for an explicit NULL) and the ingest_segment action.
-- Raises for:
--   * blank arguments;
--   * unknown or non-numeric metrics;
--   * non-finite values (NaN, Infinity, -Infinity);
--   * out-of-order timestamps;
--   * out-of-range values.
CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
    p_metric_name text,
    p_device_id text,
    p_value double precision,
    p_observed_at timestamptz
)
RETURNS TABLE (
    metric_name text,
    device_id text,
    table_name text,
    normalized_value double precision,
    action text
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
    v_policy telemetry.metric_policies%ROWTYPE;
    v_device telemetry.devices%ROWTYPE;
    v_last_observed_at timestamptz;
    v_normalized double precision;
    v_action text;
    v_metric_lock_key integer;
    v_device_lock_key integer;
BEGIN
    IF p_metric_name IS NULL OR btrim(p_metric_name) = '' THEN
        RAISE EXCEPTION 'metric_name is required';
    END IF;
    IF p_device_id IS NULL OR btrim(p_device_id) = '' THEN
        RAISE EXCEPTION 'device_id is required';
    END IF;
    IF p_observed_at IS NULL THEN
        RAISE EXCEPTION 'observed_at is required';
    END IF;

    v_metric := telemetry.require_metric(p_metric_name);
    PERFORM telemetry.ensure_device(p_device_id);
    v_device := telemetry.require_device(p_device_id);

    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'numeric' THEN
            NULL;
        WHEN 'boolean' THEN
            RAISE EXCEPTION 'metric % is boolean; use the boolean overload of ingest_measurement(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;

    -- NaN and +/-Infinity make the epsilon comparison meaningless
    -- (abs(NaN - x) <= eps is never true) and are not representable as JSON
    -- numbers, so reject them before they reach a segment table.
    -- PostgreSQL treats NaN = NaN as true, so IN matches it.
    IF p_value IN (
        'NaN'::double precision,
        'Infinity'::double precision,
        '-Infinity'::double precision
    ) THEN
        RAISE EXCEPTION 'value % is not a finite number for metric %', p_value, p_metric_name;
    END IF;

    -- Held until commit/rollback.
    v_metric_lock_key := telemetry.lock_key_from_text(p_metric_name);
    v_device_lock_key := telemetry.lock_key_from_text(p_device_id);
    PERFORM pg_advisory_xact_lock(v_metric_lock_key, v_device_lock_key);

    v_last_observed_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
    v_policy := telemetry.require_metric_policy(p_metric_name, p_observed_at);

    IF p_value IS NULL THEN
        v_normalized := NULL;
    ELSE
        v_normalized := telemetry.round_to_increment(p_value, v_policy.rounding_precision);
        -- Bounds are checked against the rounded value.
        IF v_policy.min_value IS NOT NULL AND v_normalized < v_policy.min_value THEN
            RAISE EXCEPTION 'value % is below min_value % for metric %',
                v_normalized, v_policy.min_value, p_metric_name;
        END IF;
        IF v_policy.max_value IS NOT NULL AND v_normalized > v_policy.max_value THEN
            RAISE EXCEPTION 'value % is above max_value % for metric %',
                v_normalized, v_policy.max_value, p_metric_name;
        END IF;
    END IF;

    v_action := telemetry.ingest_segment(
        v_metric.table_name,
        v_metric.metric_type,
        v_policy.comparison_mode,
        v_policy.policy_id,
        v_policy.valid_from,
        v_device.device_pk,
        p_device_id,
        v_normalized,
        p_observed_at,
        v_last_observed_at,
        v_policy.epsilon,
        v_policy.max_sampling_interval,
        v_policy.allow_null
    );

    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);

    RETURN QUERY
    SELECT
        p_metric_name,
        p_device_id,
        v_metric.table_name,
        v_normalized,
        v_action;
END;
$$;
-- Ingest one boolean measurement for a metric/device pair.
--
-- Mirrors the numeric overload without rounding or bounds:
--   * validates arguments and auto-registers the device;
--   * takes the per-(metric, device) transaction advisory lock;
--   * enforces append-only timestamps;
--   * applies the policy in force at p_observed_at via ingest_segment;
--   * advances the watermark and last_seen.
-- Returns one row: metric, device, target table, the stored value and the
-- ingest_segment action.
CREATE OR REPLACE FUNCTION telemetry.ingest_measurement(
    p_metric_name text,
    p_device_id text,
    p_value boolean,
    p_observed_at timestamptz
)
RETURNS TABLE (
    metric_name text,
    device_id text,
    table_name text,
    normalized_value boolean,
    action text
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_metric_row telemetry.metrics%ROWTYPE;
    v_policy_row telemetry.metric_policies%ROWTYPE;
    v_device_row telemetry.devices%ROWTYPE;
    v_previous_at timestamptz;
    v_outcome text;
BEGIN
    -- Guard clauses for required arguments.
    IF p_metric_name IS NULL OR btrim(p_metric_name) = '' THEN
        RAISE EXCEPTION 'metric_name is required';
    END IF;
    IF p_device_id IS NULL OR btrim(p_device_id) = '' THEN
        RAISE EXCEPTION 'device_id is required';
    END IF;
    IF p_observed_at IS NULL THEN
        RAISE EXCEPTION 'observed_at is required';
    END IF;

    v_metric_row := telemetry.require_metric(p_metric_name);
    PERFORM telemetry.ensure_device(p_device_id);
    v_device_row := telemetry.require_device(p_device_id);

    -- Only boolean metrics are accepted here. 'state' (discrete enumerated
    -- machine states, likely smallint) is reserved for a future overload.
    IF v_metric_row.metric_type = 'numeric' THEN
        RAISE EXCEPTION 'metric % is numeric; use the numeric overload of ingest_measurement(...)', p_metric_name;
    ELSIF v_metric_row.metric_type = 'state' THEN
        RAISE EXCEPTION 'metric % is state; state ingestion is not implemented yet', p_metric_name;
    ELSIF v_metric_row.metric_type <> 'boolean' THEN
        RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric_row.metric_type, p_metric_name;
    END IF;

    -- Serialise writers for this (metric, device) until the transaction ends.
    PERFORM pg_advisory_xact_lock(
        telemetry.lock_key_from_text(p_metric_name),
        telemetry.lock_key_from_text(p_device_id)
    );

    v_previous_at := telemetry.assert_append_only(p_metric_name, p_device_id, p_observed_at);
    v_policy_row := telemetry.require_metric_policy(p_metric_name, p_observed_at);

    v_outcome := telemetry.ingest_segment(
        v_metric_row.table_name,
        v_metric_row.metric_type,
        v_policy_row.comparison_mode,
        v_policy_row.policy_id,
        v_policy_row.valid_from,
        v_device_row.device_pk,
        p_device_id,
        p_value,
        p_observed_at,
        v_previous_at,
        v_policy_row.epsilon,
        v_policy_row.max_sampling_interval,
        v_policy_row.allow_null
    );

    PERFORM telemetry.bump_watermark(p_metric_name, p_device_id, p_observed_at);
    PERFORM telemetry.touch_device_last_seen(p_device_id, p_observed_at);

    RETURN QUERY
    SELECT
        p_metric_name,
        p_device_id,
        v_metric_row.table_name,
        p_value,
        v_outcome;
END;
$$;
-- Query functions.
-- Return, as jsonb, the value of the segment of p_device_id that covers
-- instant p_at in the given metric table; NULL when no segment covers it.
--
-- A segment covers p_at when p_at lies in [start_time, effective_end):
--   * closed segments end at end_time;
--   * an open NULL segment never ends;
--   * an open valued segment goes stale max_sampling_interval (taken from
--     the segment's own policy) after the later of its start and the
--     (metric, device) watermark.
-- jsonb lets one function serve tables with different value column types.
CREATE OR REPLACE FUNCTION telemetry.value_at_from_table(
    p_table_name text,
    p_metric_name text,
    p_device_id text,
    p_at timestamptz
)
RETURNS jsonb
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_value jsonb;
    v_table_sql text := telemetry.metric_table_sql(p_table_name);
BEGIN
    -- The segment tables' exclusion constraint keeps a device's segments from
    -- overlapping, and every effective range lies within segment_period. The
    -- only segment that can contain p_at is therefore the latest one starting
    -- at or before it. Fetching just that row uses the
    -- (device_id, start_time DESC) index instead of evaluating the range
    -- expression over the device's whole history.
    EXECUTE format(
        'WITH watermark AS (
             SELECT w.last_observed_at
             FROM telemetry.metric_device_watermarks AS w
             WHERE w.metric_name = $1
               AND w.device_id = $2
         ),
         candidate AS (
             SELECT s.start_time, s.end_time, s.value, s.policy_id
             FROM %s AS s
             WHERE s.device_id = $2
               AND s.start_time <= $3
             ORDER BY s.start_time DESC
             LIMIT 1
         )
         SELECT to_jsonb(c.value)
         FROM candidate AS c
         JOIN telemetry.metric_policies AS mp
           ON mp.policy_id = c.policy_id
         LEFT JOIN watermark AS w ON true
         WHERE tstzrange(
                   c.start_time,
                   CASE
                       WHEN c.end_time IS NOT NULL THEN c.end_time
                       WHEN c.value IS NULL THEN ''infinity''::timestamptz
                       ELSE COALESCE(
                           GREATEST(COALESCE(w.last_observed_at, c.start_time), c.start_time)
                               + mp.max_sampling_interval,
                           ''infinity''::timestamptz
                       )
                   END,
                   ''[)''
               ) @> $3',
        v_table_sql
    )
    INTO v_value
    USING p_metric_name, p_device_id, p_at;

    RETURN v_value;
END;
$$;
-- Value of a numeric metric for p_device_id at instant p_at. Returns NULL
-- when no segment covers p_at: no data, a recorded gap, an explicit NULL, or
-- a value that went stale after its last observation.
-- Raises for unknown metrics and for non-numeric metric types.
CREATE OR REPLACE FUNCTION telemetry.value_at(
    p_metric_name text,
    p_device_id text,
    p_at timestamptz
)
RETURNS double precision
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
    v_metric telemetry.metrics%ROWTYPE;
    v_value jsonb;
BEGIN
    v_metric := telemetry.require_metric(p_metric_name);
    -- 'state' is reserved for future implementation of discrete enumerated
    -- machine states, likely stored as smallint. It will use the same segment
    -- logic as boolean metrics, but with exact comparison instead of epsilon.
    CASE v_metric.metric_type
        WHEN 'numeric' THEN
            v_value := telemetry.value_at_from_table(
                v_metric.table_name,
                p_metric_name,
                p_device_id,
                p_at
            );
            -- #>> '{}' unwraps the jsonb scalar as plain text. to_jsonb()
            -- stores non-finite doubles (NaN, Infinity, -Infinity) as JSON
            -- strings, which a ::text cast would keep quoted ('"NaN"') and
            -- then fail to parse as double precision. A NULL jsonb stays NULL.
            RETURN (v_value #>> '{}')::double precision;
        WHEN 'boolean' THEN
            RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_value_at(...)', p_metric_name;
        WHEN 'state' THEN
            RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
        ELSE
            RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
    END CASE;
END;
$$;
-- Boolean point-in-time lookup: the value of a boolean metric for one device
-- at instant p_at, or NULL when nothing (or a NULL-valued segment) covers it.
-- Non-boolean metrics are rejected with an error naming the right wrapper.
-- 'state' is reserved for future implementation of discrete enumerated
-- machine states, likely stored as smallint. It will use the same segment
-- logic as boolean metrics, but with exact comparison instead of epsilon.
CREATE OR REPLACE FUNCTION telemetry.boolean_value_at(
p_metric_name text,
p_device_id text,
p_at timestamptz
)
RETURNS boolean
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_def telemetry.metrics%ROWTYPE;
v_raw jsonb;
BEGIN
v_def := telemetry.require_metric(p_metric_name);
-- Guard clauses: everything that is not 'boolean' is an error.
IF v_def.metric_type = 'numeric' THEN
RAISE EXCEPTION 'metric % is numeric; use telemetry.value_at(...)', p_metric_name;
ELSIF v_def.metric_type = 'state' THEN
RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
ELSIF v_def.metric_type IS DISTINCT FROM 'boolean' THEN
RAISE EXCEPTION 'unsupported metric_type % for metric %', v_def.metric_type, p_metric_name;
END IF;
v_raw := telemetry.value_at_from_table(v_def.table_name, p_metric_name, p_device_id, p_at);
IF v_raw IS NULL THEN
RETURN NULL;
END IF;
-- jsonb true/false render as the text 'true'/'false', which cast to boolean.
RETURN v_raw::text::boolean;
END;
$$;
-- Return the segments of one device/metric that intersect [p_from, p_to),
-- clipped to that window, with values as jsonb (NULL for NULL values).
--
-- p_table_name  segment table; resolved and quoted by metric_table_sql().
-- p_metric_name metric name, used only to look up the device watermark.
-- p_device_id   device whose segments are returned.
-- p_from, p_to  half-open query window; the public wrappers validate that
--               p_from < p_to before calling this helper.
--
-- An open segment with a non-NULL value is only trusted until
-- GREATEST(watermark, start_time) + max_sampling_interval. When that point
-- falls before p_to, a synthetic NULL-valued row (samples_count = 0) covers
-- the rest of the window so that stale data shows up as missing.
-- Rows are ordered by start_time.
CREATE OR REPLACE FUNCTION telemetry.segments_between_from_table(
p_table_name text,
p_metric_name text,
p_device_id text,
p_from timestamptz,
p_to timestamptz
)
RETURNS TABLE (
device_id text,
start_time timestamptz,
end_time timestamptz,
value_json jsonb,
samples_count integer
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_table_sql text := telemetry.metric_table_sql(p_table_name);
BEGIN
-- Notes on the query below:
-- * open_segment: there should be at most one open segment per device;
--   the ORDER BY makes LIMIT 1 deterministic if verify_segments would
--   report multiple_open_segments.
-- * synthetic_tail LEFT JOINs the watermark, like stored_segments does, so
--   a device without a watermark row falls back to the segment start in
--   both places instead of leaving a hole after the open segment.
RETURN QUERY EXECUTE format(
'WITH watermark AS (
SELECT w.last_observed_at
FROM telemetry.metric_device_watermarks AS w
WHERE w.metric_name = $1
AND w.device_id = $2
),
open_segment AS (
SELECT s.*
FROM %s AS s
WHERE s.device_id = $2
AND s.end_time IS NULL
ORDER BY s.start_time DESC, s.segment_id DESC
LIMIT 1
),
stored_segments AS (
SELECT
s.device_id,
GREATEST(s.start_time, $3) AS start_time,
LEAST(
CASE
WHEN s.end_time IS NOT NULL THEN s.end_time
WHEN s.value IS NULL THEN $4
ELSE COALESCE(
GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+ mp.max_sampling_interval,
$4
)
END,
$4
) AS end_time,
to_jsonb(s.value) AS value_json,
s.samples_count
FROM %s AS s
JOIN telemetry.metric_policies AS mp
ON mp.policy_id = s.policy_id
LEFT JOIN watermark AS w ON true
WHERE s.device_id = $2
AND tstzrange(
s.start_time,
CASE
WHEN s.end_time IS NOT NULL THEN s.end_time
WHEN s.value IS NULL THEN ''infinity''::timestamptz
ELSE COALESCE(
GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+ mp.max_sampling_interval,
''infinity''::timestamptz
)
END,
''[)''
) && tstzrange($3, $4, ''[)'')
),
synthetic_tail AS (
SELECT
$2::text AS device_id,
GREATEST(
GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+ mp.max_sampling_interval,
$3
) AS start_time,
$4 AS end_time,
NULL::jsonb AS value_json,
0 AS samples_count
FROM open_segment AS s
JOIN telemetry.metric_policies AS mp
ON mp.policy_id = s.policy_id
LEFT JOIN watermark AS w ON true
WHERE s.value IS NOT NULL
AND GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+ mp.max_sampling_interval < $4
)
SELECT
c.device_id,
c.start_time,
c.end_time,
c.value_json,
c.samples_count
FROM (
SELECT
ss.device_id,
ss.start_time,
ss.end_time,
ss.value_json,
ss.samples_count
FROM stored_segments AS ss
WHERE ss.end_time > ss.start_time
UNION ALL
SELECT
st.device_id,
st.start_time,
st.end_time,
st.value_json,
st.samples_count
FROM synthetic_tail AS st
WHERE st.end_time > st.start_time
) AS c
ORDER BY c.start_time',
v_table_sql,
v_table_sql
)
USING p_metric_name, p_device_id, p_from, p_to;
END;
$$;
-- Numeric segments of one device that intersect [p_from, p_to), clipped to
-- the window; value is NULL where data is missing or stale.
-- Raises when the window is NULL or empty, and when the metric is not
-- numeric (naming the right wrapper).
CREATE OR REPLACE FUNCTION telemetry.metric_segments(
p_metric_name text,
p_device_id text,
p_from timestamptz,
p_to timestamptz
)
RETURNS TABLE (
device_id text,
start_time timestamptz,
end_time timestamptz,
value double precision,
samples_count integer
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_metric telemetry.metrics%ROWTYPE;
BEGIN
IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
END IF;
v_metric := telemetry.require_metric(p_metric_name);
-- 'state' is reserved for future implementation of discrete enumerated
-- machine states, likely stored as smallint. It will use the same segment
-- logic as boolean metrics, but with exact comparison instead of epsilon.
CASE v_metric.metric_type
WHEN 'numeric' THEN
RETURN QUERY
SELECT
s.device_id,
s.start_time,
s.end_time,
-- #>> '{}' yields the jsonb scalar as unquoted text, so NaN and
-- Infinity (stored by to_jsonb() as JSON strings) cast cleanly
-- where ::text would keep the quotes; NULL stays NULL.
(s.value_json #>> '{}')::double precision AS value,
s.samples_count
FROM telemetry.segments_between_from_table(
v_metric.table_name,
p_metric_name,
p_device_id,
p_from,
p_to
) AS s;
WHEN 'boolean' THEN
RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_segments_between(...)', p_metric_name;
WHEN 'state' THEN
RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
ELSE
RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
END CASE;
END;
$$;
-- Boolean counterpart of telemetry.metric_segments: segments of a boolean
-- metric for one device that intersect [p_from, p_to), clipped to the
-- window, with value NULL where data is missing or stale.
-- Raises on a NULL or empty window and when the metric is not boolean.
-- 'state' is reserved for future implementation of discrete enumerated
-- machine states, likely stored as smallint. It will use the same segment
-- logic as boolean metrics, but with exact comparison instead of epsilon.
CREATE OR REPLACE FUNCTION telemetry.boolean_segments_between(
p_metric_name text,
p_device_id text,
p_from timestamptz,
p_to timestamptz
)
RETURNS TABLE (
device_id text,
start_time timestamptz,
end_time timestamptz,
value boolean,
samples_count integer
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_def telemetry.metrics%ROWTYPE;
BEGIN
IF p_from IS NULL OR p_to IS NULL OR p_from >= p_to THEN
RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
END IF;
v_def := telemetry.require_metric(p_metric_name);
-- Reject every type other than 'boolean' up front.
IF v_def.metric_type = 'numeric' THEN
RAISE EXCEPTION 'metric % is numeric; use telemetry.metric_segments(...)', p_metric_name;
ELSIF v_def.metric_type = 'state' THEN
RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
ELSIF v_def.metric_type IS DISTINCT FROM 'boolean' THEN
RAISE EXCEPTION 'unsupported metric_type % for metric %', v_def.metric_type, p_metric_name;
END IF;
-- A NULL value_json casts through text to a NULL boolean.
RETURN QUERY
SELECT
seg.device_id,
seg.start_time,
seg.end_time,
seg.value_json::text::boolean,
seg.samples_count
FROM telemetry.segments_between_from_table(
v_def.table_name,
p_metric_name,
p_device_id,
p_from,
p_to
) AS seg;
END;
$$;
-- Sample one device/metric at p_points evenly spaced instants from p_from to
-- p_to inclusive: p_from + k * (p_to - p_from) / (p_points - 1), k = 0..p_points-1.
--
-- Each instant is resolved against the segment table: a closed segment
-- covers [start_time, end_time); an open non-NULL segment is only trusted
-- until GREATEST(watermark, start_time) + max_sampling_interval; an open
-- NULL-valued segment (or a NULL max_sampling_interval) extends to infinity.
-- value_json is NULL where nothing covers the instant.
--
-- Raises when p_points is NULL or < 2 (which would otherwise divide by zero
-- or silently return nothing) or when the window is NULL or empty. The
-- messages match those of the public wrappers.
CREATE OR REPLACE FUNCTION telemetry.samples_from_table(
p_table_name text,
p_metric_name text,
p_device_id text,
p_from timestamptz,
p_to timestamptz,
p_points integer
)
RETURNS TABLE (
sample_at timestamptz,
value_json jsonb
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_table_sql text := telemetry.metric_table_sql(p_table_name);
BEGIN
IF p_points IS NULL OR p_points < 2 THEN
RAISE EXCEPTION 'p_points must be at least 2';
END IF;
IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
END IF;
-- In the lateral lookup, s.start_time <= sp.sample_at is implied by the
-- range test but can be used by an index, if one exists. The ORDER BY makes
-- LIMIT 1 deterministic if segments overlap (see verify_segments).
RETURN QUERY EXECUTE format(
'WITH watermark AS (
SELECT w.last_observed_at
FROM telemetry.metric_device_watermarks AS w
WHERE w.metric_name = $1
AND w.device_id = $2
),
sample_points AS (
SELECT $3 + (($4 - $3) * gs.i::double precision / ($5 - 1)::double precision) AS sample_at
FROM generate_series(0, $5 - 1) AS gs(i)
)
SELECT
sp.sample_at,
to_jsonb(seg.value) AS value_json
FROM sample_points AS sp
LEFT JOIN watermark AS w ON true
LEFT JOIN LATERAL (
SELECT s.value
FROM %s AS s
JOIN telemetry.metric_policies AS mp
ON mp.policy_id = s.policy_id
WHERE s.device_id = $2
AND s.start_time <= sp.sample_at
AND tstzrange(
s.start_time,
CASE
WHEN s.end_time IS NOT NULL THEN s.end_time
WHEN s.value IS NULL THEN ''infinity''::timestamptz
ELSE COALESCE(
GREATEST(COALESCE(w.last_observed_at, s.start_time), s.start_time)
+ mp.max_sampling_interval,
''infinity''::timestamptz
)
END,
''[)''
) @> sp.sample_at
ORDER BY s.start_time DESC, s.segment_id DESC
LIMIT 1
) AS seg ON true
ORDER BY sp.sample_at',
v_table_sql
)
USING p_metric_name, p_device_id, p_from, p_to, p_points;
END;
$$;
-- Sample a numeric metric for one device at p_points evenly spaced instants
-- from p_from to p_to inclusive; value is NULL where nothing covers an
-- instant. Validates p_points (>= 2) and the window before resolving the
-- metric, then raises if the metric is not numeric.
CREATE OR REPLACE FUNCTION telemetry.sample_metric(
p_metric_name text,
p_device_id text,
p_from timestamptz,
p_to timestamptz,
p_points integer
)
RETURNS TABLE (
sample_at timestamptz,
value double precision
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_metric telemetry.metrics%ROWTYPE;
BEGIN
IF p_points IS NULL OR p_points < 2 THEN
RAISE EXCEPTION 'p_points must be at least 2';
END IF;
IF p_from IS NULL OR p_to IS NULL OR p_to <= p_from THEN
RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
END IF;
v_metric := telemetry.require_metric(p_metric_name);
-- 'state' is reserved for future implementation of discrete enumerated
-- machine states, likely stored as smallint. It will use the same segment
-- logic as boolean metrics, but with exact comparison instead of epsilon.
CASE v_metric.metric_type
WHEN 'numeric' THEN
RETURN QUERY
SELECT
s.sample_at,
-- #>> '{}' yields the jsonb scalar as unquoted text, so NaN and
-- Infinity (stored by to_jsonb() as JSON strings) cast cleanly
-- where ::text would keep the quotes; NULL stays NULL.
(s.value_json #>> '{}')::double precision AS value
FROM telemetry.samples_from_table(
v_metric.table_name,
p_metric_name,
p_device_id,
p_from,
p_to,
p_points
) AS s;
WHEN 'boolean' THEN
RAISE EXCEPTION 'metric % is boolean; use telemetry.boolean_samples(...)', p_metric_name;
WHEN 'state' THEN
RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
ELSE
RAISE EXCEPTION 'unsupported metric_type % for metric %', v_metric.metric_type, p_metric_name;
END CASE;
END;
$$;
-- Boolean counterpart of telemetry.sample_metric: samples a boolean metric
-- for one device at p_points evenly spaced instants from p_from to p_to
-- inclusive, with NULL where nothing covers an instant.
-- Arguments are validated before the metric is resolved; non-boolean
-- metrics raise an error naming the right wrapper.
-- 'state' is reserved for future implementation of discrete enumerated
-- machine states, likely stored as smallint. It will use the same segment
-- logic as boolean metrics, but with exact comparison instead of epsilon.
CREATE OR REPLACE FUNCTION telemetry.boolean_samples(
p_metric_name text,
p_device_id text,
p_from timestamptz,
p_to timestamptz,
p_points integer
)
RETURNS TABLE (
sample_at timestamptz,
value boolean
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_def telemetry.metrics%ROWTYPE;
BEGIN
IF p_points IS NULL OR p_points < 2 THEN
RAISE EXCEPTION 'p_points must be at least 2';
END IF;
IF p_from IS NULL OR p_to IS NULL OR p_from >= p_to THEN
RAISE EXCEPTION 'p_from and p_to must define a non-empty interval';
END IF;
v_def := telemetry.require_metric(p_metric_name);
-- Reject every type other than 'boolean' up front.
IF v_def.metric_type = 'numeric' THEN
RAISE EXCEPTION 'metric % is numeric; use telemetry.sample_metric(...)', p_metric_name;
ELSIF v_def.metric_type = 'state' THEN
RAISE EXCEPTION 'metric % is state; state query wrappers are not implemented yet', p_metric_name;
ELSIF v_def.metric_type IS DISTINCT FROM 'boolean' THEN
RAISE EXCEPTION 'unsupported metric_type % for metric %', v_def.metric_type, p_metric_name;
END IF;
-- A NULL value_json casts through text to a NULL boolean.
RETURN QUERY
SELECT
pt.sample_at,
pt.value_json::text::boolean
FROM telemetry.samples_from_table(
v_def.table_name,
p_metric_name,
p_device_id,
p_from,
p_to,
p_points
) AS pt;
END;
$$;
-- Maintenance utilities.
-- Consistency report for one metric's segment table: one row per problem,
-- ordered by device, start_time, segment_id and issue. Issues:
--   invalid_interval        closed segment whose end_time <= start_time
--   multiple_open_segments  device has more than one segment with end_time NULL
--   overlap                 segment starts before its predecessor (ordered by
--                           start_time, segment_id) has ended, including the
--                           case where the predecessor is still open
--   unexpected_gap          segment starts after its predecessor ended
-- Overlaps and gaps are checked between adjacent segments only.
CREATE OR REPLACE FUNCTION telemetry.verify_segments(p_metric_name text)
RETURNS TABLE (
metric_name text,
device_id text,
issue text,
segment_id bigint,
related_segment_id bigint,
start_time timestamptz,
end_time timestamptz,
details text
)
LANGUAGE plpgsql
STABLE
AS $$
DECLARE
v_metric telemetry.metrics%ROWTYPE;
v_table_sql text;
BEGIN
v_metric := telemetry.require_metric(p_metric_name);
v_table_sql := telemetry.metric_table_sql(v_metric.table_name);
-- The overlap branch also fires when the predecessor is open
-- (prev_end_time IS NULL but prev_segment_id is set); such a segment used to
-- pass every check. The gap branch LEFT JOINs the policy so a gap is still
-- reported if the policy row is missing.
RETURN QUERY EXECUTE format(
'WITH ordered AS (
SELECT
s.segment_id,
s.device_id,
s.policy_id,
s.start_time,
s.end_time,
lag(s.segment_id) OVER (
PARTITION BY s.device_id
ORDER BY s.start_time, s.segment_id
) AS prev_segment_id,
lag(s.end_time) OVER (
PARTITION BY s.device_id
ORDER BY s.start_time, s.segment_id
) AS prev_end_time,
lag(s.policy_id) OVER (
PARTITION BY s.device_id
ORDER BY s.start_time, s.segment_id
) AS prev_policy_id
FROM %s AS s
),
open_counts AS (
SELECT
s.device_id,
count(*)::integer AS open_count
FROM %s AS s
WHERE s.end_time IS NULL
GROUP BY s.device_id
HAVING count(*) > 1
)
SELECT
$1::text AS metric_name,
o.device_id,
''invalid_interval''::text AS issue,
o.segment_id,
NULL::bigint AS related_segment_id,
o.start_time,
o.end_time,
''end_time must be greater than start_time''::text AS details
FROM ordered AS o
WHERE o.end_time IS NOT NULL
AND o.end_time <= o.start_time
UNION ALL
SELECT
$1::text,
oc.device_id,
''multiple_open_segments''::text,
NULL::bigint,
NULL::bigint,
NULL::timestamptz,
NULL::timestamptz,
format(''%%s open segments found'', oc.open_count)
FROM open_counts AS oc
UNION ALL
SELECT
$1::text,
o.device_id,
''overlap''::text,
o.segment_id,
o.prev_segment_id,
o.start_time,
o.end_time,
CASE
WHEN o.prev_end_time IS NULL
THEN ''previous segment is still open''::text
ELSE format(''previous segment ends at %%s'', o.prev_end_time)
END
FROM ordered AS o
WHERE o.prev_segment_id IS NOT NULL
AND (o.prev_end_time IS NULL OR o.prev_end_time > o.start_time)
UNION ALL
SELECT
$1::text,
o.device_id,
''unexpected_gap''::text,
o.segment_id,
o.prev_segment_id,
o.start_time,
o.end_time,
format(
''stored gap of %%s violates expected continuity (max_sampling_interval=%%s)'',
o.start_time - o.prev_end_time,
mp.max_sampling_interval
)
FROM ordered AS o
LEFT JOIN telemetry.metric_policies AS mp
ON mp.policy_id = o.prev_policy_id
WHERE o.prev_end_time IS NOT NULL
AND o.start_time > o.prev_end_time
ORDER BY device_id, start_time NULLS FIRST, segment_id NULLS FIRST, issue',
v_table_sql,
v_table_sql
)
USING p_metric_name;
END;
$$;
-- Merge runs of adjacent closed segments of one metric into single rows.
--
-- A run is a sequence of segments of the same device (ordered by start_time,
-- segment_id) that are all closed with end_time <= p_before, where each
-- segment starts exactly at the previous one's end_time and has the same
-- policy_id and value. Both are compared with IS DISTINCT FROM, so NULL
-- values merge with NULL values. Open segments (end_time IS NULL) are never
-- touched.
--
-- For every run of two or more segments, the earliest segment is kept and
-- extended to the run's latest end_time, with samples_count set to the sum
-- over the run; the other segments of the run are deleted. One row is
-- returned per merged run; single-segment runs produce no row.
--
-- Raises 'p_before is required' when p_before is NULL. The metric and its
-- table are resolved through telemetry.require_metric and
-- telemetry.metric_table_sql (presumably raising for unknown metrics or
-- missing tables - confirm).
--
-- NOTE(review): no explicit locking is taken; this presumably relies on
-- ingestion never rewriting closed segments older than p_before - confirm.
CREATE OR REPLACE FUNCTION telemetry.compact_segments(
p_metric_name text,
p_before timestamptz
)
RETURNS TABLE (
device_id text,
kept_segment_id bigint,
merged_segments integer,
start_time timestamptz,
end_time timestamptz,
total_samples integer
)
LANGUAGE plpgsql
AS $$
DECLARE
v_metric telemetry.metrics%ROWTYPE;
v_table_sql text;
v_merge record;
BEGIN
IF p_before IS NULL THEN
RAISE EXCEPTION 'p_before is required';
END IF;
v_metric := telemetry.require_metric(p_metric_name);
v_table_sql := telemetry.metric_table_sql(v_metric.table_name);
-- One row per mergeable run:
--   candidates: closed segments ending by p_before, each with its
--               predecessor's end_time, policy_id and value (lag);
--   grouped:    run number = running sum of "starts a new run" flags;
--   aggregates: kept id (earliest), ids to delete, merged bounds and
--               sample total, for runs with more than one segment.
FOR v_merge IN EXECUTE format(
'WITH candidates AS (
SELECT
s.segment_id,
s.device_id,
s.start_time,
s.end_time,
s.samples_count,
s.policy_id,
s.value,
lag(s.end_time) OVER (
PARTITION BY s.device_id
ORDER BY s.start_time, s.segment_id
) AS prev_end_time,
lag(s.policy_id) OVER (
PARTITION BY s.device_id
ORDER BY s.start_time, s.segment_id
) AS prev_policy_id,
lag(s.value) OVER (
PARTITION BY s.device_id
ORDER BY s.start_time, s.segment_id
) AS prev_value
FROM %s AS s
WHERE s.end_time IS NOT NULL
AND s.end_time <= $1
),
grouped AS (
SELECT
c.*,
sum(
CASE
WHEN c.prev_end_time IS NULL
OR c.start_time <> c.prev_end_time
OR c.policy_id IS DISTINCT FROM c.prev_policy_id
OR c.value IS DISTINCT FROM c.prev_value
THEN 1
ELSE 0
END
) OVER (
PARTITION BY c.device_id
ORDER BY c.start_time, c.segment_id
) AS grp
FROM candidates AS c
),
aggregates AS (
SELECT
g.device_id,
(array_agg(g.segment_id ORDER BY g.start_time, g.segment_id))[1] AS keep_segment_id,
array_remove(
array_agg(g.segment_id ORDER BY g.start_time, g.segment_id),
(array_agg(g.segment_id ORDER BY g.start_time, g.segment_id))[1]
) AS delete_segment_ids,
min(g.start_time) AS merged_start_time,
max(g.end_time) AS merged_end_time,
sum(g.samples_count)::integer AS total_samples,
count(*)::integer AS merged_segments
FROM grouped AS g
GROUP BY g.device_id, g.grp
HAVING count(*) > 1
)
SELECT
a.device_id,
a.keep_segment_id,
a.delete_segment_ids,
a.merged_segments,
a.merged_start_time,
a.merged_end_time,
a.total_samples
FROM aggregates AS a
ORDER BY a.device_id, a.merged_start_time',
v_table_sql
)
USING p_before
LOOP
-- Delete the absorbed segments before extending the kept one, so the two
-- never coexist. Presumably this avoids tripping a no-overlap constraint
-- on the segment table - NOTE(review): confirm.
EXECUTE format(
'DELETE FROM %s
WHERE segment_id = ANY($1)',
v_table_sql
)
USING v_merge.delete_segment_ids;
-- Extend the kept (earliest) segment over the whole run; its start_time
-- is already the run's start.
EXECUTE format(
'UPDATE %s
SET end_time = $1,
samples_count = $2
WHERE segment_id = $3',
v_table_sql
)
USING v_merge.merged_end_time, v_merge.total_samples, v_merge.keep_segment_id;
-- Emit one output row describing this merge.
device_id := v_merge.device_id;
kept_segment_id := v_merge.keep_segment_id;
merged_segments := v_merge.merged_segments;
start_time := v_merge.merged_start_time;
end_time := v_merge.merged_end_time;
total_samples := v_merge.total_samples;
RETURN NEXT;
END LOOP;
RETURN;
END;
$$;
-- Devices whose last_seen lies more than p_threshold before now(), most
-- stale first (ties broken by device_id). Devices never seen
-- (last_seen IS NULL) are not reported.
CREATE OR REPLACE FUNCTION telemetry.inactive_devices(p_threshold interval)
RETURNS TABLE (
device_pk bigint,
device_id text,
device_type text,
location text,
last_seen timestamptz,
inactive_for interval
)
LANGUAGE sql
STABLE
AS $$
WITH staleness AS (
SELECT
dev.device_pk,
dev.device_id,
dev.device_type,
dev.location,
dev.last_seen,
now() - dev.last_seen AS inactive_for
FROM telemetry.devices AS dev
WHERE dev.last_seen IS NOT NULL
)
SELECT
st.device_pk,
st.device_id,
st.device_type,
st.location,
st.last_seen,
st.inactive_for
FROM staleness AS st
WHERE st.inactive_for > p_threshold
ORDER BY st.inactive_for DESC, st.device_id;
$$;
-- Example metric registrations.
-- Each call registers one numeric metric together with the name of its
-- segment table. Arguments use named notation, so their order is free; they
-- are grouped here as identity, valid range, change detection and sampling.
SELECT telemetry.register_numeric_metric(
p_metric_name => 'ambient_temperature',
p_table_name => 'ambient_temperature_segments',
p_domain_name => 'environmental',
p_min_value => -40,
p_max_value => 60,
p_allow_null => true,
p_epsilon => 0.05,
p_rounding_precision => 0.1,
p_max_sampling_interval => '5 minutes'
);
SELECT telemetry.register_numeric_metric(
p_metric_name => 'cpu_temperature',
p_table_name => 'cpu_temperature_segments',
p_domain_name => 'system',
p_min_value => 0,
p_max_value => 120,
p_allow_null => true,
p_epsilon => 0.5,
p_rounding_precision => 1,
p_max_sampling_interval => '10 seconds'
);
SELECT telemetry.register_numeric_metric(
p_metric_name => 'humidity',
p_table_name => 'humidity_segments',
p_domain_name => 'environmental',
p_min_value => 0,
p_max_value => 100,
p_allow_null => true,
p_epsilon => 0.5,
p_rounding_precision => 0.1,
p_max_sampling_interval => '5 minutes'
);
COMMIT;