Showing 3 changed files with 283 additions and 10 deletions
+11 -5
HealthProbe/Doc/04-project/Import-Optimization-Log.md
@@ -584,6 +584,7 @@ rows exist".
584 584
 | 2026-06-03 | pending | Add capture-mode summary to diagnostics. | Repeated full-profile captures rarely produce a perfect no-delta report because at least one metric can change between manual runs. Diagnostic reports now include aggregate `CaptureModes` counts plus per-metric `captureMode`, so comparisons can separate unchanged empty-delta metrics from delta-applied metrics and full imports without manually reading every `record_import` line. Expected signal: stable checksum plus high `unchangedDelta` count and zero summed processing/insert confirms the fast path even when a few metrics changed. |
585 585
 | 2026-06-03 | pending | Add delta-event counts to diagnostics. | A full-profile follow-up completed in `47.4s` with `127/127` complete, `0` degraded, `CaptureModes: unchangedDelta=115, delta=12, initialImport=0`, `SummedProcessingElapsed: 25.9s`, `SummedInsertElapsed: 0.2s`, and `SummedFinalizeElapsed: 16.0s`. This confirms anchors work and no full import ran. Remaining cost is delta application for large metrics: Heart Rate `23.5s` total (`14.1s` processing, `8.8s` finalize), Active Energy `7.1s`, and Basal Energy `6.0s`. Diagnostics now report aggregate/per-metric `DeltaEvents` so future logs can separate true HealthKit delta size from the final visible record count. |
586 586
 | 2026-06-04 | `a676df1` | Rebuild delta compact archives without large intermediate record arrays. | Follow-up full-profile delta report completed in `42.1s` with `127/127` complete, `0` degraded, `CaptureModes: unchangedDelta=108, delta=19, initialImport=0`, and `DeltaEvents: 582`. Compared with the prior `47.4s` run, `SummedProcessingElapsed` dropped `25.9s -> 12.8s`; Heart Rate processing dropped `14.1s -> 6.4s` despite `187` delta events; Active Energy processing dropped `4.9s -> 2.5s` with `100` delta events; Basal Energy processing dropped `4.1s -> 2.0s` with `82` delta events. `SummedFinalizeElapsed` rose `16.0s -> 19.6s`, so the remaining bottleneck is now archive finalization / aggregate rebuild for changed high-volume types, not Swift archive reconstruction. |
587
+| 2026-06-04 | pending | Incrementally replace changed daily aggregate buckets. | The latest measured bottleneck is finalize work for changed high-volume delta types. Changed metrics now copy the previous observation's daily aggregates and recompute only the days touched by appeared, disappeared, or representation-changed events, with a bounded start-date window for the replacement query. Expected signal: lower `SummedFinalizeElapsed`, especially Heart Rate / Active Energy / Basal Energy finalize times, when `DeltaEvents` is small relative to the full visible type count. |
587 588
 
588 589
 ## Current Diagnosis
589 590
 
@@ -649,6 +650,11 @@ The likely bottleneck is per-row SQLite work:
649 650
   high-value target is `markVerification` / daily aggregate rebuild for changed
650 651
   high-volume delta types, where Heart Rate still spent `8.9s` finalizing and
651 652
   Active Energy spent `4.8s`.
653
+- Changed high-volume metrics should not rebuild daily aggregates for the whole
654
+  type when the current observation touched only a few days. The active
655
+  experiment is to copy previous daily aggregates and replace only affected
656
+  buckets; compare `DeltaEvents` and per-type finalize time before doing deeper
657
+  schema/index work.
652 658
 
653 659
 ## Open Issues / Observations
654 660
 
@@ -687,11 +693,11 @@ Prioritize experiments in this order:
687 693
    `CaptureModes`, and high-volume type timings. Treat stable checksum, a high
688 694
    `unchangedDelta` count, and zero processing/insert as the main unchanged-path
689 695
    signal.
