This document describes how an ingestion worker should write measurements into the PostgreSQL telemetry historian.
It documents the database API as implemented today. It does not redefine the schema or the historian model.
This document covers only the currently implemented measurement API. Counter metrics are tracked separately in counter_ingestion_api.md and energy_counter_draft.md.
MQTT is used throughout this document only as an example source transport because it is a common worker implementation target. The PostgreSQL API itself is source-agnostic: the same ingestion flow applies to any worker that can map incoming data to metric_name, device_id, value, and observed_at.
One common ingestion path is:
Source devices / upstream bus
-> ingestion worker
-> telemetry.ingest_measurement(...)
-> metric segment tables
More generally, the source worker is responsible for:

- decoding the source transport
- mapping each message to metric_name, device_id, value, and observed_at

The database is responsible for historian behavior:

- segment consolidation and transitions
- NULL normalization and explicit-unknown handling
- policy lookup in telemetry.metric_policies

For non-MQTT transports, replace topic subscription/parsing with the equivalent source-specific decode step. The database contract stays the same.
Workers should treat PostgreSQL as the source of truth for consolidation logic. They should send measurements in order and let the database decide whether a segment is extended, split, or closed.
If the source transport is MQTT, a common topic shape is:
/$bus/$metric/$domain/$sensor
Examples:
/homebus/temperature/bedroom/sensor1
/homebus/humidity/bedroom/sensor1
/homebus/rssi/network/sensor1
Example worker mapping:
topic: /homebus/temperature/bedroom/sensor1
metric_name = "temperature"
device_id = "bedroom.sensor1"
Another example:
topic: /homebus/rssi/network/sensor1
metric_name = "rssi"
device_id = "network.sensor1"
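The two mappings above can be expressed as one small worker-side helper. This is a sketch for the example topic shape only; the function name is illustrative and not part of any API described here.

```python
def map_topic(topic: str) -> tuple[str, str]:
    """Map a topic of the form /$bus/$metric/$domain/$sensor
    to (metric_name, device_id). Raises ValueError on unexpected shapes."""
    parts = topic.strip("/").split("/")
    if len(parts) != 4:
        raise ValueError(f"unexpected topic shape: {topic}")
    _bus, metric, domain, sensor = parts
    # device_id joins domain and sensor with a dot, matching the examples above
    return metric, f"{domain}.{sensor}"
```

For example, map_topic("/homebus/temperature/bedroom/sensor1") yields ("temperature", "bedroom.sensor1").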
Important:
If your deployment is not using MQTT, apply the same idea to whatever source envelope you receive. The database only needs the final mapped identifiers.
If your deployment needs aliases, normalization, or routing rules, implement them in the worker before calling PostgreSQL.
Two typed overloads exist.
Numeric metrics:
SELECT *
FROM telemetry.ingest_measurement(
p_metric_name => $1,
p_device_id => $2,
p_value => $3::double precision,
p_observed_at => $4::timestamptz
);
Boolean metrics:
SELECT *
FROM telemetry.ingest_measurement(
p_metric_name => $1,
p_device_id => $2,
p_value => $3::boolean,
p_observed_at => $4::timestamptz
);
Parameter semantics:
Parameter semantics:

- p_metric_name: application metric identifier registered in telemetry.metrics
- p_device_id: logical device identifier chosen by the worker
- p_value: typed measurement value, or NULL for "measurement unavailable"
- p_observed_at: time the device observed the measurement

Important implementation note:
Because ingest_measurement is overloaded, NULL values must be typed explicitly:

- NULL::double precision for numeric unknown values
- NULL::boolean for boolean unknown values

Example:
SELECT *
FROM telemetry.ingest_measurement(
'temperature',
'bedroom.sensor1',
NULL::double precision,
'2026-03-08T10:00:00Z'::timestamptz
);
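One way to keep the overload choice and the NULL typing in a single place is a helper that selects the placeholder cast from the metric's value type. This is a sketch; the is_boolean flag is assumed to come from the worker's own metric registry and is not part of the database API.

```python
def ingest_sql(is_boolean: bool) -> str:
    """Build the parameterized ingest_measurement call with the cast that
    selects the correct overload. Because the cast applies to the
    placeholder, a NULL parameter value is typed correctly as well."""
    cast = "boolean" if is_boolean else "double precision"
    return (
        "SELECT * FROM telemetry.ingest_measurement("
        f"$1, $2, $3::{cast}, $4::timestamptz)"
    )
```

With this helper, passing None (driver-level NULL) as the third parameter is safe for both metric types, because the placeholder cast disambiguates the overload.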
Each call returns one row with:

- metric_name
- device_id
- table_name
- normalized_value
- action

normalized_value is the value actually used by the historian after normalization:

- non-NULL: the typed measurement value as stored
- NULL: explicit unknown value

action is returned by the internal segment engine and is useful for logs, metrics, and debugging.
Actions:
- opened: no open segment existed; a new non-NULL segment was created
- opened_null: no open segment existed; a new NULL segment was created
- extended: the current non-NULL segment was kept open and samples_count increased
- extended_null: the current NULL segment was kept open and samples_count increased
- split: the previous segment was closed and a new segment was opened
- null_to_value: a NULL segment was closed and replaced by a value segment
- value_to_null: a value segment was closed and replaced by a NULL segment
- gap_split: a late gap was detected; the previous value segment was closed at last_observed_at + max_sampling_interval, a synthetic NULL gap segment was inserted, then a new value segment was opened
- gap_to_null: a late gap was detected and the next state is unknown; the previous value segment was closed at last_observed_at + max_sampling_interval, then a new open NULL segment was opened

Workers usually do not branch on action. Treat it as observability data.
Recommended worker logging:

- normalized_value
- action

The worker should not re-implement historian logic already handled in PostgreSQL.
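Since action is observability data rather than control flow, a worker can simply tally actions and export the counts as metrics. A minimal sketch:

```python
from collections import Counter

# running tally of historian actions (opened, extended, split, ...)
action_counts: Counter = Counter()

def record_action(action: str) -> None:
    """Count the action returned by ingest_measurement for worker-side
    metrics and debugging; the worker never branches on it."""
    action_counts[action] += 1
```

A sudden rise in split or gap_split counts, for example, is a useful operational signal without any change to ingestion logic.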
Automatic device provisioning:

- ensure_device() is invoked internally
- if device_id does not exist in telemetry.devices, it is created automatically

Append-only enforcement:

- assert_append_only() rejects observed_at <= last_observed_at for the same (metric_name, device_id)

Policy lookup:

- require_metric_policy() resolves the active row in telemetry.metric_policies using valid_from <= observed_at

Gap detection:

- gaps longer than max_sampling_interval are detected and materialized as NULL gap segments

Segment splitting:

- NULL, policy changes, and gap boundaries are converted into segment transitions automatically

Policy boundary splitting:

- segments are closed and reopened at each policy.valid_from boundary

Tail NULL simulation:

- the tail after the last sample is not written as a NULL segment immediately
- consumers can treat the signal as unknown after last_observed_at + max_sampling_interval

Workers should treat database errors as either permanent message errors or transient infrastructure errors, regardless of whether the source transport is MQTT, HTTP, serial, or another upstream feed.
Common permanent errors:

- ERROR: unknown metric: temperature (metric_name is not registered)
- ERROR: out-of-order measurement for metric ...
- ERROR: value ... is below min_value ...
- ERROR: value ... is above max_value ...
- ERROR: metric ... is boolean; use the boolean overload of ingest_measurement(...)
- ERROR: metric ... is numeric; use the numeric overload of ingest_measurement(...)

NULL handling:

- ERROR: metric table ... does not allow explicit NULL measurements (raised when allow_null = false)

Recommended behavior for permanent message errors:
Recommended dead-letter payload:
Recommended behavior for transient failures:
If a retry happens after an ambiguous network failure, the worker may see an out-of-order error if the first attempt actually committed. Handle that case as an operational duplicate and log it accordingly.
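The permanent/transient split described above can be sketched as a classifier over the error message. The marker strings mirror the error texts listed above; the dead-letter and retry plumbing is left to the worker.

```python
# substrings of the permanent message errors listed above
PERMANENT_MARKERS = (
    "unknown metric",
    "out-of-order measurement",
    "is below min_value",
    "is above max_value",
    "use the boolean overload",
    "use the numeric overload",
    "does not allow explicit NULL measurements",
)

def classify_db_error(message: str) -> str:
    """Return 'permanent' for message errors (dead-letter, do not retry)
    and 'transient' for everything else (retry with backoff)."""
    if any(marker in message for marker in PERMANENT_MARKERS):
        return "permanent"
    return "transient"
```

Note that an out-of-order error seen during a retry after an ambiguous network failure is classified as permanent here; per the paragraph above, the worker should log it as an operational duplicate rather than dead-letter real data loss.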
The ingestion API assumes chronological ordering per (metric_name, device_id).
This is enforced by:

- per-(metric_name, device_id) watermarks in telemetry.metric_device_watermarks

Consequences for workers:
This API is append-only, not last-write-wins.
Same-timestamp replay is not accepted:

- observed_at = last_observed_at is rejected
- observed_at < last_observed_at is rejected

Important limitation of the current measurement API:

- ingest_measurement(...) does not currently accept an explicit idempotency_key

observed_at is the time the device measured the value.
It is not the broker receive time, the worker processing time, or the database commit time.

Preferred worker behavior: propagate the device-reported timestamp from the source payload rather than stamping messages at receive time.

Historian correctness depends on observed_at being as close as possible to the real device observation time.
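Because the database rejects observed_at <= last_observed_at, a worker that buffers or retries can keep a local watermark per (metric_name, device_id) and drop known-stale samples before calling PostgreSQL. A sketch, assuming an in-memory watermark store (a real worker may need it to survive restarts):

```python
from datetime import datetime

# local watermarks, keyed by (metric_name, device_id)
_watermarks: dict = {}

def should_send(metric_name: str, device_id: str, observed_at: datetime) -> bool:
    """Return True if observed_at advances the local watermark; stale or
    duplicate timestamps are dropped locally instead of triggering an
    out-of-order database error."""
    key = (metric_name, device_id)
    last = _watermarks.get(key)
    if last is not None and observed_at <= last:
        return False
    _watermarks[key] = observed_at
    return True
```

This pre-filter complements, not replaces, the database-side watermark: the worker must still handle the out-of-order error on retries after ambiguous failures.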
Example flow using MQTT as one source transport:
- subscribe to /homebus/temperature/bedroom/sensor1
- map to metric_name = 'temperature' and device_id = 'bedroom.sensor1'
- call telemetry.ingest_measurement(...)
- log action and normalized_value

Example numeric SQL call:
SELECT *
FROM telemetry.ingest_measurement(
'temperature',
'bedroom.sensor1',
22.97::double precision,
'2026-03-08T10:15:12Z'::timestamptz
);
Example boolean SQL call:
SELECT *
FROM telemetry.ingest_measurement(
'motion_detected',
'hallway.sensor1',
true,
'2026-03-08T10:15:12Z'::timestamptz
);
Example pseudo-code:
on_message(topic, payload):
    metric_name = map_topic_to_metric(topic)
    device_id = map_topic_to_device(topic)
    observed_at = extract_device_timestamp(payload)
    value = parse_payload_value(payload, metric_name)

    # numeric path shown; boolean metrics use the $3::boolean overload
    row = db.query_one(
        "SELECT * FROM telemetry.ingest_measurement($1, $2, $3::double precision, $4::timestamptz)",
        [metric_name, device_id, value, observed_at]
    )

    log.info(
        "ingested",
        metric_name=row.metric_name,
        device_id=row.device_id,
        action=row.action,
        normalized_value=row.normalized_value
    )
The worker is responsible for choosing the correct numeric or boolean call path before executing SQL. The same database call pattern applies to any worker after it maps its source message format into the historian parameters.
This API is designed for high ingestion rates without requiring the worker to manage historian state.
Typical work performed per measurement:
- watermark and lock handling keyed by (metric_name, device_id)
- last_seen bookkeeping

In the common case where a value is unchanged, ingestion usually updates the current open segment rather than inserting a new raw sample row.
Locking notes:

- ingestion serializes writes per (metric_name, device_id)
- locks are taken under a NOWAIT contract, so contention surfaces as an immediate error rather than queued waiting
- avoid writing the same (metric_name, device_id) concurrently from multiple writers
- route each (metric_name, device_id) to a single writer, for example via deterministic routing or consistent hashing in the worker layer

Batching is optional:
Practical batching guidance:
- preserve ordering per (metric_name, device_id) inside the batch

The most important worker-side performance requirement is preserving ordering while keeping the database connection pool healthy.
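A batch can be split into per-key, order-preserving sub-streams before issuing sequential ingest_measurement calls per key. A sketch, assuming each measurement is a dict carrying the mapped identifiers:

```python
from collections import defaultdict

def group_batch(batch):
    """Group measurements by (metric_name, device_id) while preserving
    arrival order inside each group, so each key can be ingested
    sequentially (and different keys in parallel if desired)."""
    groups = defaultdict(list)
    for m in batch:
        groups[(m["metric_name"], m["device_id"])].append(m)
    return dict(groups)
```

This keeps the per-key ordering guarantee while still allowing the worker to overlap work across unrelated keys.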
In summary, a correct worker:

- maps each source message to metric_name and device_id
- chooses the correct ingest_measurement overload
- types NULL explicitly when sending unknown values
- logs action and normalized_value
- preserves ordering per (metric_name, device_id) during retries and buffering