# Telemetry Ingestion API

This document describes how an ingestion worker should write measurements into the PostgreSQL telemetry historian.

It documents the measurement API as implemented today; it does not redefine the schema or the historian model. Counter metrics are tracked separately in [counter_ingestion_api.md](./counter_ingestion_api.md) and [energy_counter_draft.md](./energy_counter_draft.md).

MQTT is used throughout this document only as an example source transport because it is a common worker implementation target. The PostgreSQL API itself is source-agnostic: the same ingestion flow applies to any worker that can map incoming data to `metric_name`, `device_id`, `value`, and `observed_at`.

## 1. System Overview

One common ingestion path is:

```text
Source devices / upstream bus
    -> ingestion worker
    -> telemetry.ingest_measurement(...)
    -> metric segment tables
```

More generally, the source worker is responsible for:

- receiving source messages from MQTT, HTTP, files, queues, serial links, or another upstream transport
- parsing the source envelope and payload
- mapping each message to `metric_name`, `device_id`, `value`, and `observed_at`
- calling the correct PostgreSQL ingestion function
- logging success or failure

The database is responsible for historian behavior:

- append-only ingestion
- per-device/per-metric serialization via advisory locks
- deduplication by epsilon or exact comparison
- segment storage instead of raw sample storage
- lazy gap detection
- explicit unknown intervals via `NULL`
- policy versioning via `telemetry.metric_policies`
- automatic segment splitting when policy boundaries are crossed

For non-MQTT transports, replace topic subscription/parsing with the equivalent source-specific decode step. The database contract stays the same.

Workers should treat PostgreSQL as the source of truth for consolidation logic. They should send measurements in order and let the database decide whether a segment is extended, split, or closed.

## 2. Example MQTT Topic Model

If the source transport is MQTT, a common topic shape is:

```text
/$bus/$metric/$domain/$sensor
```

Examples:

```text
/homebus/temperature/bedroom/sensor1
/homebus/humidity/bedroom/sensor1
/homebus/rssi/network/sensor1
```

Example worker mapping:

```text
topic: /homebus/temperature/bedroom/sensor1

metric_name = "temperature"
device_id   = "bedroom.sensor1"
```

Another example:

```text
topic: /homebus/rssi/network/sensor1

metric_name = "rssi"
device_id   = "network.sensor1"
```

Important:

- topic-to-metric mapping is implemented in the ingestion worker
- topic-to-device mapping is implemented in the ingestion worker
- the database does not parse MQTT topics
- the database expects already-mapped application identifiers

If your deployment is not using MQTT, apply the same idea to whatever source envelope you receive. The database only needs the final mapped identifiers.

If your deployment needs aliases, normalization, or routing rules, implement them in the worker before calling PostgreSQL.
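The mapping rules above can be sketched as a small worker-side helper. This is a Python sketch under the four-segment topic shape shown earlier; `map_topic` is a hypothetical name, not part of the database API:

```python
def map_topic(topic: str) -> tuple[str, str]:
    """Split an MQTT topic of shape /$bus/$metric/$domain/$sensor
    into (metric_name, device_id)."""
    parts = topic.strip("/").split("/")
    if len(parts) != 4:
        raise ValueError(f"unexpected topic shape: {topic!r}")
    _bus, metric, domain, sensor = parts
    # device_id joins domain and sensor with a dot, as in the examples above
    return metric, f"{domain}.{sensor}"
```

Aliases or normalization rules would slot in after the split, before the identifiers are handed to PostgreSQL.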

## 3. Database Ingestion API

Two typed overloads exist.

Numeric metrics:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    p_metric_name  => $1,
    p_device_id    => $2,
    p_value        => $3::double precision,
    p_observed_at  => $4::timestamptz
);
```

Boolean metrics:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    p_metric_name  => $1,
    p_device_id    => $2,
    p_value        => $3::boolean,
    p_observed_at  => $4::timestamptz
);
```

Parameter semantics:

- `p_metric_name`: application metric identifier registered in `telemetry.metrics`
- `p_device_id`: logical device identifier chosen by the worker
- `p_value`: typed measurement value, or `NULL` for “measurement unavailable”
- `p_observed_at`: time the device observed the measurement

Important implementation note:

- because `ingest_measurement` is overloaded, `NULL` values must be typed explicitly
- use `NULL::double precision` for numeric unknown values
- use `NULL::boolean` for boolean unknown values

Example:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    'temperature',
    'bedroom.sensor1',
    NULL::double precision,
    '2026-03-08T10:00:00Z'::timestamptz
);
```

## 4. Ingestion Response

Each call returns one row:

- `metric_name`
- `device_id`
- `table_name`
- `normalized_value`
- `action`

`normalized_value` is the value actually used by the historian after normalization:

- numeric metrics: rounded according to metric policy
- boolean metrics: identical to input
- `NULL`: explicit unknown value

`action` is returned by the internal segment engine and is useful for logs, metrics, and debugging.

Actions:

- `opened`: no open segment existed; a new non-`NULL` segment was created
- `opened_null`: no open segment existed; a new `NULL` segment was created
- `extended`: the current non-`NULL` segment was kept open and `samples_count` increased
- `extended_null`: the current `NULL` segment was kept open and `samples_count` increased
- `split`: the previous segment was closed and a new segment was opened
- `null_to_value`: a `NULL` segment was closed and replaced by a value segment
- `value_to_null`: a value segment was closed and replaced by a `NULL` segment
- `gap_split`: a late gap was detected; the previous value segment was closed at `last_observed_at + max_sampling_interval`, a synthetic `NULL` gap segment was inserted, then a new value segment was opened
- `gap_to_null`: a late gap was detected and the next state is unknown; the previous value segment was closed at `last_observed_at + max_sampling_interval`, then a new open `NULL` segment was opened

Workers usually do not branch on `action`. Treat it as observability data.

Recommended worker logging:

- metric name
- device id
- observed timestamp
- original payload value
- returned `normalized_value`
- returned `action`

## 5. Automatic Behaviors Implemented by the Database

The worker should not re-implement historian logic already handled in PostgreSQL.

Automatic device provisioning:

- ingestion calls `ensure_device()`
- if `device_id` does not exist in `telemetry.devices`, it is created automatically

Append-only enforcement:

- ingestion calls `assert_append_only()`
- the database rejects measurements where `observed_at <= last_observed_at` for the same `(metric_name, device_id)`

Policy lookup:

- ingestion calls `require_metric_policy()`
- the active policy is selected from `telemetry.metric_policies` using `valid_from <= observed_at`

Gap detection:

- the database detects gaps lazily when the next measurement arrives
- workers must not emit timeout markers on their own

Segment splitting:

- value changes, explicit `NULL`, policy changes, and gap boundaries are converted into segment transitions automatically

Policy boundary splitting:

- if a metric policy changes while a segment is still open, the database splits the open segment at `policy.valid_from`
- workers do not need to know segment internals or policy transitions

Tail `NULL` simulation:

- if a stream stops, the database does not persist a trailing `NULL` segment immediately
- query functions simulate unknown tail values after `last_observed_at + max_sampling_interval`

## 6. Error Handling

Workers should treat database errors as either permanent message errors or transient infrastructure errors, regardless of whether the source transport is MQTT, HTTP, serial, or another upstream feed.

Common permanent errors:

- unknown metric
  - example: `ERROR: unknown metric: temperature`
  - cause: `metric_name` is not registered
- out-of-order measurement
  - example: `ERROR: out-of-order measurement for metric ...`
  - cause: message timestamp is older than or equal to the watermark for that metric/device
- invalid numeric value
  - examples:
    - `ERROR: value ... is below min_value ...`
    - `ERROR: value ... is above max_value ...`
  - cause: payload violates policy bounds
- metric type mismatch
  - examples:
    - `ERROR: metric ... is boolean; use the boolean overload of ingest_measurement(...)`
    - `ERROR: metric ... is numeric; use the numeric overload of ingest_measurement(...)`
- disallowed explicit `NULL`
  - example: `ERROR: metric table ... does not allow explicit NULL measurements`
  - cause: worker sent unknown value for a metric with `allow_null = false`

Recommended behavior for permanent message errors:

- log the full error together with the source metadata and payload
- if the source is MQTT, include topic and payload
- write the message to a dead-letter path or equivalent audited error sink, then discard it from the hot path
- do not retry it
- increment an application metric or dead-letter counter

Recommended dead-letter payload:

- error type
- original source metadata such as topic, route, file offset, or queue message id
- original payload or a safe reference to it
- database error message
- worker instance id
- timestamp
- attempt count if retries already happened
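The recommended dead-letter fields can be assembled as a simple record. A Python sketch; the field names and the `build_dead_letter_record` helper are illustrative, not part of any worker framework:

```python
from datetime import datetime, timezone

def build_dead_letter_record(error_type, source_meta, payload, db_error,
                             worker_id, attempt_count=0):
    """Assemble the recommended dead-letter fields as a JSON-serializable dict."""
    return {
        "error_type": error_type,
        "source": source_meta,          # e.g. {"topic": "..."} for MQTT
        "payload": payload,             # original payload or a safe reference
        "db_error": db_error,
        "worker_id": worker_id,
        "recorded_at": datetime.now(timezone.utc).isoformat(),
        "attempt_count": attempt_count,
    }
```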

Recommended behavior for transient failures:

- retry only for infrastructure failures such as connection loss, failover, or temporary database unavailability
- preserve per-device/per-metric ordering when retrying
- do not reorder buffered measurements while retrying

If a retry happens after an ambiguous network failure, the worker may see an out-of-order error if the first attempt actually committed. Handle that case as an operational duplicate and log it accordingly.
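A minimal retry loop that follows these rules might look like the following Python sketch. `send` and `is_transient` are caller-supplied stand-ins for the real database call and error classifier; ordering is preserved because the head of the per-stream FIFO is never skipped on a transient error:

```python
from collections import deque

def flush_with_retry(buffer: deque, send, is_transient, max_attempts=5):
    """Drain one per-(metric, device) FIFO, retrying only transient failures.

    Permanent errors are collected for dead-lettering instead of retried.
    """
    dead_letters = []
    while buffer:
        item = buffer[0]  # peek; only pop after success or dead-letter
        attempts = 0
        while True:
            try:
                send(item)
                buffer.popleft()
                break
            except Exception as exc:
                if not is_transient(exc):
                    dead_letters.append((item, exc))
                    buffer.popleft()
                    break
                attempts += 1
                if attempts >= max_attempts:
                    raise  # give up; remaining items stay buffered in order
    return dead_letters
```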

## 7. Idempotency and Ordering

The ingestion API assumes chronological ordering per `(metric_name, device_id)`.

This is enforced by:

- `telemetry.metric_device_watermarks`

Consequences for workers:

- do not send older measurements after newer ones for the same metric/device
- if a device buffers multiple samples, flush them in timestamp order
- if the source can redeliver messages, preserve original order
- do not use broker receive time to reorder a device stream
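These ordering rules can be sketched as a pre-flush normalization step. `order_for_flush` is a hypothetical helper; keeping only the first sample per duplicate timestamp is one possible worker policy, since the database rejects equal-timestamp replays anyway:

```python
def order_for_flush(samples):
    """Group (metric_name, device_id, value, observed_at) tuples per stream
    and sort each stream by observed_at (ISO 8601 strings sort chronologically).

    For duplicate timestamps within a stream, the first occurrence wins.
    """
    by_key = {}
    for metric, device, value, ts in samples:
        stream = by_key.setdefault((metric, device), {})
        stream.setdefault(ts, (metric, device, value, ts))  # first wins
    return {key: [stream[ts] for ts in sorted(stream)]
            for key, stream in by_key.items()}
```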

This API is append-only, not last-write-wins.

Same-timestamp replay is not accepted:

- `observed_at = last_observed_at` is rejected
- `observed_at < last_observed_at` is rejected

Important limitation of the current measurement API:

- `ingest_measurement(...)` does not currently accept an explicit `idempotency_key`
- after an ambiguous network failure, a safe replay may surface as an out-of-order duplicate
- workers should treat this as a limitation of the current measurement API, not as a general design recommendation for future counter ingestion

## 8. Time Semantics

`observed_at` is the time the device measured the value.

It is not:

- source receive time
- worker processing time
- database insert time

Preferred worker behavior:

- use the timestamp provided by the device payload if available
- preserve the device timestamp exactly
- only fall back to worker receive time when the upstream source does not provide an observation timestamp

Historian correctness depends on `observed_at` being as close as possible to the real device observation time.
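A sketch of the preferred timestamp selection, assuming a hypothetical `observed_at` payload key carrying an ISO 8601 string; real payload layouts vary by source:

```python
from datetime import datetime

def resolve_observed_at(payload: dict, received_at: datetime) -> datetime:
    """Prefer the device-provided timestamp; fall back to worker receive time."""
    raw = payload.get("observed_at")
    if raw is not None:
        # preserve the device timestamp exactly; normalize the Z suffix
        # so datetime.fromisoformat accepts it on older Python versions
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))
    return received_at
```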

## 9. Example Worker Flow

Example flow using MQTT as one source transport:

1. MQTT message is received on `/homebus/temperature/bedroom/sensor1`
2. worker parses the topic
3. worker maps the topic to:
   - `metric_name = 'temperature'`
   - `device_id = 'bedroom.sensor1'`
4. worker parses payload value and device timestamp
5. worker calls `telemetry.ingest_measurement(...)`
6. worker logs `action` and `normalized_value`

Example numeric SQL call:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    'temperature',
    'bedroom.sensor1',
    22.97::double precision,
    '2026-03-08T10:15:12Z'::timestamptz
);
```

Example boolean SQL call:

```sql
SELECT *
FROM telemetry.ingest_measurement(
    'motion_detected',
    'hallway.sensor1',
    true,
    '2026-03-08T10:15:12Z'::timestamptz
);
```

Example pseudo-code:

```text
on_message(topic, payload):
    metric_name = map_topic_to_metric(topic)
    device_id = map_topic_to_device(topic)
    observed_at = extract_device_timestamp(payload)
    value = parse_payload_value(payload, metric_name)

    # numeric path shown; a boolean metric would use the ::boolean overload
    row = db.query_one(
        "SELECT * FROM telemetry.ingest_measurement($1, $2, $3::double precision, $4)",
        [metric_name, device_id, value, observed_at]
    )

    log.info(
        "ingested",
        metric_name=row.metric_name,
        device_id=row.device_id,
        action=row.action,
        normalized_value=row.normalized_value
    )
```

The worker is responsible for choosing the correct numeric or boolean call path before executing SQL. The same database call pattern applies to any worker after it maps its source message format into the historian parameters.

## 10. Performance Expectations

This API is designed for high ingestion rates without requiring the worker to manage historian state.

Typical work performed per measurement:

- one advisory lock for `(metric_name, device_id)`
- metric and policy resolution inside PostgreSQL
- one segment update or one segment insert on the target metric table
- watermark and device `last_seen` bookkeeping

In the common case where a value is unchanged, ingestion usually updates the current open segment rather than inserting a new raw sample row.

Locking notes:

- serialization is enforced by a PostgreSQL advisory transaction lock on `(metric_name, device_id)`
- the current implementation waits for the advisory lock inside the transaction; it is not a fail-fast `NOWAIT` contract
- workers should therefore avoid sending the same `(metric_name, device_id)` concurrently from multiple writers
- the preferred operational pattern is writer affinity per `(metric_name, device_id)`, for example via deterministic routing or consistent hashing in the worker layer
- if lock waits become visible in production, treat that as a signal that worker concurrency or routing needs review rather than relying on database-side contention as normal flow
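The writer-affinity pattern can be sketched with a deterministic hash. `route_to_writer` is illustrative; any stable partitioner that maps a stream to exactly one writer works:

```python
import hashlib

def route_to_writer(metric_name: str, device_id: str, n_writers: int) -> int:
    """Deterministically route one (metric_name, device_id) stream to one writer.

    Uses a stable hash so the same stream always lands on the same writer,
    avoiding advisory-lock contention between concurrent writers.
    """
    key = f"{metric_name}\x00{device_id}".encode()
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % n_writers
```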

Batching is optional:

- a worker may issue one SQL call per source message
- batching can reduce client/network overhead
- batching is not required for correctness

Practical batching guidance:

- batching is most useful when it reduces network round-trips without mixing unrelated retry domains
- keep ordering intact per `(metric_name, device_id)` inside the batch
- if a deployment expects large polling waves, modest transactional batches are preferable to one giant transaction
- if policy changes are rolled out, stagger them when possible rather than changing many hot metrics at exactly the same moment
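Chunking an already-ordered sample list into modest batches can be as simple as the following sketch. `batch_size` is an illustrative tuning knob, not a database requirement; per-stream order is preserved because chunking never reorders:

```python
def make_batches(samples, batch_size=100):
    """Chunk an already-ordered sample list into fixed-size batches,
    preserving the original order within and across batches."""
    return [samples[i:i + batch_size]
            for i in range(0, len(samples), batch_size)]
```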

The most important worker-side performance requirement is preserving ordering while keeping the database connection pool healthy.

## Implementation Checklist

- map the source message into `metric_name` and `device_id`
- parse payload into typed numeric or boolean value
- preserve device observation timestamps
- call the correct `ingest_measurement` overload
- cast `NULL` explicitly when sending unknown values
- log returned `action` and `normalized_value`
- discard permanent validation errors
- retry only transient database failures
- preserve order per `(metric_name, device_id)` during retries and buffering