690
-4. Optimize finalization for changed high-volume delta types. The latest
691
-   full-profile report showed `SummedFinalizeElapsed: 19.6s`, with Heart Rate
692
-   `8.9s`, Active Energy `4.8s`, and Basal Energy `1.7s`. Investigate whether
693
-   daily aggregates can be updated incrementally from `DeltaEvents` instead of
694
-   rebuilding all visible aggregates for the type.
696
+4. Run a full-profile repeated capture after incremental changed-bucket
697
+   aggregate replacement. Compare `SummedFinalizeElapsed`, Heart Rate / Active
698
+   Energy / Basal Energy finalize times, and `DeltaEvents`. If finalize remains
699
+   high with small `DeltaEvents`, inspect the bounded replacement SQL plan before
700
+   adding new write-time indexes.
695 701
 5. Keep using `DeltaEvents` to quantify changed high-volume metrics, especially
696 702
    Heart Rate, Active Energy, and Basal Energy. If delta events are small while
697 703
    finalize remains large, optimize aggregate rebuild/finalization rather than
+217 -5
HealthProbe/Services/SQLiteHealthArchiveStore.swift
@@ -2010,15 +2010,16 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
2010 2010
             lookupCache: lookupCache
2011 2011
         )
2012 2012
         let counts = try eventCounts(observationID: observationID, sampleTypeID: sampleTypeID, db: db)
2013
-        if counts.appeared == 0,
2014
-           counts.disappeared == 0,
2015
-           counts.representationChanged == 0,
2016
-           let previousSummary = try previousTypeSummary(
2013
+        let previousSummary = try previousTypeSummary(
2017 2014
             sampleTypeID: sampleTypeID,
2018 2015
             beforeObservationID: observationID,
2019 2016
             db: db,
2020 2017
             statementCache: statementCache
2021
-           ) {
2018
+        )
2019
+        if counts.appeared == 0,
2020
+           counts.disappeared == 0,
2021
+           counts.representationChanged == 0,
2022
+           let previousSummary {
2022 2023
             try insertObservationTypeRun(
2023 2024
                 observationID: observationID,
2024 2025
                 sampleTypeID: sampleTypeID,
@@ -2066,6 +2067,17 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
2066 2067
             aggregate: aggregate,
2067 2068
             db: db
2068 2069
         )
2070
+        if let previousSummary,
2071
+           try replaceChangedDailyAggregates(
2072
+            fromObservationID: previousSummary.observationID,
2073
+            toObservationID: observationID,
2074
+            sampleTypeID: sampleTypeID,
2075
+            observedAt: verifiedAt,
2076
+            db: db
2077
+           ) {
2078
+            return
2079
+        }
2080
+
2069 2081
         try rebuildDailyAggregates(
2070 2082
             observationID: observationID,
2071 2083
             sampleTypeID: sampleTypeID,
@@ -3460,6 +3472,77 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
3460 3472
         )
3461 3473
     }
3462 3474
 
3475
+    private func replaceChangedDailyAggregates(
3476
+        fromObservationID: Int64,
3477
+        toObservationID: Int64,
3478
+        sampleTypeID: Int64,
3479
+        observedAt: Date,
3480
+        db: OpaquePointer?
3481
+    ) throws -> Bool {
3482
+        let secondsFromGMT = TimeZone.current.secondsFromGMT(for: observedAt)
3483
+        let affectedBuckets = try changedDailyAggregateBuckets(
3484
+            observationID: toObservationID,
3485
+            sampleTypeID: sampleTypeID,
3486
+            secondsFromGMT: secondsFromGMT,
3487
+            db: db
3488
+        )
3489
+        guard !affectedBuckets.isEmpty else { return false }
3490
+
3491
+        try copyDailyAggregates(
3492
+            fromObservationID: fromObservationID,
3493
+            toObservationID: toObservationID,
3494
+            sampleTypeID: sampleTypeID,
3495
+            db: db
3496
+        )
3497
+        try deleteDailyAggregateRows(
3498
+            observationID: toObservationID,
3499
+            sampleTypeID: sampleTypeID,
3500
+            bucketStarts: affectedBuckets,
3501
+            db: db
3502
+        )
3503
+        let replacementRows = try dailyAggregateRows(
3504
+            sampleTypeID: sampleTypeID,
3505
+            secondsFromGMT: secondsFromGMT,
3506
+            bucketStarts: affectedBuckets,
3507
+            db: db
3508
+        )
3509
+        try insertDailyAggregateRows(
3510
+            replacementRows,
3511
+            observationID: toObservationID,
3512
+            sampleTypeID: sampleTypeID,
3513
+            db: db
3514
+        )
3515
+        return true
3516
+    }
3517
+
3518
+    private func deleteDailyAggregateRows(
3519
+        observationID: Int64,
3520
+        sampleTypeID: Int64,
3521
+        bucketStarts: [Double],
3522
+        db: OpaquePointer?
3523
+    ) throws {
3524
+        guard !bucketStarts.isEmpty else { return }
3525
+        let placeholders = Array(repeating: "?", count: bucketStarts.count).joined(separator: ", ")
3526
+        try withStatement(
3527
+            """
3528
+            DELETE FROM daily_type_aggregates
3529
+            WHERE observation_id = ?
3530
+              AND sample_type_id = ?
3531
+              AND bucket_start IN (\(placeholders))
3532
+            """,
3533
+            db: db
3534
+        ) { statement in
3535
+            bindInt64(observationID, to: 1, in: statement)
3536
+            bindInt64(sampleTypeID, to: 2, in: statement)
3537
+            for (index, bucketStart) in bucketStarts.enumerated() {
3538
+                sqlite3_bind_double(statement, Int32(index + 3), bucketStart)
3539
+            }
3540
+            guard sqlite3_step(statement) == SQLITE_DONE else {
3541
+                throw SQLiteHealthArchiveStoreError.stepFailed(lastErrorMessage(db))
3542
+            }
3543
+        }
3544
+    }
3545
+
3463 3546
     private func insertDailyAggregateRows(
3464 3547
         _ rows: [ArchiveV2DailyAggregateRow],
3465 3548
         observationID: Int64,
@@ -3576,6 +3659,135 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
3576 3659
         }
3577 3660
     }
