# Counter Ingestion API

This document defines how a historian worker should write cumulative counter observations into the PostgreSQL telemetry historian.

It complements the measurement ingestion path and exists because cumulative counters do not have the same storage semantics as instantaneous measurements.

Examples of counters:

- `energy_total`
- `import_energy_total`
- `export_energy_total`
- `rx_bytes_total`
- `tx_packets_total`

Measurement-style numeric and boolean values continue to use the measurement ingestion path described in `mqtt_ingestion_api.md`.

---

## 1. System Overview

One common ingestion path is:

```text
Canonical MQTT bus
    -> historian worker
    -> telemetry.ingest_counter(...)
    -> counter historian tables
```

The worker is responsible for:

- subscribing to canonical counter-bearing bus topics
- mapping each message to `metric_name`, `device_id`, `counter_value`, and `observed_at`
- preserving ordering per counter stream
- passing replay metadata when available
- logging result actions and failures

The database is responsible for:

- append-only ordering enforcement
- duplicate detection where replay metadata allows it
- reset and rollover boundary classification
- policy lookup
- query-time freshness and gap interpretation

The worker should treat PostgreSQL as the source of truth for counter boundary logic.

---

## 2. Why Counter Data Uses a Separate API

Cumulative counters are not the same as instantaneous measurements.

Key differences:

- the value represents an ever-growing total, not a sampled state
- a drop in value is usually meaningful and may indicate reset or rollover
- gaps should be derived at query time from freshness policy, not stored as synthetic `NULL`
- `counter_value` should never be written as `NULL`

For this reason, the worker MUST NOT send cumulative counters through `telemetry.ingest_measurement(...)` just because the payload is numeric.

---

## 3. Example MQTT Mapping

The database does not parse MQTT topics.

The worker maps canonical topics to application identifiers before calling PostgreSQL.

Example 1:

```text
topic: vad/energy/load/living-room-tv/energy_total/value

metric_name   = "energy_total"
device_id     = "load.living-room-tv"
counter_value = 4.72
observed_at   = 2026-03-21T10:15:12Z
```

Example 2:

```text
topic: vad/energy/grid/main-meter/import_energy_total/value

metric_name   = "import_energy_total"
device_id     = "grid.main-meter"
counter_value = 18654.31
observed_at   = 2026-03-21T10:15:12Z
```
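The mapping above can be sketched in Python. This is a minimal sketch under several assumptions: the topic layout `vad/<domain>/<device_kind>/<device_name>/<metric_name>/value` is inferred only from the two examples, the payload is assumed to be a plain decimal string, and `observed_at` is assumed to be supplied separately (for example from message metadata). `CounterObservation` and `map_counter_topic` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal


@dataclass(frozen=True)
class CounterObservation:
    """Hypothetical worker-side record later passed to telemetry.ingest_counter(...)."""
    metric_name: str
    device_id: str
    counter_value: Decimal  # Decimal keeps the value exact for the numeric parameter
    observed_at: datetime


def map_counter_topic(topic: str, payload: str, observed_at: datetime) -> CounterObservation:
    # Assumed layout, inferred from the examples:
    # vad/<domain>/<device_kind>/<device_name>/<metric_name>/value
    parts = topic.split("/")
    if len(parts) != 6 or parts[0] != "vad" or parts[5] != "value":
        raise ValueError(f"not a canonical counter value topic: {topic!r}")
    _prefix, _domain, kind, name, metric_name, _leaf = parts
    return CounterObservation(
        metric_name=metric_name,
        device_id=f"{kind}.{name}",  # e.g. "load.living-room-tv"
        counter_value=Decimal(payload.strip()),  # assumed plain-number payload
        observed_at=observed_at,
    )


obs = map_counter_topic(
    "vad/energy/load/living-room-tv/energy_total/value",
    "4.72",
    datetime(2026, 3, 21, 10, 15, 12, tzinfo=timezone.utc),
)
print(obs.metric_name, obs.device_id, obs.counter_value)  # energy_total load.living-room-tv 4.72
```

Rejecting anything other than a `.../value` leaf keeps retained or auxiliary topics out of the counter path at the parsing stage.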

The same pattern applies to future domains such as network traffic counters.

---

## 4. Database Ingestion API

Canonical signature:

```sql
SELECT *
FROM telemetry.ingest_counter(
    p_metric_name     => $1,
    p_device_id       => $2,
    p_counter_value   => $3::numeric,
    p_observed_at     => $4::timestamptz,
    p_source_sequence => $5,
    p_idempotency_key => $6,
    p_snapshot_id     => $7
);
```

Parameter semantics:

- `p_metric_name`: counter metric identifier registered in the database
- `p_device_id`: logical device identifier chosen by the worker
- `p_counter_value`: cumulative counter observation
- `p_observed_at`: time the source observed the counter value
- `p_source_sequence`: optional monotonic source-side sequence or offset
- `p_idempotency_key`: optional replay-safe deduplication key
- `p_snapshot_id`: optional identifier for a source snapshot or polling batch

Rules:

- `p_counter_value` MUST NOT be `NULL`
- `p_observed_at` MUST be present
- workers SHOULD pass `p_source_sequence` when the source provides a reliable monotonic sequence
- workers SHOULD pass `p_idempotency_key` when replay ambiguity exists
- workers MAY leave optional replay fields `NULL` if the source does not provide them
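A worker-side wrapper can enforce the MUST rules before calling PostgreSQL. The sketch below is illustrative, not the canonical implementation: it assumes psycopg-style `%s` placeholders (psycopg does not use `$1`..`$7`), any DB-API 2.0 cursor, and assumed Python types for the optional fields (`int` for `p_source_sequence`, `str` for the other two). `build_ingest_params` and `ingest_counter` are hypothetical helpers, and the timezone check is an extra assumption.

```python
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Optional

# Same call as the canonical signature, rewritten with %s placeholders.
INGEST_COUNTER_SQL = """
SELECT *
FROM telemetry.ingest_counter(
    p_metric_name     => %s,
    p_device_id       => %s,
    p_counter_value   => %s::numeric,
    p_observed_at     => %s::timestamptz,
    p_source_sequence => %s,
    p_idempotency_key => %s,
    p_snapshot_id     => %s
)
"""


def build_ingest_params(
    metric_name: str,
    device_id: str,
    counter_value: Optional[Decimal],
    observed_at: Optional[datetime],
    source_sequence: Optional[int] = None,
    idempotency_key: Optional[str] = None,
    snapshot_id: Optional[str] = None,
) -> tuple:
    # Enforce the MUST rules before the round trip to PostgreSQL.
    if counter_value is None:
        raise ValueError("p_counter_value MUST NOT be NULL")
    if observed_at is None:
        raise ValueError("p_observed_at MUST be present")
    # Assumption: reject naive datetimes so timestamptz is never
    # interpreted in the session time zone by accident.
    if observed_at.tzinfo is None:
        raise ValueError("observed_at must be timezone-aware")
    # Optional replay fields pass through as None, which becomes SQL NULL.
    return (metric_name, device_id, counter_value, observed_at,
            source_sequence, idempotency_key, snapshot_id)


def ingest_counter(cursor: Any, params: tuple) -> Any:
    """Run the call on a DB-API 2.0 cursor and return the single result row."""
    cursor.execute(INGEST_COUNTER_SQL, params)
    return cursor.fetchone()
```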

---

## 5. Core Semantics

Counter stream identity is:

```text
stream = (metric_name, device_id)
```

Core rules:

- writes are append-only per stream
- observations MUST arrive in strictly increasing `observed_at` order per stream
- `counter_value` MUST be non-null
- silence does not produce synthetic `NULL` writes
- decreases in `counter_value` create semantic boundaries
- deltas and rates MUST NOT cross reset or rollover boundaries

The worker should assume the database owns the interpretation of those boundaries through metric policy.
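The rule that deltas never cross a boundary can be illustrated with a small query-side sketch for one stream. It is a simplification, not the database's logic: here every decrease is treated as a boundary, whereas the historian classifies drops as reset, rollover, or invalid through metric policy. `segment_deltas` is a hypothetical name, and naive datetimes are used only for brevity.

```python
from datetime import datetime
from decimal import Decimal
from typing import List, Tuple

Point = Tuple[datetime, Decimal]
Delta = Tuple[datetime, datetime, Decimal]


def segment_deltas(points: List[Point]) -> List[Delta]:
    """Deltas between consecutive observations of ONE stream, never across a drop."""
    deltas: List[Delta] = []
    for (t0, v0), (t1, v1) in zip(points, points[1:]):
        if t1 <= t0:
            raise ValueError("observed_at must be strictly increasing per stream")
        if v1 < v0:
            continue  # boundary: the interval spanning it contributes no delta
        deltas.append((t0, t1, v1 - v0))
    return deltas


points = [
    (datetime(2026, 3, 21, 10, 0), Decimal("10.0")),
    (datetime(2026, 3, 21, 10, 1), Decimal("12.5")),
    (datetime(2026, 3, 21, 10, 2), Decimal("0.5")),  # drop: new segment starts
    (datetime(2026, 3, 21, 10, 3), Decimal("1.5")),
]
print(sum(d for _, _, d in segment_deltas(points)))  # 3.5
```

Skipping the interval that spans the drop avoids both a negative delta and a guessed post-reset value.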

---

## 6. Reliability Metadata

Conceptually, three reliability levels are supported, depending on which replay metadata the source provides.

### Degraded / At-Most-Once

Inputs:

- no `source_sequence`
- no `idempotency_key`

Implication:

- retries after ambiguous failures are unsafe

### Recommended / At-Least-Once

Inputs:

- `source_sequence` or `idempotency_key`

Implication:

- replay is detectable

### Stronger Replay Safety

Inputs:

- both `source_sequence` and `idempotency_key`

Implication:

- duplicate detection is explicit and robust

The database contract should remain usable at all three levels, but higher reliability depends on richer source metadata.
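A worker can derive the level per message from the metadata it actually has, for example to decide whether a retry after an ambiguous failure is acceptable. This is a minimal sketch; `ReliabilityLevel`, `reliability_level`, and `retry_after_ambiguous_failure_allowed` are hypothetical names.

```python
from enum import Enum
from typing import Optional


class ReliabilityLevel(Enum):
    DEGRADED = "at-most-once"
    RECOMMENDED = "at-least-once"
    STRONGER = "stronger-replay-safety"


def reliability_level(source_sequence: Optional[int],
                      idempotency_key: Optional[str]) -> ReliabilityLevel:
    has_sequence = source_sequence is not None
    has_key = idempotency_key is not None
    if has_sequence and has_key:
        return ReliabilityLevel.STRONGER
    if has_sequence or has_key:
        return ReliabilityLevel.RECOMMENDED
    return ReliabilityLevel.DEGRADED


def retry_after_ambiguous_failure_allowed(level: ReliabilityLevel) -> bool:
    # Degraded level: retries after ambiguous failures are unsafe.
    return level is not ReliabilityLevel.DEGRADED
```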

---

## 7. Reporting Modes and Freshness

Counter policies should support three reporting modes:

- `periodic`
- `on_change`
- `hybrid`

Semantics:

- `periodic`: updates are expected on a cadence; silence eventually means stale
- `on_change`: updates happen only on change; silence does not imply delta `0`
- `hybrid`: updates are sent on change, plus a periodic heartbeat

Recommended freshness defaults:

| Reporting mode | Input | Default `stale_after_s` |
|---|---|---|
| `periodic` | `expected_interval_s` | `expected_interval_s * 2` |
| `on_change` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |
| `hybrid` | `heartbeat_interval_s` | `heartbeat_interval_s * 2` |

If `stale_after_s` is explicitly defined by policy, it overrides these defaults.

Freshness is evaluated at query time, not by inserting synthetic unknown rows.
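The default table and the override rule can be expressed as a small function, together with a query-time staleness check. This is a sketch; the function names are hypothetical, and two behaviors are assumptions not stated in the policy: returning `None` when the required input interval is missing, and treating a missing threshold as "never stale".

```python
from datetime import datetime, timedelta
from typing import Optional


def effective_stale_after_s(reporting_mode: str,
                            expected_interval_s: Optional[float] = None,
                            heartbeat_interval_s: Optional[float] = None,
                            stale_after_s: Optional[float] = None) -> Optional[float]:
    # An explicit policy value always overrides the defaults.
    if stale_after_s is not None:
        return stale_after_s
    if reporting_mode == "periodic":
        base = expected_interval_s
    elif reporting_mode in ("on_change", "hybrid"):
        base = heartbeat_interval_s
    else:
        raise ValueError(f"unknown reporting mode: {reporting_mode!r}")
    # Assumption: without the input interval, no default can be derived.
    return None if base is None else base * 2


def is_stale(last_observed_at: datetime, now: datetime,
             stale_after_s: Optional[float]) -> bool:
    # Computed at query time from the latest stored observation; nothing is written.
    if stale_after_s is None:
        return False  # assumption: no threshold means never reported stale
    return now - last_observed_at > timedelta(seconds=stale_after_s)
```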

---

## 8. Ingestion Response

Each call should return one row that is useful for logging and debugging.

Recommended fields:

- `metric_name`
- `device_id`
- `normalized_counter_value`
- `action`
- `boundary_kind`

Recommended `action` values:

- `opened`: first observation for a stream, which opens it
- `extended`: normal append to an existing stream
- `duplicate_ignored`: replay duplicate recognized from reliability metadata
- `boundary_split`: a reset or rollover boundary was classified and a new segment started

Recommended `boundary_kind` values:

- `none`
- `reset_boundary`
- `rollover_boundary`
- `invalid_drop`

Workers usually should not branch on these values, but they SHOULD log them.
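Logging without branching can be as simple as recording every field and tallying `action` and `boundary_kind` for operational metrics. This sketch assumes the result row is returned as a mapping (for example via psycopg 3's `dict_row` row factory) using the recommended field names; the logger name, `record_ingest_result`, and the module-level counters are hypothetical.

```python
import logging
from collections import Counter
from typing import Any, Mapping

log = logging.getLogger("historian.counter")

# Operational tallies, e.g. for ingested, duplicated, and boundary-split samples.
action_counts: Counter = Counter()
boundary_counts: Counter = Counter()


def record_ingest_result(row: Mapping[str, Any]) -> None:
    """Log one telemetry.ingest_counter(...) result row and count it.

    The worker records action and boundary_kind but does not branch on them.
    """
    action_counts[row["action"]] += 1
    boundary_counts[row["boundary_kind"]] += 1
    log.info(
        "counter ingest metric=%s device=%s value=%s action=%s boundary=%s",
        row["metric_name"], row["device_id"], row["normalized_counter_value"],
        row["action"], row["boundary_kind"],
    )
```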

---

## 9. Worker Routing Rules

The historian worker should route counter samples based on canonical metric semantics, not vendor heuristics.

Rules:

- bus metrics explicitly classified as cumulative counters MUST go to `telemetry.ingest_counter(...)`
- measurement-style metrics MUST continue to go to `telemetry.ingest_measurement(...)`
- retained `last` MUST NOT be used as a normal counter ingestion source
- if a counter metric is encountered and the counter path is unavailable, the worker SHOULD skip it and expose the skip operationally

Examples of metrics that belong here:

- `energy_total`
- `import_energy_total`
- `export_energy_total`
- `rx_bytes_total`
- `tx_packets_total`

Examples of metrics that do not belong here:

- `active_power`
- `voltage`
- `current`
- `temperature`
- `soc`

---

## 10. Error Handling

Permanent message errors include:

- unknown counter metric
- out-of-order counter observation
- `NULL` counter value
- negative counter value where policy disallows it
- replay metadata conflict that proves the message is invalid

Transient failures include:

- PostgreSQL unavailable
- network interruption between worker and database
- temporary worker restart during streaming

Rules:

- permanent message errors SHOULD be logged and dead-lettered
- transient failures SHOULD be retried
- retries MUST preserve ordering per `(metric_name, device_id)`
- if a retry after an ambiguous failure is reported as `duplicate_ignored`, workers SHOULD treat it as an operational duplicate, not as corruption
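One way to honor these rules is to drain each `(metric_name, device_id)` stream sequentially: permanent errors are dead-lettered and the stream advances, while transient errors retry the same message and never let later messages overtake it. This is a sketch under assumptions: `PermanentIngestError` and `TransientIngestError` are hypothetical exceptions that the worker would map from PostgreSQL and network errors, and exponential backoff is an illustrative choice.

```python
import time
from typing import Any, Callable, Iterable


class PermanentIngestError(Exception):
    """Hypothetical: the message can never succeed (unknown metric, out-of-order, ...)."""


class TransientIngestError(Exception):
    """Hypothetical: database or network temporarily unavailable."""


def drain_stream(messages: Iterable[Any],
                 ingest: Callable[[Any], None],
                 dead_letter: Callable[[Any, Exception], None],
                 max_attempts: int = 5,
                 base_delay_s: float = 0.5,
                 sleep: Callable[[float], None] = time.sleep) -> None:
    """Process ONE counter stream strictly in order."""
    for msg in messages:
        attempt = 0
        while True:
            try:
                ingest(msg)
                break
            except PermanentIngestError as exc:
                dead_letter(msg, exc)  # log and dead-letter, then move on
                break
            except TransientIngestError:
                attempt += 1
                if attempt >= max_attempts:
                    # Stop this stream rather than skip ahead and break ordering.
                    raise
                sleep(base_delay_s * 2 ** (attempt - 1))
```

Raising after the last attempt halts only the affected stream; the caller can resume it later from the same message.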

---

## 11. Implementation Guidance

Recommended implementation order:

1. worker-side routing from semantic bus metrics to counter pipeline
2. SQL wrapper for `telemetry.ingest_counter(...)`
3. operational metrics for skipped, ingested, duplicated, and boundary-split counter samples
4. query-side validation for freshness, reset handling, and delta/rate calculations

Recommended first live metrics:

- `energy_total`
- `import_energy_total`
- `export_energy_total`

These are the most concrete counter metrics already present in the current semantic bus design.

---

## 12. Relationship with the Semantic Bus

The semantic bus remains the canonical transport.

Counter metrics:

- continue to use normal semantic bus topics such as `.../value`
- remain visible to any consumer that understands cumulative semantics
- do not require a separate bus just because the historian storage path differs

This preserves one semantic model while allowing the historian to use a storage API that matches counter behavior.
