Showing 10 changed files with 446 additions and 99 deletions
+7 -5
HealthProbe/Doc/00-agent-guides/AGENTS.md
@@ -186,12 +186,14 @@ final class TypeDistributionBin {
186 186
 // SQLite archive store, in one schema that can preserve relationships across data
187 187
 // types, sources, devices, workouts, and metadata.
188 188
 
189
-// Interface updated 2026-05-18 — see AGENTS.md
189
+// Interface updated 2026-05-24 — see AGENTS.md
190 190
 // Services/Protocols/HealthArchiveStore.swift defines the local archive boundary.
191
-// SQLiteHealthArchiveStore is the current implementation. HealthKit anchored-query
192
-// pages must be written to this archive before UI/cache rows are saved.
193
-// Deletions are recorded by sampleUUIDHash because HKDeletedObject exposes UUIDs,
194
-// not complete sample payloads.
191
+// SQLiteHealthArchiveStore is the current implementation. Each user-visible
192
+// capture creates one archive observation; HealthKit anchored-query pages,
193
+// verification runs, and deleted-object evidence for that capture must attach to
194
+// that observation id before UI/cache rows are saved. Deletions are recorded by
195
+// sampleUUIDHash because HKDeletedObject exposes UUIDs, not complete sample
196
+// payloads.
195 197
 
196 198
 // Interface updated 2026-05-24 — see AGENTS.md
197 199
 // HealthArchiveStore exposes SQL-first observation diff APIs:
+17 -6
HealthProbe/Doc/02-architecture/Database-Design.md
@@ -91,6 +91,12 @@ An observation records:
91 91
 - HealthKit anchors;
92 92
 - events and aggregate changes observed during the capture.
93 93
 
94
+Implementation note, 2026-05-24: the app creates one archive `observations`
95
+row for each user-visible capture/snapshot. Anchored HealthKit pages,
96
+`HKDeletedObject` evidence, and the final type verification rows for that
97
+capture attach to that same observation id so SQL diffs, cache rows, and
98
+transition UI can reference a stable capture boundary.
99
+
94 100
 ### Terminology
95 101
 
96 102
 - **Capture**: the act of querying HealthKit and writing results to the archive.
@@ -538,10 +544,10 @@ Manifest hashes must cover exported content through item hashes, not only counts
538 544
 
539 545
 ### 7.1 Capture Transaction Shape
540 546
 
541
-For each observation/type run:
547
+For each user-visible capture:
542 548
 
543
-1. Open SQLite transaction.
544
-2. Insert/update `observations` and `observation_type_runs`.
549
+1. Create one `observations` row with status `running`.
550
+2. For each observation/type run, update `observation_type_runs`.
545 551
 3. For each added/visible sample page:
546 552
    - upsert source/source revision/device/metadata;
547 553
    - upsert `samples`;
@@ -552,9 +558,14 @@ For each observation/type run:
552 558
    - find sample by UUID hash;
553 559
    - insert deleted/disappeared event;
554 560
    - close open visibility ranges.
555
-5. Recompute affected materialized aggregates.
556
-6. Commit SQLite.
557
-7. Update/rebuild Core Data cache after SQLite commit.
561
+5. Record final per-type verification counts for the same observation id.
562
+6. Recompute affected materialized aggregates.
563
+7. Mark the observation `completed`, `needs_review`, `partial_*`, or `failed`.
564
+
565
+SQLite writes may be committed per page/type to keep memory and lock time
566
+bounded, but they must share the capture's observation id.
567
+
568
+8. Update/rebuild Core Data cache after SQLite commit.
558 569
 
559 570
 SQLite commit must happen before Core Data cache update. Cache rebuild failures must not corrupt archive truth.
560 571
 
+2 -2
HealthProbe/Doc/02-architecture/Implementation-Guide.md
@@ -62,9 +62,9 @@ Use:
62 62
 
63 63
 Capture flow:
64 64
 1. Resolve the current local device chain ID.
65
-2. Start an observation record.
65
+2. Start one archive observation record for the user-visible capture and keep its id.
66 66
 3. For each selected sample type, run anchored queries.
67
-4. Write HealthKit samples and deleted-object evidence to the local archive first.
67
+4. Write HealthKit samples, deleted-object evidence, and final per-type verification to the local archive first, all under that same observation id.
68 68
 5. Update materialized aggregate tables in SQLite.
69 69
 6. Save/rebuild derived Core Data cache rows only after archive writes succeed.
70 70
 7. Compute summary/diff caches for UI and reports.
+4 -4
HealthProbe/Doc/04-project/IMPLEMENTATION_STATUS.md
@@ -24,8 +24,8 @@ There are no real deployments, only test installations. Existing prototype datab
24 24
 | Area | Current Status | Target / Next Work |
25 25
 |------|----------------|--------------------|
26 26
 | Product docs | Updated | Keep `HealthProbe/Doc/README.md` as canonical index |
27
-| HealthKit capture | Prototype exists | Adapt capture to write differential SQLite observations first |
28
-| SQLite archive | Archive v2 schema, differential write path, v2 verification/delete bookkeeping, daily aggregate rebuilds, integrity report, v2 record reads, SQL diff/count/aggregate/provenance/consolidation-evidence APIs, large synthetic diff pagination coverage, formal timing/memory metrics, and XCTest coverage are in place; the legacy `archive_samples` mirror has been removed | Start Core Data cache work |
27
+| HealthKit capture | Capture now opens one archive observation per user-visible snapshot and attaches HealthKit pages, deleted-object evidence, and type verification to that observation id before finishing it | Continue moving UI/cache reads to archive-backed observation ids |
28
+| SQLite archive | Archive v2 schema, snapshot-level observation grouping, differential write path, v2 verification/delete bookkeeping, daily aggregate rebuilds, integrity report, v2 record reads, SQL diff/count/aggregate/provenance/consolidation-evidence APIs, large synthetic diff pagination coverage, formal timing/memory metrics, and XCTest coverage are in place; the legacy `archive_samples` mirror has been removed | Move Snapshots/Data Types from SwiftData previews to archive/cache DTOs |
29 29
 | Core Data cache | Initial programmatic Core Data model, full-cache rebuild service, read DTOs for observation/type/health rows, and Dashboard archive-cache status wiring are in place | Move Snapshots/Data Types to cache DTOs and add targeted partial invalidation |
30 30
 | SwiftData cache | Exists; test builds now reset legacy prototype UI/archive/cache stores once for archive v2 so old SwiftData-only snapshots are not treated as backed-up observations | Treat as disposable prototype data; reset/ignore during v2 transition |
31 31
 | UI | Prototype exists; Snapshots/Data Types now default to the local device timeline instead of a multi-device picker, and record-change detail uses a separate preview/paged list view with archive-value enrichment and scoped export action | Reframe remaining screens around observations, diffs, export, archive status |
@@ -53,12 +53,12 @@ Detailed checkable milestones live in [`Refactoring-Plan.md`](Refactoring-Plan.m
53 53
 - Legacy SwiftData-only snapshots can show diffs without archive-backed values; they are now reset for archive v2 test installs rather than migrated.
54 54
 - Existing implementation may decode or cache too much data for low-end devices.
55 55
 - Old prototype database compatibility is no longer required.
56
-- Initial SQLite archive tests cover open/init/reset/idempotency, legacy mirror removal, small observation diffs, large synthetic diff pagination, formal timing/memory metrics, materialized aggregate comparison, source/provenance breakdowns, and consolidation-evidence labels, but not yet export behavior.
56
+- Initial SQLite archive tests cover open/init/reset/idempotency, snapshot-level observation grouping, legacy mirror removal, small observation diffs, large synthetic diff pagination, formal timing/memory metrics, materialized aggregate comparison, source/provenance breakdowns, and consolidation-evidence labels, but not yet export behavior.
57 57
 - Initial Core Data cache tests cover full rebuild from SQLite and delete-cache-then-rebuild without losing archive data.
58 58
 
59 59
 ## Verification Checklist
60 60
 
61
-- [ ] SQLite archive v2 can reconstruct records visible at observation T.
61
+- [x] SQLite archive v2 can reconstruct records visible at observation T.
62 62
 - [ ] No recurring complete snapshot copies are written for high-volume types.
63 63
 - [x] SQL diff between two observations runs without loading full datasets into Swift arrays.
64 64
 - [x] Expensive counts used by reports/UI are cached and rebuildable.
+4 -1
HealthProbe/Doc/04-project/Refactoring-Plan.md
@@ -1,6 +1,6 @@
1 1
 # HealthProbe - Database-Led Refactoring Plan
2 2
 
3
-**Last Updated:** 2026-05-23
3
+**Last Updated:** 2026-05-24
4 4
 **Status:** Active planning document
5 5
 
6 6
 ## Goal
@@ -119,6 +119,8 @@ Acceptance:
119 119
 
120 120
 Checklist:
121 121
 - [x] Create observation transaction wrapper.
122
+- [x] Attach HealthKit pages, deleted-object evidence, and final type verification to the same user-visible archive observation.
123
+- [x] Store the archive observation id on the legacy `HealthSnapshot` bridge model for transition screens.
122 124
 - [x] Upsert sample types.
123 125
 - [x] Upsert source/source revision/device/metadata rows.
124 126
 - [x] Upsert sample identity.
@@ -140,6 +142,7 @@ Acceptance:
140 142
 - [x] Representation change creates a new version, not a new logical sample.
141 143
 - [x] Disappearance closes visibility range.
142 144
 - [x] No full observation copy table is created or written.
145
+- [x] A user-visible capture has one archive observation id that SQL diff/cache/UI layers can reference.
143 146
 
144 147
 ## Milestone 5 - SQL Analysis Layer
145 148
 
+1 -0
HealthProbe/Models/HealthSnapshot.swift
@@ -24,6 +24,7 @@ import SwiftData
24 24
     var appBuildVersion: String = ""
25 25
     var monitoredTypeSetHash: String = ""
26 26
     var monitoredRegistryVersion: Int = 0
27
+    var archiveObservationID: Int64?
27 28
     var yearlyCountTimezoneIdentifier: String = ""
28 29
     var cachedSummaryVersion: Int = 0
29 30
     var cachedTypeCount: Int = 0
+55 -12
HealthProbe/Services/HealthKitService.swift
@@ -96,6 +96,15 @@ final class HealthKitService {
96 96
             deviceName: UIDevice.current.name,
97 97
             deviceID: deviceResolution.id
98 98
         )
99
+        let intendedTypeIDs = active.map { $0.id }
100
+        let monitoredTypeSetHash = HashService.typeSetHash(typeIDs: intendedTypeIDs)
101
+        let archiveObservationID = try await archiveStore.beginObservation(
102
+            observedAt: snapshot.timestamp,
103
+            triggerReason: triggerReason,
104
+            selectedTypeSetHash: monitoredTypeSetHash
105
+        )
106
+        snapshot.archiveObservationID = archiveObservationID
107
+        snapshot.monitoredTypeSetHash = monitoredTypeSetHash
99 108
         snapshot.recoveredDeviceID = deviceResolution.isRecovered
100 109
         snapshot.triggerReason = triggerReason
101 110
         snapshot.retryOfSnapshotID = retryOfSnapshotID
@@ -108,6 +117,7 @@ final class HealthKitService {
108 117
             context: context,
109 118
             snapshot: snapshot,
110 119
             previousSnapshot: previousSnapshot,
120
+            archiveObservationID: archiveObservationID,
111 121
             adaptiveTimeoutsEnabled: adaptiveTimeoutsEnabled,
112 122
             timeoutMultiplier: timeoutMultiplier,
113 123
             progress: progress
@@ -143,7 +153,7 @@ final class HealthKitService {
143 153
         configureSnapshotMetadata(
144 154
             snapshot,
145 155
             typeCounts: typeCounts,
146
-            intendedTypeIDs: active.map { $0.id },
156
+            intendedTypeIDs: intendedTypeIDs,
147 157
             context: context
148 158
         )
149 159
         markContentEquivalenceIfNeeded(
@@ -160,11 +170,30 @@ final class HealthKitService {
160 170
         if snapshot.snapshotQuality == .complete,
161 171
            reviewAmbiguousCompleteDisappearedTypes,
162 172
            hasAmbiguousCompleteDisappearance(snapshot: snapshot, typeCounts: typeCounts, context: context) {
173
+            try await archiveStore.finishObservation(
174
+                observationID: archiveObservationID,
175
+                status: "needs_review",
176
+                endedAt: Date()
177
+            )
163 178
             return snapshot
164 179
         }
165 180
 
166
-        if snapshot.snapshotQuality == .complete {
167
-            try await persistSnapshot(snapshot, typeCounts: typeCounts, context: context)
181
+        do {
182
+            if snapshot.snapshotQuality == .complete {
183
+                try await persistSnapshot(snapshot, typeCounts: typeCounts, context: context)
184
+            }
185
+            try await archiveStore.finishObservation(
186
+                observationID: archiveObservationID,
187
+                status: snapshot.snapshotQuality == .complete ? "completed" : "partial_\(snapshot.snapshotQuality.rawValue)",
188
+                endedAt: Date()
189
+            )
190
+        } catch {
191
+            try? await archiveStore.finishObservation(
192
+                observationID: archiveObservationID,
193
+                status: "failed",
194
+                endedAt: Date()
195
+            )
196
+            throw error
168 197
         }
169 198
 
170 199
         return snapshot
@@ -573,6 +602,7 @@ final class HealthKitService {
573 602
         context: ModelContext,
574 603
         snapshot: HealthSnapshot,
575 604
         previousSnapshot: HealthSnapshot?,
605
+        archiveObservationID: Int64,
576 606
         adaptiveTimeoutsEnabled: Bool,
577 607
         timeoutMultiplier: Double,
578 608
         progress: SnapshotFetchProgress? = nil
@@ -592,6 +622,7 @@ final class HealthKitService {
592 622
                 timeoutProfile: profile,
593 623
                 timeoutSeconds: timeout,
594 624
                 previousTypeCount: previousSnapshot?.typeCounts?.first { $0.typeIdentifier == monitoredType.id },
625
+                archiveObservationID: archiveObservationID,
595 626
                 progress: progress
596 627
             )
597 628
             updateTimeoutProfile(profile, with: result, monitoredType: monitoredType)
@@ -609,6 +640,7 @@ final class HealthKitService {
609 640
         timeoutProfile: MetricTimeoutProfile,
610 641
         timeoutSeconds: TimeInterval,
611 642
         previousTypeCount: TypeCount?,
643
+        archiveObservationID: Int64,
612 644
         progress: SnapshotFetchProgress? = nil
613 645
     ) async -> TypeCountFetchResult {
614 646
         let started = Date()
@@ -647,6 +679,7 @@ final class HealthKitService {
647 679
             sampleType: sampleType,
648 680
             timeoutSeconds: timeoutSeconds,
649 681
             previousTypeCount: previousTypeCount,
682
+            archiveObservationID: archiveObservationID,
650 683
             progress: progress
651 684
         )
652 685
         result.totalElapsedSeconds = Date().timeIntervalSince(started)
@@ -672,6 +705,7 @@ final class HealthKitService {
672 705
         sampleType: HKSampleType,
673 706
         timeoutSeconds: TimeInterval,
674 707
         previousTypeCount: TypeCount?,
708
+        archiveObservationID: Int64,
675 709
         progress: SnapshotFetchProgress?
676 710
     ) async -> TypeCountFetchResult {
677 711
         let dateDeadline = Date().addingTimeInterval(timeoutSeconds)
@@ -730,6 +764,7 @@ final class HealthKitService {
730 764
                 earliestDate: earliest,
731 765
                 latestDate: latest,
732 766
                 previousDistribution: previousDistribution,
767
+                archiveObservationID: archiveObservationID,
733 768
                 progress: progress
734 769
             )
735 770
         } resultDescription: { distribution in
@@ -826,6 +861,7 @@ final class HealthKitService {
826 861
         earliestDate: Date?,
827 862
         latestDate: Date?,
828 863
         previousDistribution: PreviousDistributionState,
864
+        archiveObservationID: Int64,
829 865
         progress: SnapshotFetchProgress?
830 866
     ) async throws -> SampleDistribution {
831 867
         var anchor = previousDistribution.globalAnchor
@@ -859,7 +895,7 @@ final class HealthKitService {
859 895
                     anchor: anchor
860 896
                 )
861 897
             }
862
-            try await archivePage(page, sampleType: sampleType)
898
+            try await archivePage(page, sampleType: sampleType, observationID: archiveObservationID)
863 899
             anchor = page.anchor
864 900
 
865 901
             if page.samples.isEmpty, page.deletedObjects.isEmpty,
@@ -869,7 +905,11 @@ final class HealthKitService {
869 905
                     earliestDate: earliestDate,
870 906
                     latestDate: latestDate
871 907
                ) {
872
-                try await archiveStore.markVerification(sampleType: sampleType, verifiedAt: Date())
908
+                try await archiveStore.markVerification(
909
+                    sampleType: sampleType,
910
+                    verifiedAt: Date(),
911
+                    observationID: archiveObservationID
912
+                )
873 913
                 progress?.updateBlockProgress(
874 914
                     typeIdentifier,
875 915
                     detail: "No HealthKit delta",
@@ -890,6 +930,7 @@ final class HealthKitService {
890 930
                 earliestDate: earliestDate,
891 931
                 latestDate: latestDate,
892 932
                 progressStarted: progressStarted,
933
+                archiveObservationID: archiveObservationID,
893 934
                 progress: progress
894 935
             )
895 936
         }
@@ -941,7 +982,7 @@ final class HealthKitService {
941 982
                     anchor: anchor
942 983
                 )
943 984
             }
944
-            try await archivePage(page, sampleType: sampleType)
985
+            try await archivePage(page, sampleType: sampleType, observationID: archiveObservationID)
945 986
             anchor = page.anchor
946 987
 
947 988
             applyDistributionPage(page, sampleType: sampleType, to: &recordMap)
@@ -963,7 +1004,7 @@ final class HealthKitService {
963 1004
             )
964 1005
         }
965 1006
 
966
-        try await archiveStore.markVerification(sampleType: sampleType, verifiedAt: Date())
1007
+        try await archiveStore.markVerification(sampleType: sampleType, verifiedAt: Date(), observationID: archiveObservationID)
967 1008
 
968 1009
         let sortedKeys = recordMap.keys.sorted {
969 1010
             guard let left = recordMap[$0],
@@ -1040,6 +1081,7 @@ final class HealthKitService {
1040 1081
         earliestDate: Date?,
1041 1082
         latestDate: Date?,
1042 1083
         progressStarted: Date,
1084
+        archiveObservationID: Int64,
1043 1085
         progress: SnapshotFetchProgress?
1044 1086
     ) async throws -> SampleDistribution {
1045 1087
         var anchor: HKQueryAnchor?
@@ -1078,7 +1120,7 @@ final class HealthKitService {
1078 1120
                     anchor: anchor
1079 1121
                 )
1080 1122
             }
1081
-            try await archivePage(page, sampleType: sampleType)
1123
+            try await archivePage(page, sampleType: sampleType, observationID: archiveObservationID)
1082 1124
             anchor = page.anchor
1083 1125
 
1084 1126
             for sample in page.samples {
@@ -1122,7 +1164,7 @@ final class HealthKitService {
1122 1164
             )
1123 1165
         )
1124 1166
 
1125
-        try await archiveStore.markVerification(sampleType: sampleType, verifiedAt: Date())
1167
+        try await archiveStore.markVerification(sampleType: sampleType, verifiedAt: Date(), observationID: archiveObservationID)
1126 1168
 
1127 1169
         guard recordCount > 0 || anchor != nil else {
1128 1170
             return SampleDistribution(
@@ -1266,14 +1308,15 @@ final class HealthKitService {
1266 1308
         }
1267 1309
     }
1268 1310
 
1269
-    private func archivePage(_ page: SampleDistributionPage, sampleType: HKSampleType) async throws {
1311
+    private func archivePage(_ page: SampleDistributionPage, sampleType: HKSampleType, observationID: Int64) async throws {
1270 1312
         let observedAt = Date()
1271
-        _ = try await archiveStore.upsertSamples(page.samples, observedAt: observedAt)
1313
+        _ = try await archiveStore.upsertSamples(page.samples, observedAt: observedAt, observationID: observationID)
1272 1314
         for deletedObject in page.deletedObjects {
1273 1315
             try await archiveStore.recordDisappearance(
1274 1316
                 sampleUUIDHash: HashService.sampleUUIDHash(deletedObject.uuid.uuidString),
1275 1317
                 sampleTypeIdentifier: sampleType.identifier,
1276
-                observedMissingAt: observedAt
1318
+                observedMissingAt: observedAt,
1319
+                observationID: observationID
1277 1320
             )
1278 1321
         }
1279 1322
     }
+6 -1
HealthProbe/Services/Protocols/HealthArchiveStore.swift
@@ -1,11 +1,16 @@
1 1
 import Foundation
2 2
 import HealthKit
3 3
 
4
-// Interface updated 2026-05-23 — see AGENTS.md
4
+// Interface updated 2026-05-24 — see AGENTS.md
5 5
 protocol HealthArchiveStore {
6
+    func beginObservation(observedAt: Date, triggerReason: String, selectedTypeSetHash: String?) async throws -> Int64
7
+    func finishObservation(observationID: Int64, status: String, endedAt: Date) async throws
6 8
     func upsertSamples(_ samples: [HKSample], observedAt: Date) async throws -> HealthArchiveWriteSummary
9
+    func upsertSamples(_ samples: [HKSample], observedAt: Date, observationID: Int64) async throws -> HealthArchiveWriteSummary
7 10
     func markVerification(sampleType: HKSampleType, verifiedAt: Date) async throws
11
+    func markVerification(sampleType: HKSampleType, verifiedAt: Date, observationID: Int64) async throws
8 12
     func recordDisappearance(sampleUUIDHash: String, sampleTypeIdentifier: String, observedMissingAt: Date) async throws
13
+    func recordDisappearance(sampleUUIDHash: String, sampleTypeIdentifier: String, observedMissingAt: Date, observationID: Int64) async throws
9 14
     func records(for request: HealthArchiveRecordRequest) async throws -> [ArchivedHealthRecord]
10 15
     func diffSummary(_ request: HealthArchiveDiffRequest) async throws -> HealthArchiveDiffSummary
11 16
     func diffRecords(_ request: HealthArchiveDiffRecordRequest) async throws -> [ArchivedHealthRecord]
+298 -67
HealthProbe/Services/SQLiteHealthArchiveStore.swift
@@ -10,7 +10,7 @@ private enum SQLiteHealthArchiveStoreError: Error {
10 10
     case exportEncodingFailed
11 11
 }
12 12
 
13
-// Interface updated 2026-05-18 — see AGENTS.md
13
+// Interface updated 2026-05-24 — see AGENTS.md
14 14
 actor SQLiteHealthArchiveStore: HealthArchiveStore {
15 15
     static let shared = SQLiteHealthArchiveStore()
16 16
     nonisolated static let defaultDatabaseURL = URL.applicationSupportDirectory.appending(path: "HealthProbeArchive.sqlite")
@@ -44,6 +44,46 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
44 44
         self.databaseURL = databaseURL ?? Self.defaultDatabaseURL
45 45
     }
46 46
 
47
+    func beginObservation(observedAt: Date, triggerReason: String, selectedTypeSetHash: String?) async throws -> Int64 {
48
+        let db = try openDatabase()
49
+        defer { sqlite3_close(db) }
50
+        try prepareSchemaIfNeeded(db)
51
+        try execute("BEGIN IMMEDIATE TRANSACTION", db: db)
52
+        do {
53
+            let observationID = try createObservation(
54
+                observedAt: observedAt,
55
+                triggerReason: triggerReason,
56
+                status: "running",
57
+                selectedTypeSetHash: selectedTypeSetHash,
58
+                db: db
59
+            )
60
+            try execute("COMMIT", db: db)
61
+            return observationID
62
+        } catch {
63
+            try? execute("ROLLBACK", db: db)
64
+            throw error
65
+        }
66
+    }
67
+
68
+    func finishObservation(observationID: Int64, status: String, endedAt: Date) async throws {
69
+        let db = try openDatabase()
70
+        defer { sqlite3_close(db) }
71
+        try prepareSchemaIfNeeded(db)
72
+        try execute("BEGIN IMMEDIATE TRANSACTION", db: db)
73
+        do {
74
+            try updateObservationStatus(
75
+                observationID: observationID,
76
+                status: status,
77
+                endedAt: endedAt,
78
+                db: db
79
+            )
80
+            try execute("COMMIT", db: db)
81
+        } catch {
82
+            try? execute("ROLLBACK", db: db)
83
+            throw error
84
+        }
85
+    }
86
+
47 87
     func upsertSamples(_ samples: [HKSample], observedAt: Date) async throws -> HealthArchiveWriteSummary {
48 88
         guard !samples.isEmpty else {
49 89
             return HealthArchiveWriteSummary(insertedCount: 0, updatedCount: 0, unchangedCount: 0)
@@ -64,6 +104,26 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
64 104
         }
65 105
     }
66 106
 
107
+    func upsertSamples(_ samples: [HKSample], observedAt: Date, observationID: Int64) async throws -> HealthArchiveWriteSummary {
108
+        guard !samples.isEmpty else {
109
+            return HealthArchiveWriteSummary(insertedCount: 0, updatedCount: 0, unchangedCount: 0)
110
+        }
111
+
112
+        let db = try openDatabase()
113
+        defer { sqlite3_close(db) }
114
+        try prepareSchemaIfNeeded(db)
115
+        try execute("BEGIN IMMEDIATE TRANSACTION", db: db)
116
+        do {
117
+            let summary = try upsertSamples(samples, observedAt: observedAt, observationID: observationID, db: db)
118
+            try execute("PRAGMA foreign_key_check", db: db)
119
+            try execute("COMMIT", db: db)
120
+            return summary
121
+        } catch {
122
+            try? execute("ROLLBACK", db: db)
123
+            throw error
124
+        }
125
+    }
126
+
67 127
     func markVerification(sampleType: HKSampleType, verifiedAt: Date) async throws {
68 128
         let db = try openDatabase()
69 129
         defer { sqlite3_close(db) }
@@ -76,25 +136,21 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
76 136
                 status: "completed",
77 137
                 db: db
78 138
             )
79
-            let sampleTypeID = try upsertSampleType(typeIdentifier: sampleType.identifier, db: db)
80
-            let visibleCount = try visibleAggregate(sampleTypeID: sampleTypeID, db: db).visibleRecordCount
81
-            try insertObservationTypeRun(
82
-                observationID: observationID,
83
-                sampleTypeID: sampleTypeID,
84
-                status: "completed",
85
-                observedAt: verifiedAt,
86
-                insertedEventCount: 0,
87
-                deletedEventCount: 0,
88
-                verifiedVisibleCount: visibleCount,
89
-                db: db
90
-            )
91
-            try rebuildTypeSummary(observationID: observationID, sampleTypeID: sampleTypeID, db: db)
92
-            try rebuildDailyAggregates(
93
-                observationID: observationID,
94
-                sampleTypeID: sampleTypeID,
95
-                observedAt: verifiedAt,
96
-                db: db
97
-            )
139
+            try markVerification(sampleType: sampleType, verifiedAt: verifiedAt, observationID: observationID, db: db)
140
+            try execute("COMMIT", db: db)
141
+        } catch {
142
+            try? execute("ROLLBACK", db: db)
143
+            throw error
144
+        }
145
+    }
146
+
147
+    func markVerification(sampleType: HKSampleType, verifiedAt: Date, observationID: Int64) async throws {
148
+        let db = try openDatabase()
149
+        defer { sqlite3_close(db) }
150
+        try prepareSchemaIfNeeded(db)
151
+        try execute("BEGIN IMMEDIATE TRANSACTION", db: db)
152
+        do {
153
+            try markVerification(sampleType: sampleType, verifiedAt: verifiedAt, observationID: observationID, db: db)
98 154
             try execute("COMMIT", db: db)
99 155
         } catch {
100 156
             try? execute("ROLLBACK", db: db)
@@ -114,33 +170,39 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
114 170
                 status: "completed",
115 171
                 db: db
116 172
             )
117
-            if let sampleTypeID = try sampleTypeID(typeIdentifier: sampleTypeIdentifier, db: db),
118
-               let sampleID = try sampleID(sampleUUIDHash: sampleUUIDHash, sampleTypeID: sampleTypeID, db: db) {
119
-                try insertObservationEvent(
120
-                    observationID: observationID,
121
-                    sampleID: sampleID,
122
-                    versionID: nil,
123
-                    eventKind: "disappeared",
124
-                    evidenceKind: "deleted_object",
125
-                    observedAt: observedMissingAt,
126
-                    db: db
127
-                )
128
-                try closeOpenVisibilityRanges(
129
-                    sampleID: sampleID,
130
-                    excludingVersionID: nil,
131
-                    closedAtObservationID: observationID,
132
-                    observedAt: observedMissingAt,
133
-                    db: db
134
-                )
135
-                try rebuildTypeSummary(observationID: observationID, sampleTypeID: sampleTypeID, db: db)
136
-                try rebuildDailyAggregates(
137
-                    observationID: observationID,
138
-                    sampleTypeID: sampleTypeID,
139
-                    observedAt: observedMissingAt,
140
-                    db: db
141
-                )
142
-            }
173
+            try recordDisappearance(
174
+                sampleUUIDHash: sampleUUIDHash,
175
+                sampleTypeIdentifier: sampleTypeIdentifier,
176
+                observedMissingAt: observedMissingAt,
177
+                observationID: observationID,
178
+                db: db
179
+            )
180
+
181
+            try execute("COMMIT", db: db)
182
+        } catch {
183
+            try? execute("ROLLBACK", db: db)
184
+            throw error
185
+        }
186
+    }
143 187
 
188
+    func recordDisappearance(
189
+        sampleUUIDHash: String,
190
+        sampleTypeIdentifier: String,
191
+        observedMissingAt: Date,
192
+        observationID: Int64
193
+    ) async throws {
194
+        let db = try openDatabase()
195
+        defer { sqlite3_close(db) }
196
+        try prepareSchemaIfNeeded(db)
197
+        try execute("BEGIN IMMEDIATE TRANSACTION", db: db)
198
+        do {
199
+            try recordDisappearance(
200
+                sampleUUIDHash: sampleUUIDHash,
201
+                sampleTypeIdentifier: sampleTypeIdentifier,
202
+                observedMissingAt: observedMissingAt,
203
+                observationID: observationID,
204
+                db: db
205
+            )
144 206
             try execute("COMMIT", db: db)
145 207
         } catch {
146 208
             try? execute("ROLLBACK", db: db)
@@ -1323,33 +1385,54 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1323 1385
     }
1324 1386
 
1325 1387
     private func upsertSamples(_ samples: [HKSample], observedAt: Date, db: OpaquePointer?) throws -> HealthArchiveWriteSummary {
1326
-        let rows = samples.map { ArchiveSampleRow(sample: $0, observedAt: observedAt) }
1327 1388
         let observationID = try createObservation(
1328 1389
             observedAt: observedAt,
1329 1390
             triggerReason: "anchored_page",
1330 1391
             status: "completed",
1331 1392
             db: db
1332 1393
         )
1394
+        return try upsertSamples(samples, observedAt: observedAt, observationID: observationID, db: db)
1395
+    }
1333 1396
 
1397
+    private func upsertSamples(
1398
+        _ samples: [HKSample],
1399
+        observedAt: Date,
1400
+        observationID: Int64,
1401
+        db: OpaquePointer?
1402
+    ) throws -> HealthArchiveWriteSummary {
1403
+        let rows = samples.map { ArchiveSampleRow(sample: $0, observedAt: observedAt) }
1334 1404
         var inserted = 0
1335 1405
         var updated = 0
1336 1406
         var unchanged = 0
1337
-        var touchedTypeIDs = Set<Int64>()
1407
+        var typeEventCounts: [Int64: (inserted: Int, deleted: Int)] = [:]
1338 1408
 
1339 1409
         for row in rows {
1340 1410
             let result = try upsertArchiveV2Sample(row, observationID: observationID, db: db)
1341
-            touchedTypeIDs.insert(result.sampleTypeID)
1411
+            var counts = typeEventCounts[result.sampleTypeID, default: (inserted: 0, deleted: 0)]
1342 1412
             switch result.kind {
1343 1413
             case .inserted:
1344 1414
                 inserted += 1
1415
+                counts.inserted += 1
1345 1416
             case .updated:
1346 1417
                 updated += 1
1418
+                counts.inserted += 1
1347 1419
             case .unchanged:
1348 1420
                 unchanged += 1
1349 1421
             }
1422
+            typeEventCounts[result.sampleTypeID] = counts
1350 1423
         }
1351 1424
 
1352
-        for sampleTypeID in touchedTypeIDs {
1425
+        for (sampleTypeID, eventCounts) in typeEventCounts {
1426
+            try insertObservationTypeRun(
1427
+                observationID: observationID,
1428
+                sampleTypeID: sampleTypeID,
1429
+                status: "running",
1430
+                observedAt: observedAt,
1431
+                insertedEventCount: eventCounts.inserted,
1432
+                deletedEventCount: eventCounts.deleted,
1433
+                verifiedVisibleCount: nil,
1434
+                db: db
1435
+            )
1353 1436
             try rebuildTypeSummary(observationID: observationID, sampleTypeID: sampleTypeID, db: db)
1354 1437
             try rebuildDailyAggregates(
1355 1438
                 observationID: observationID,
@@ -1366,6 +1449,80 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1366 1449
         )
1367 1450
     }
1368 1451
 
1452
+    private func markVerification(
1453
+        sampleType: HKSampleType,
1454
+        verifiedAt: Date,
1455
+        observationID: Int64,
1456
+        db: OpaquePointer?
1457
+    ) throws {
1458
+        let sampleTypeID = try upsertSampleType(typeIdentifier: sampleType.identifier, db: db)
1459
+        let visibleCount = try visibleAggregate(sampleTypeID: sampleTypeID, db: db).visibleRecordCount
1460
+        try insertObservationTypeRun(
1461
+            observationID: observationID,
1462
+            sampleTypeID: sampleTypeID,
1463
+            status: "completed",
1464
+            observedAt: verifiedAt,
1465
+            insertedEventCount: 0,
1466
+            deletedEventCount: 0,
1467
+            verifiedVisibleCount: visibleCount,
1468
+            db: db
1469
+        )
1470
+        try rebuildTypeSummary(observationID: observationID, sampleTypeID: sampleTypeID, db: db)
1471
+        try rebuildDailyAggregates(
1472
+            observationID: observationID,
1473
+            sampleTypeID: sampleTypeID,
1474
+            observedAt: verifiedAt,
1475
+            db: db
1476
+        )
1477
+    }
1478
+
1479
+    private func recordDisappearance(
1480
+        sampleUUIDHash: String,
1481
+        sampleTypeIdentifier: String,
1482
+        observedMissingAt: Date,
1483
+        observationID: Int64,
1484
+        db: OpaquePointer?
1485
+    ) throws {
1486
+        guard let sampleTypeID = try sampleTypeID(typeIdentifier: sampleTypeIdentifier, db: db),
1487
+              let sampleID = try sampleID(sampleUUIDHash: sampleUUIDHash, sampleTypeID: sampleTypeID, db: db) else {
1488
+            return
1489
+        }
1490
+
1491
+        try insertObservationEvent(
1492
+            observationID: observationID,
1493
+            sampleID: sampleID,
1494
+            versionID: nil,
1495
+            eventKind: "disappeared",
1496
+            evidenceKind: "deleted_object",
1497
+            observedAt: observedMissingAt,
1498
+            db: db
1499
+        )
1500
+        try closeOpenVisibilityRanges(
1501
+            sampleID: sampleID,
1502
+            excludingVersionID: nil,
1503
+            closedAtObservationID: observationID,
1504
+            observedAt: observedMissingAt,
1505
+            db: db
1506
+        )
1507
+        try insertObservationTypeRun(
1508
+            observationID: observationID,
1509
+            sampleTypeID: sampleTypeID,
1510
+            status: "running",
1511
+            observedAt: observedMissingAt,
1512
+            insertedEventCount: 0,
1513
+            deletedEventCount: 1,
1514
+            verifiedVisibleCount: nil,
1515
+            db: db
1516
+        )
1517
+        try rebuildTypeSummary(observationID: observationID, sampleTypeID: sampleTypeID, db: db)
1518
+        try rebuildDailyAggregates(
1519
+            observationID: observationID,
1520
+            sampleTypeID: sampleTypeID,
1521
+            observedAt: observedMissingAt,
1522
+            db: db
1523
+        )
1524
+    }
1525
+
1369 1526
     private func upsertArchiveV2Sample(
1370 1527
         _ row: ArchiveSampleRow,
1371 1528
         observationID: Int64,
@@ -1430,6 +1587,7 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1430 1587
         observedAt: Date,
1431 1588
         triggerReason: String,
1432 1589
         status: String,
1590
+        selectedTypeSetHash: String? = nil,
1433 1591
         db: OpaquePointer?
1434 1592
     ) throws -> Int64 {
1435 1593
         let deviceChainID = try upsertCurrentDeviceChain(db)
@@ -1439,7 +1597,7 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1439 1597
             device_chain_id, observed_at, started_at, ended_at, status, trigger_reason,
1440 1598
             app_version, os_version, time_zone_identifier, time_zone_seconds_from_gmt,
1441 1599
             schema_version, selected_type_set_hash, notes
1442
-        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL)
1600
+        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)
1443 1601
         """
1444 1602
         try withStatement(sql, db: db) { statement in
1445 1603
             bindInt64(deviceChainID, to: 1, in: statement)
@@ -1453,6 +1611,7 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1453 1611
             bindText(timeZone.identifier, to: 9, in: statement)
1454 1612
             sqlite3_bind_int(statement, 10, Int32(timeZone.secondsFromGMT(for: observedAt)))
1455 1613
             bindInt(Self.archiveSchemaVersion, to: 11, in: statement)
1614
+            bindText(selectedTypeSetHash, to: 12, in: statement)
1456 1615
             guard sqlite3_step(statement) == SQLITE_DONE else {
1457 1616
                 throw SQLiteHealthArchiveStoreError.stepFailed(lastErrorMessage(db))
1458 1617
             }
@@ -1460,6 +1619,25 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1460 1619
         return sqlite3_last_insert_rowid(db)
1461 1620
     }
1462 1621
 
1622
+    private func updateObservationStatus(
1623
+        observationID: Int64,
1624
+        status: String,
1625
+        endedAt: Date,
1626
+        db: OpaquePointer?
1627
+    ) throws {
1628
+        try withStatement(
1629
+            "UPDATE observations SET status = ?, ended_at = ? WHERE id = ?",
1630
+            db: db
1631
+        ) { statement in
1632
+            bindText(status, to: 1, in: statement)
1633
+            sqlite3_bind_double(statement, 2, endedAt.timeIntervalSince1970)
1634
+            bindInt64(observationID, to: 3, in: statement)
1635
+            guard sqlite3_step(statement) == SQLITE_DONE else {
1636
+                throw SQLiteHealthArchiveStoreError.stepFailed(lastErrorMessage(db))
1637
+            }
1638
+        }
1639
+    }
1640
+
1463 1641
     private func insertObservationTypeRun(
1464 1642
         observationID: Int64,
1465 1643
         sampleTypeID: Int64,
@@ -1470,24 +1648,77 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1470 1648
         verifiedVisibleCount: Int?,
1471 1649
         db: OpaquePointer?
1472 1650
     ) throws {
1473
-        let sql = """
1474
-        INSERT OR REPLACE INTO observation_type_runs (
1475
-            observation_id, sample_type_id, status, started_at, ended_at,
1476
-            inserted_event_count, deleted_event_count, verified_visible_count
1477
-        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
1478
-        """
1479
-        try withStatement(sql, db: db) { statement in
1651
+        if let existing = try observationTypeRunCounts(
1652
+            observationID: observationID,
1653
+            sampleTypeID: sampleTypeID,
1654
+            db: db
1655
+        ) {
1656
+            try withStatement(
1657
+                """
1658
+                UPDATE observation_type_runs
1659
+                SET status = ?,
1660
+                    started_at = COALESCE(started_at, ?),
1661
+                    ended_at = ?,
1662
+                    inserted_event_count = ?,
1663
+                    deleted_event_count = ?,
1664
+                    verified_visible_count = COALESCE(?, verified_visible_count)
1665
+                WHERE observation_id = ? AND sample_type_id = ?
1666
+                """,
1667
+                db: db
1668
+            ) { statement in
1669
+                bindText(status, to: 1, in: statement)
1670
+                sqlite3_bind_double(statement, 2, observedAt.timeIntervalSince1970)
1671
+                sqlite3_bind_double(statement, 3, observedAt.timeIntervalSince1970)
1672
+                bindInt(existing.inserted + insertedEventCount, to: 4, in: statement)
1673
+                bindInt(existing.deleted + deletedEventCount, to: 5, in: statement)
1674
+                bindInt(verifiedVisibleCount, to: 6, in: statement)
1675
+                bindInt64(observationID, to: 7, in: statement)
1676
+                bindInt64(sampleTypeID, to: 8, in: statement)
1677
+                guard sqlite3_step(statement) == SQLITE_DONE else {
1678
+                    throw SQLiteHealthArchiveStoreError.stepFailed(lastErrorMessage(db))
1679
+                }
1680
+            }
1681
+        } else {
1682
+            let sql = """
1683
+            INSERT INTO observation_type_runs (
1684
+                observation_id, sample_type_id, status, started_at, ended_at,
1685
+                inserted_event_count, deleted_event_count, verified_visible_count
1686
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
1687
+            """
1688
+            try withStatement(sql, db: db) { statement in
1689
+                bindInt64(observationID, to: 1, in: statement)
1690
+                bindInt64(sampleTypeID, to: 2, in: statement)
1691
+                bindText(status, to: 3, in: statement)
1692
+                sqlite3_bind_double(statement, 4, observedAt.timeIntervalSince1970)
1693
+                sqlite3_bind_double(statement, 5, observedAt.timeIntervalSince1970)
1694
+                bindInt(insertedEventCount, to: 6, in: statement)
1695
+                bindInt(deletedEventCount, to: 7, in: statement)
1696
+                bindInt(verifiedVisibleCount, to: 8, in: statement)
1697
+                guard sqlite3_step(statement) == SQLITE_DONE else {
1698
+                    throw SQLiteHealthArchiveStoreError.stepFailed(lastErrorMessage(db))
1699
+                }
1700
+            }
1701
+        }
1702
+    }
1703
+
1704
+    private func observationTypeRunCounts(
1705
+        observationID: Int64,
1706
+        sampleTypeID: Int64,
1707
+        db: OpaquePointer?
1708
+    ) throws -> (inserted: Int, deleted: Int)? {
1709
+        try withStatement(
1710
+            """
1711
+            SELECT inserted_event_count, deleted_event_count
1712
+            FROM observation_type_runs
1713
+            WHERE observation_id = ? AND sample_type_id = ?
1714
+            LIMIT 1
1715
+            """,
1716
+            db: db
1717
+        ) { statement in
1480 1718
             bindInt64(observationID, to: 1, in: statement)
1481 1719
             bindInt64(sampleTypeID, to: 2, in: statement)
1482
-            bindText(status, to: 3, in: statement)
1483
-            sqlite3_bind_double(statement, 4, observedAt.timeIntervalSince1970)
1484
-            sqlite3_bind_double(statement, 5, observedAt.timeIntervalSince1970)
1485
-            bindInt(insertedEventCount, to: 6, in: statement)
1486
-            bindInt(deletedEventCount, to: 7, in: statement)
1487
-            bindInt(verifiedVisibleCount, to: 8, in: statement)
1488
-            guard sqlite3_step(statement) == SQLITE_DONE else {
1489
-                throw SQLiteHealthArchiveStoreError.stepFailed(lastErrorMessage(db))
1490
-            }
1720
+            guard sqlite3_step(statement) == SQLITE_ROW else { return nil }
1721
+            return (Int(sqlite3_column_int(statement, 0)), Int(sqlite3_column_int(statement, 1)))
1491 1722
         }
1492 1723
     }
1493 1724
 
+52 -1
HealthProbeTests/SQLiteHealthArchiveStoreTests.swift
@@ -92,7 +92,8 @@ final class SQLiteHealthArchiveStoreTests: XCTestCase {
92 92
         let observationIDs = try observationIDs(at: url)
93 93
 
94 94
         XCTAssertEqual(observationIDs.count, 2)
95
-        XCTAssertEqual(try countRows(in: "observation_type_runs", at: url), 1)
95
+        XCTAssertEqual(try countRows(in: "observation_type_runs", at: url), 2)
96
+        XCTAssertEqual(try countRows(in: "observation_type_runs WHERE observation_id = \(observationIDs[0]) AND inserted_event_count = 1", at: url), 1)
96 97
         XCTAssertEqual(try countRows(in: "observation_type_summaries WHERE observation_id = \(observationIDs[1]) AND visible_record_count = 1", at: url), 1)
97 98
         XCTAssertFalse(try tableExists("archive_samples", at: url))
98 99
     }
@@ -150,6 +151,56 @@ final class SQLiteHealthArchiveStoreTests: XCTestCase {
150 151
         XCTAssertNotNil(disappearedRecords.first?.disappearedAt)
151 152
     }
152 153
 
154
+    func testGroupedObservationKeepsPageWritesDeletesAndVerificationTogether() async throws {
155
+        let url = databaseURL()
156
+        let store = SQLiteHealthArchiveStore(databaseURL: url)
157
+        let firstSample = makeStepCountSample(value: 42, start: 1_000)
158
+        let secondSample = makeStepCountSample(value: 7, start: 2_000)
159
+        let typeIdentifier = HKQuantityTypeIdentifier.stepCount.rawValue
160
+
161
+        _ = try await store.upsertSamples([firstSample], observedAt: Date(timeIntervalSince1970: 3_000))
162
+        let observationID = try await store.beginObservation(
163
+            observedAt: Date(timeIntervalSince1970: 3_060),
164
+            triggerReason: "manual",
165
+            selectedTypeSetHash: "selected-types"
166
+        )
167
+        _ = try await store.upsertSamples(
168
+            [secondSample],
169
+            observedAt: Date(timeIntervalSince1970: 3_060),
170
+            observationID: observationID
171
+        )
172
+        try await store.recordDisappearance(
173
+            sampleUUIDHash: HashService.sampleUUIDHash(firstSample.uuid.uuidString),
174
+            sampleTypeIdentifier: typeIdentifier,
175
+            observedMissingAt: Date(timeIntervalSince1970: 3_060),
176
+            observationID: observationID
177
+        )
178
+        try await store.markVerification(
179
+            sampleType: secondSample.sampleType,
180
+            verifiedAt: Date(timeIntervalSince1970: 3_060),
181
+            observationID: observationID
182
+        )
183
+        try await store.finishObservation(
184
+            observationID: observationID,
185
+            status: "completed",
186
+            endedAt: Date(timeIntervalSince1970: 3_070)
187
+        )
188
+
189
+        let observationIDs = try observationIDs(at: url)
190
+        let summary = try await store.diffSummary(HealthArchiveDiffRequest(
191
+            fromObservationID: observationIDs[0],
192
+            toObservationID: observationID,
193
+            sampleTypeIdentifier: typeIdentifier
194
+        ))
195
+
196
+        XCTAssertEqual(observationIDs.count, 2)
197
+        XCTAssertEqual(summary.appearedCount, 1)
198
+        XCTAssertEqual(summary.disappearedCount, 1)
199
+        XCTAssertEqual(try countRows(in: "observations WHERE id = \(observationID) AND status = 'completed' AND selected_type_set_hash = 'selected-types'", at: url), 1)
200
+        XCTAssertEqual(try countRows(in: "sample_observation_events WHERE observation_id = \(observationID)", at: url), 2)
201
+        XCTAssertEqual(try countRows(in: "observation_type_runs WHERE observation_id = \(observationID) AND inserted_event_count = 1 AND deleted_event_count = 1 AND verified_visible_count = 1 AND status = 'completed'", at: url), 1)
202
+    }
203
+
153 204
     func testLargeSyntheticDiffReturnsCountsAndBoundedPages() async throws {
154 205
         let url = databaseURL()
155 206
         let store = SQLiteHealthArchiveStore(databaseURL: url)