3578 3661
 
3662
+    private func changedDailyAggregateBuckets(
3663
+        observationID: Int64,
3664
+        sampleTypeID: Int64,
3665
+        secondsFromGMT: Int,
3666
+        db: OpaquePointer?
3667
+    ) throws -> [Double] {
3668
+        let sql = """
3669
+        WITH affected_samples AS (
3670
+            SELECT DISTINCT e.sample_id
3671
+            FROM sample_observation_events e
3672
+            JOIN samples s ON s.id = e.sample_id
3673
+            WHERE e.observation_id = ?
3674
+              AND s.sample_type_id = ?
3675
+        ),
3676
+        candidate_dates AS (
3677
+            SELECT v.start_date
3678
+            FROM sample_observation_events e
3679
+            JOIN samples s ON s.id = e.sample_id
3680
+            JOIN sample_versions v ON v.id = e.version_id
3681
+            WHERE e.observation_id = ?
3682
+              AND s.sample_type_id = ?
3683
+              AND e.version_id IS NOT NULL
3684
+
3685
+            UNION
3686
+
3687
+            SELECT v.start_date
3688
+            FROM affected_samples a
3689
+            JOIN sample_visibility_ranges r
3690
+              ON r.sample_id = a.sample_id
3691
+             AND r.last_observation_id IS NULL
3692
+            JOIN sample_versions v ON v.id = r.version_id
3693
+
3694
+            UNION
3695
+
3696
+            SELECT v.start_date
3697
+            FROM affected_samples a
3698
+            JOIN sample_visibility_ranges r
3699
+              ON r.sample_id = a.sample_id
3700
+             AND r.last_observation_id = ?
3701
+            JOIN sample_versions v ON v.id = r.version_id
3702
+        )
3703
+        SELECT DISTINCT CAST(((start_date + ?) / 86400) AS INTEGER) * 86400 - ? AS bucket_start
3704
+        FROM candidate_dates
3705
+        ORDER BY bucket_start ASC
3706
+        """
3707
+
3708
+        return try withStatement(sql, db: db) { statement in
3709
+            bindInt64(observationID, to: 1, in: statement)
3710
+            bindInt64(sampleTypeID, to: 2, in: statement)
3711
+            bindInt64(observationID, to: 3, in: statement)
3712
+            bindInt64(sampleTypeID, to: 4, in: statement)
3713
+            bindInt64(observationID, to: 5, in: statement)
3714
+            sqlite3_bind_double(statement, 6, Double(secondsFromGMT))
3715
+            sqlite3_bind_double(statement, 7, Double(secondsFromGMT))
3716
+
3717
+            var buckets: [Double] = []
3718
+            while sqlite3_step(statement) == SQLITE_ROW {
3719
+                buckets.append(sqlite3_column_double(statement, 0))
3720
+            }
3721
+            return buckets
3722
+        }
3723
+    }
3724
+
3725
+    private func dailyAggregateRows(
3726
+        sampleTypeID: Int64,
3727
+        secondsFromGMT: Int,
3728
+        bucketStarts: [Double],
3729
+        db: OpaquePointer?
3730
+    ) throws -> [ArchiveV2DailyAggregateRow] {
3731
+        guard !bucketStarts.isEmpty else { return [] }
3732
+        let lowerStartDate = bucketStarts.min() ?? 0
3733
+        let upperStartDate = (bucketStarts.max() ?? lowerStartDate) + 86_400
3734
+        let placeholders = Array(repeating: "?", count: bucketStarts.count).joined(separator: ", ")
3735
+        let sql = """
3736
+        WITH visible_rows AS (
3737
+            SELECT
3738
+                CAST(((v.start_date + ?) / 86400) AS INTEGER) * 86400 - ? AS bucket_start,
3739
+                CAST(((v.start_date + ?) / 86400) AS INTEGER) * 86400 - ? + 86400 AS bucket_end,
3740
+                v.numeric_value,
3741
+                v.source_revision_id
3742
+            FROM samples s INDEXED BY idx_samples_type_id
3743
+            JOIN sample_visibility_ranges r INDEXED BY idx_visibility_sample_open_version
3744
+              ON r.sample_id = s.id
3745
+             AND r.last_observation_id IS NULL
3746
+            JOIN sample_versions v ON v.id = r.version_id
3747
+            WHERE s.sample_type_id = ?
3748
+              AND v.start_date >= ?
3749
+              AND v.start_date < ?
3750
+        )
3751
+        SELECT
3752
+            bucket_start,
3753
+            bucket_end,
3754
+            COUNT(*),
3755
+            SUM(numeric_value),
3756
+            MAX(numeric_value),
3757
+            source_revision_id
3758
+        FROM visible_rows
3759
+        WHERE bucket_start IN (\(placeholders))
3760
+        GROUP BY bucket_start, bucket_end, source_revision_id
3761
+        ORDER BY bucket_start ASC, source_revision_id ASC
3762
+        """
3763
+
3764
+        return try withStatement(sql, db: db) { statement in
3765
+            sqlite3_bind_double(statement, 1, Double(secondsFromGMT))
3766
+            sqlite3_bind_double(statement, 2, Double(secondsFromGMT))
3767
+            sqlite3_bind_double(statement, 3, Double(secondsFromGMT))
3768
+            sqlite3_bind_double(statement, 4, Double(secondsFromGMT))
3769
+            bindInt64(sampleTypeID, to: 5, in: statement)
3770
+            bindDouble(lowerStartDate, to: 6, in: statement)
3771
+            bindDouble(upperStartDate, to: 7, in: statement)
3772
+            for (index, bucketStart) in bucketStarts.enumerated() {
3773
+                sqlite3_bind_double(statement, Int32(index + 8), bucketStart)
3774
+            }
3775
+
3776
+            var rows: [ArchiveV2DailyAggregateRow] = []
3777
+            while sqlite3_step(statement) == SQLITE_ROW {
3778
+                rows.append(ArchiveV2DailyAggregateRow(
3779
+                    bucketStart: sqlite3_column_double(statement, 0),
3780
+                    bucketEnd: sqlite3_column_double(statement, 1),
3781
+                    visibleRecordCount: columnInt(statement, 2) ?? 0,
3782
+                    valueSum: columnDouble(statement, 3),
3783
+                    valueMax: columnDouble(statement, 4),
3784
+                    sourceRevisionID: columnInt64(statement, 5)
3785
+                ))
3786
+            }
3787
+            return rows
3788
+        }
3789
+    }
3790
+
3579 3791
     private func typeSummary(observationID: Int64, sampleTypeID: Int64, db: OpaquePointer?) throws -> ArchiveV2TypeSummary {
3580 3792
         let counts = try eventCounts(observationID: observationID, sampleTypeID: sampleTypeID, db: db)
3581 3793
         let aggregate = try visibleAggregate(sampleTypeID: sampleTypeID, db: db)
+55 -0
HealthProbeTests/SQLiteHealthArchiveStoreTests.swift
@@ -169,6 +169,61 @@ final class SQLiteHealthArchiveStoreTests: XCTestCase {
169 169
         XCTAssertFalse(try tableExists("archive_samples", at: url))
170 170
     }
171 171
 
172
+    func testChangedVerificationReplacesOnlyAffectedDailyAggregateBuckets() async throws {
173
+        let url = databaseURL()
174
+        let store = SQLiteHealthArchiveStore(databaseURL: url)
175
+        let firstDaySample = makeStepCountSample(value: 10, start: 1_000)
176
+        let secondDaySample = makeStepCountSample(value: 20, start: 90_000)
177
+        let secondDayDeltaSample = makeStepCountSample(value: 30, start: 91_000)
178
+
179
+        _ = try await store.upsertSamples(
180
+            [firstDaySample, secondDaySample],
181
+            observedAt: Date(timeIntervalSince1970: 95_000)
182
+        )
183
+        try await store.markVerification(
184
+            sampleType: firstDaySample.sampleType,
185
+            verifiedAt: Date(timeIntervalSince1970: 95_060)
186
+        )
187
+        let changedObservationID = try await store.beginObservation(
188
+            observedAt: Date(timeIntervalSince1970: 96_000),
189
+            triggerReason: "manual",
190
+            selectedTypeSetHash: "selected-types"
191
+        )
192
+
193
+        _ = try await store.upsertSamples(
194
+            [secondDayDeltaSample],
195
+            observedAt: Date(timeIntervalSince1970: 96_000),
196
+            observationID: changedObservationID
197
+        )
198
+        try await store.markVerification(
199
+            sampleType: firstDaySample.sampleType,
200
+            verifiedAt: Date(timeIntervalSince1970: 96_000),
201
+            observationID: changedObservationID
202
+        )
203
+        try await store.finishObservation(
204
+            observationID: changedObservationID,
205
+            status: "completed",
206
+            endedAt: Date(timeIntervalSince1970: 96_010)
207
+        )
208
+
209
+        XCTAssertEqual(
210
+            try countRows(in: "daily_type_aggregates WHERE observation_id = \(changedObservationID)", at: url),
211
+            2
212
+        )
213
+        XCTAssertEqual(
214
+            try countRows(in: "daily_type_aggregates WHERE observation_id = \(changedObservationID) AND visible_record_count = 1", at: url),
215
+            1
216
+        )
217
+        XCTAssertEqual(
218
+            try countRows(in: "daily_type_aggregates WHERE observation_id = \(changedObservationID) AND visible_record_count = 2", at: url),
219
+            1
220
+        )
221
+        XCTAssertEqual(
222
+            try countRows(in: "observation_type_summaries WHERE observation_id = \(changedObservationID) AND visible_record_count = 3 AND appeared_count = 1", at: url),
223
+            1
224
+        )
225
+    }
226
+
172 227
     func testDiffSummaryAndRecordsBetweenObservationsUseSQLVisibility() async throws {
173 228
         let url = databaseURL()
174 229
         let store = SQLiteHealthArchiveStore(databaseURL: url)