Showing 3 changed files with 155 additions and 49 deletions
+2 -2
HealthProbe/Doc/04-project/IMPLEMENTATION_STATUS.md
@@ -24,8 +24,8 @@ There are no real deployments, only test installations. Existing prototype datab
24 24
 | Area | Current Status | Target / Next Work |
25 25
 |------|----------------|--------------------|
26 26
 | Product docs | Updated | Keep `HealthProbe/Doc/README.md` as canonical index |
27
-| HealthKit capture | Capture now opens one archive observation per user-visible snapshot, attaches HealthKit pages, deleted-object evidence, and type verification to that observation id before finishing it, no longer aborts initial full-history imports after a fixed 30-minute wall-clock cap while page-level HealthKit timeouts remain in place, defers grouped observation summary/daily aggregate rebuilds until per-type verification instead of rebuilding after every imported page, and persists large HealthKit pages in smaller archive chunks while using a smaller anchored-query page size to avoid long monolithic SQLite stalls | Continue moving UI/cache reads to archive-backed observation ids and revisit fully adaptive paging/background collection separately |
28
-| SQLite archive | Archive v2 schema, snapshot-level observation grouping, differential write path, v2 verification/delete bookkeeping, daily aggregate rebuilds, integrity report, v2 record reads, SQL diff/count/aggregate/provenance/consolidation-evidence APIs, large synthetic diff pagination coverage, formal timing/memory metrics, and XCTest coverage are in place; the legacy `archive_samples` mirror has been removed, and the hot write path now reuses prepared SQLite statements within grouped page writes instead of reparsing the same SQL for every sample | Continue moving capture/Dashboard actions to archive/cache DTOs |
27
+| HealthKit capture | Capture now opens one archive observation per user-visible snapshot, attaches HealthKit pages, deleted-object evidence, and type verification to that observation id before finishing it, no longer aborts initial full-history imports after a fixed 30-minute wall-clock cap while page-level HealthKit timeouts remain in place, defers grouped observation summary/daily aggregate rebuilds until per-type verification instead of rebuilding after every imported page, and persists large HealthKit pages in smaller archive chunks while using smaller and more conservative initial-import page sizes for high-volume types, adaptive write chunk sizing, explicit task yields, and lower-allocation streaming loops to avoid long monolithic SQLite stalls | Continue moving UI/cache reads to archive-backed observation ids and revisit full checkpoint/resume and background collection separately |
28
+| SQLite archive | Archive v2 schema, snapshot-level observation grouping, differential write path, v2 verification/delete bookkeeping, daily aggregate rebuilds, integrity report, v2 record reads, SQL diff/count/aggregate/provenance/consolidation-evidence APIs, large synthetic diff pagination coverage, formal timing/memory metrics, and XCTest coverage are in place; the legacy `archive_samples` mirror has been removed, the hot write path now reuses prepared SQLite statements within grouped page writes instead of reparsing the same SQL for every sample, processes sample rows in a lower-allocation streaming loop, and opens SQLite connections with import-friendly busy timeout / synchronous / temp-store pragmas | Continue moving capture/Dashboard actions to archive/cache DTOs |
29 29
 | Core Data cache | Initial programmatic Core Data model, full-cache rebuild service, read DTOs for observation/type/diff/health rows, and Dashboard archive-cache status wiring are in place | Move remaining export/report paths to cache DTOs and add targeted partial invalidation |
30 30
 | SwiftData cache | Exists; test builds now reset legacy prototype UI/archive/cache stores once for archive v2 so old SwiftData-only snapshots are not treated as backed-up observations. Metric timeout calibration, local device profile settings, operation logging, ContentView preview, Settings data maintenance, legacy detail/PDF views, unused legacy repair/observer services, Dashboard view/view-model access, and legacy anomaly/count-drop review have moved outside SwiftData or been removed. Remaining SwiftData imports are inventoried in [`SwiftData-Retirement-Inventory.md`](SwiftData-Retirement-Inventory.md) | Treat as disposable prototype data; stop returning/storing `HealthSnapshot` bridge handles before removing `ModelContainer` |
31 31
 | UI | Prototype exists; Dashboard status reads archive/cache observation rows and shows cache health, and Dashboard view/view-model code no longer imports SwiftData or reads `ModelContext`; capture/review actions now route through DTOs and snapshot ids, with the remaining legacy bridge isolated in `HealthKitService`. Snapshots and Data Types tab roots no longer import SwiftData, load Core Data cached observation rows, and open archive/cache-backed detail rows; `SnapshotArchiveDetailView` and `DataTypeArchiveDetailView` read Core Data type/diff summaries and page record drill-down through SQLite; unused legacy SwiftData snapshot/type detail and PDF views have been deleted; record-change evolution and temporal distribution screens now receive DTO rows/cache input instead of querying SwiftData directly; export preview reads the archive export API before showing/exporting JSON; simplified detail mode replaces heavy charts with summary rows on small/accessibility layouts or when enabled in Settings; visible change labels now use neutral new/missing/change-review language | Stop writing prototype `HealthSnapshot` bridge rows during capture/review |
+134 -39
HealthProbe/Services/HealthKitService.swift
@@ -1070,8 +1070,13 @@ final class HealthKitService {
1070 1070
     ) async throws -> SampleDistribution {
1071 1071
         var anchor = previousDistribution.globalAnchor
1072 1072
         let startedFromAnchor = anchor != nil
1073
+        let incrementalStrategy = DistributionCaptureConfiguration.incrementalStrategy
1074
+        var persistenceState = incrementalStrategy.makePersistenceState()
1073 1075
         let estimatedPageCount = startedFromAnchor
1074
-            ? Self.estimatedPageCount(for: previousDistribution.count)
1076
+            ? Self.estimatedPageCount(
1077
+                for: previousDistribution.count,
1078
+                pageLimit: incrementalStrategy.queryPageLimit
1079
+            )
1075 1080
             : nil
1076 1081
         let progressStarted = Date()
1077 1082
         var pageNumber = 0
@@ -1096,10 +1101,11 @@ final class HealthKitService {
1096 1101
                 try await self.fetchDistributionPage(
1097 1102
                     for: sampleType,
1098 1103
                     predicate: nil,
1099
-                    anchor: anchor
1104
+                    anchor: anchor,
1105
+                    limit: incrementalStrategy.queryPageLimit
1100 1106
                 )
1101 1107
             }
1102
-            try await archivePage(
1108
+            persistenceState = try await archivePage(
1103 1109
                 page,
1104 1110
                 sampleType: sampleType,
1105 1111
                 observationID: archiveObservationID,
@@ -1108,7 +1114,8 @@ final class HealthKitService {
1108 1114
                 progressDetail: "Persisting delta page \(pageNumber)",
1109 1115
                 recordCount: previousDistribution.count,
1110 1116
                 progressStarted: progressStarted,
1111
-                processedEventCount: processedEventCount
1117
+                processedEventCount: processedEventCount,
1118
+                persistenceState: persistenceState
1112 1119
             )
1113 1120
             anchor = page.anchor
1114 1121
 
@@ -1155,7 +1162,7 @@ final class HealthKitService {
1155 1162
         if let firstDeltaPage {
1156 1163
             applyDistributionPage(firstDeltaPage, sampleType: sampleType, to: &recordMap)
1157 1164
             processedEventCount += pageEventCount(firstDeltaPage)
1158
-            shouldFetchNextPage = firstDeltaPage.samples.count + firstDeltaPage.deletedObjects.count >= DistributionCaptureConfiguration.queryPageLimit
1165
+            shouldFetchNextPage = firstDeltaPage.samples.count + firstDeltaPage.deletedObjects.count >= incrementalStrategy.queryPageLimit
1159 1166
             progress?.updateBlockProgress(
1160 1167
                 typeIdentifier,
1161 1168
                 detail: Self.pageProgressDetail(
@@ -1193,10 +1200,11 @@ final class HealthKitService {
1193 1200
                 try await self.fetchDistributionPage(
1194 1201
                     for: sampleType,
1195 1202
                     predicate: nil,
1196
-                    anchor: anchor
1203
+                    anchor: anchor,
1204
+                    limit: incrementalStrategy.queryPageLimit
1197 1205
                 )
1198 1206
             }
1199
-            try await archivePage(
1207
+            persistenceState = try await archivePage(
1200 1208
                 page,
1201 1209
                 sampleType: sampleType,
1202 1210
                 observationID: archiveObservationID,
@@ -1205,13 +1213,14 @@ final class HealthKitService {
1205 1213
                 progressDetail: "Persisting delta page \(pageNumber)",
1206 1214
                 recordCount: recordMap.count,
1207 1215
                 progressStarted: progressStarted,
1208
-                processedEventCount: processedEventCount
1216
+                processedEventCount: processedEventCount,
1217
+                persistenceState: persistenceState
1209 1218
             )
1210 1219
             anchor = page.anchor
1211 1220
 
1212 1221
             applyDistributionPage(page, sampleType: sampleType, to: &recordMap)
1213 1222
             processedEventCount += pageEventCount(page)
1214
-            shouldFetchNextPage = page.samples.count + page.deletedObjects.count >= DistributionCaptureConfiguration.queryPageLimit
1223
+            shouldFetchNextPage = page.samples.count + page.deletedObjects.count >= incrementalStrategy.queryPageLimit
1215 1224
             progress?.updateBlockProgress(
1216 1225
                 typeIdentifier,
1217 1226
                 detail: Self.pageProgressDetail(
@@ -1308,6 +1317,8 @@ final class HealthKitService {
1308 1317
         archiveObservationID: Int64,
1309 1318
         progress: SnapshotFetchProgress?
1310 1319
     ) async throws -> SampleDistribution {
1320
+        let importStrategy = DistributionCaptureConfiguration.initialImportStrategy(for: typeIdentifier)
1321
+        var persistenceState = importStrategy.makePersistenceState()
1311 1322
         var anchor: HKQueryAnchor?
1312 1323
         var pageNumber = 0
1313 1324
         var recordCount = 0
@@ -1341,10 +1352,11 @@ final class HealthKitService {
1341 1352
                 try await self.fetchDistributionPage(
1342 1353
                     for: sampleType,
1343 1354
                     predicate: nil,
1344
-                    anchor: anchor
1355
+                    anchor: anchor,
1356
+                    limit: importStrategy.queryPageLimit
1345 1357
                 )
1346 1358
             }
1347
-            try await archivePage(
1359
+            persistenceState = try await archivePage(
1348 1360
                 page,
1349 1361
                 sampleType: sampleType,
1350 1362
                 observationID: archiveObservationID,
@@ -1353,22 +1365,25 @@ final class HealthKitService {
1353 1365
                 progressDetail: "Persisting import page \(pageNumber)",
1354 1366
                 recordCount: recordCount,
1355 1367
                 progressStarted: progressStarted,
1356
-                processedEventCount: processedEventCount
1368
+                processedEventCount: processedEventCount,
1369
+                persistenceState: persistenceState
1357 1370
             )
1358 1371
             anchor = page.anchor
1359 1372
 
1360 1373
             for sample in page.samples {
1361
-                let value = Self.recordValue(for: sample, sampleType: sampleType, typeIdentifier: typeIdentifier)
1362
-                archiveWriter.append(value)
1363
-                hashBuilder.append(recordFingerprint: value.recordFingerprint)
1364
-                yearMap[Calendar.current.component(.year, from: value.startDate), default: 0] += 1
1365
-                firstRecordDate = min(firstRecordDate ?? value.startDate, value.startDate)
1366
-                latestRecordDate = max(latestRecordDate ?? value.endDate, value.endDate)
1367
-                recordCount += 1
1374
+                autoreleasepool {
1375
+                    let value = Self.recordValue(for: sample, sampleType: sampleType, typeIdentifier: typeIdentifier)
1376
+                    archiveWriter.append(value)
1377
+                    hashBuilder.append(recordFingerprint: value.recordFingerprint)
1378
+                    yearMap[Calendar.current.component(.year, from: value.startDate), default: 0] += 1
1379
+                    firstRecordDate = min(firstRecordDate ?? value.startDate, value.startDate)
1380
+                    latestRecordDate = max(latestRecordDate ?? value.endDate, value.endDate)
1381
+                    recordCount += 1
1382
+                }
1368 1383
             }
1369 1384
 
1370 1385
             processedEventCount += pageEventCount(page)
1371
-            shouldFetchNextPage = page.samples.count + page.deletedObjects.count >= DistributionCaptureConfiguration.queryPageLimit
1386
+            shouldFetchNextPage = page.samples.count + page.deletedObjects.count >= importStrategy.queryPageLimit
1372 1387
             let elapsedAfterPage = Date().timeIntervalSince(progressStarted)
1373 1388
             progress?.updateBlockProgress(
1374 1389
                 typeIdentifier,
@@ -1384,6 +1399,7 @@ final class HealthKitService {
1384 1399
                     elapsedSeconds: elapsedAfterPage
1385 1400
                 )
1386 1401
             )
1402
+            await Task.yield()
1387 1403
         }
1388 1404
 
1389 1405
         let contentHash = hashBuilder.finalize()
@@ -1478,10 +1494,10 @@ final class HealthKitService {
1478 1494
         page.samples.count + page.deletedObjects.count
1479 1495
     }
1480 1496
 
1481
-    private static func estimatedPageCount(for recordCount: Int) -> Int {
1497
+    private static func estimatedPageCount(for recordCount: Int, pageLimit: Int) -> Int {
1482 1498
         let count = max(recordCount, 0)
1483
-        let pageLimit = DistributionCaptureConfiguration.queryPageLimit
1484
-        return max(1, (count + pageLimit - 1) / pageLimit)
1499
+        let normalizedPageLimit = max(pageLimit, 1)
1500
+        return max(1, (count + normalizedPageLimit - 1) / normalizedPageLimit)
1485 1501
     }
1486 1502
 
1487 1503
     private static func samplesPerSecond(processedCount: Int, elapsedSeconds: TimeInterval) -> Double {
@@ -1508,10 +1524,12 @@ final class HealthKitService {
1508 1524
     private func fetchDistributionPage(
1509 1525
         for sampleType: HKSampleType,
1510 1526
         predicate: NSPredicate?,
1511
-        anchor: HKQueryAnchor?
1527
+        anchor: HKQueryAnchor?,
1528
+        limit: Int
1512 1529
     ) async throws -> SampleDistributionPage {
1513 1530
         let box = HealthKitQueryContinuationBox<SampleDistributionPage>()
1514 1531
         nonisolated(unsafe) let queryPredicate = predicate
1532
+        let queryLimit = max(limit, 1)
1515 1533
         return try await withTaskCancellationHandler {
1516 1534
             try await withCheckedThrowingContinuation { continuation in
1517 1535
                 box.setContinuation(continuation)
@@ -1519,7 +1537,7 @@ final class HealthKitService {
1519 1537
                     type: sampleType,
1520 1538
                     predicate: queryPredicate,
1521 1539
                     anchor: anchor,
1522
-                    limit: DistributionCaptureConfiguration.queryPageLimit
1540
+                    limit: queryLimit
1523 1541
                 ) { _, samples, deletedObjects, newAnchor, error in
1524 1542
                     if let error {
1525 1543
                         box.resume(throwing: error)
@@ -1551,27 +1569,27 @@ final class HealthKitService {
1551 1569
         progressDetail: String? = nil,
1552 1570
         recordCount: Int? = nil,
1553 1571
         progressStarted: Date? = nil,
1554
-        processedEventCount: Int? = nil
1555
-    ) async throws {
1572
+        processedEventCount: Int? = nil,
1573
+        persistenceState: DistributionPagePersistenceState
1574
+    ) async throws -> DistributionPagePersistenceState {
1575
+        var persistenceState = persistenceState
1556 1576
         let observedAt = Date()
1557 1577
         if !page.samples.isEmpty {
1558
-            let totalSampleBatches = max(
1559
-                1,
1560
-                Int(ceil(Double(page.samples.count) / Double(DistributionCaptureConfiguration.archiveWriteChunkSize)))
1561
-            )
1562
-            var batchNumber = 0
1563 1578
             var batchStart = 0
1564 1579
             while batchStart < page.samples.count {
1565
-                batchNumber += 1
1580
+                let chunkSize = min(
1581
+                    max(persistenceState.currentWriteChunkSize, persistenceState.minimumWriteChunkSize),
1582
+                    page.samples.count - batchStart
1583
+                )
1566 1584
                 let batchEnd = min(
1567
-                    batchStart + DistributionCaptureConfiguration.archiveWriteChunkSize,
1585
+                    batchStart + chunkSize,
1568 1586
                     page.samples.count
1569 1587
                 )
1570 1588
                 let sampleBatch = Array(page.samples[batchStart..<batchEnd])
1571 1589
                 if let progress, let typeIdentifier, let progressDetail {
1572
-                    let detail = totalSampleBatches == 1
1590
+                    let detail = page.samples.count <= chunkSize
1573 1591
                         ? progressDetail
1574
-                        : "\(progressDetail) (\(batchNumber)/\(totalSampleBatches))"
1592
+                        : "\(progressDetail) (\(batchStart + 1)-\(batchEnd)/\(page.samples.count))"
1575 1593
                     progress.updateBlockProgress(
1576 1594
                         typeIdentifier,
1577 1595
                         detail: detail,
@@ -1586,12 +1604,15 @@ final class HealthKitService {
1586 1604
                         }()
1587 1605
                     )
1588 1606
                 }
1607
+                let batchStartedAt = Date()
1589 1608
                 _ = try await archiveStore.upsertSamples(
1590 1609
                     sampleBatch,
1591 1610
                     observedAt: observedAt,
1592 1611
                     observationID: observationID
1593 1612
                 )
1613
+                persistenceState.registerBatchDuration(Date().timeIntervalSince(batchStartedAt))
1594 1614
                 batchStart = batchEnd
1615
+                await Task.yield()
1595 1616
             }
1596 1617
         }
1597 1618
         for deletedObject in page.deletedObjects {
@@ -1602,6 +1623,7 @@ final class HealthKitService {
1602 1623
                 observationID: observationID
1603 1624
             )
1604 1625
         }
1626
+        return persistenceState
1605 1627
     }
1606 1628
 
1607 1629
     private static func archiveAnchor(_ anchor: HKQueryAnchor) -> Data? {
@@ -2438,8 +2460,81 @@ private extension SnapshotFetchProgress {
2438 2460
     }
2439 2461
 }
2440 2462
 
2441
-private enum DistributionCaptureConfiguration {
2442
-    static let queryPageLimit = 5_000
2463
+struct DistributionCaptureStrategy: Equatable {
2464
+    let queryPageLimit: Int
2465
+    let initialWriteChunkSize: Int
2466
+    let minimumWriteChunkSize: Int
2467
+    let slowBatchThresholdSeconds: TimeInterval
2468
+    let severeBatchThresholdSeconds: TimeInterval
2469
+
2470
+    func makePersistenceState() -> DistributionPagePersistenceState {
2471
+        DistributionPagePersistenceState(
2472
+            currentWriteChunkSize: initialWriteChunkSize,
2473
+            maximumWriteChunkSize: initialWriteChunkSize,
2474
+            minimumWriteChunkSize: minimumWriteChunkSize,
2475
+            slowBatchThresholdSeconds: slowBatchThresholdSeconds,
2476
+            severeBatchThresholdSeconds: severeBatchThresholdSeconds
2477
+        )
2478
+    }
2479
+}
2480
+
2481
+struct DistributionPagePersistenceState: Equatable {
2482
+    var currentWriteChunkSize: Int
2483
+    let maximumWriteChunkSize: Int
2484
+    let minimumWriteChunkSize: Int
2485
+    let slowBatchThresholdSeconds: TimeInterval
2486
+    let severeBatchThresholdSeconds: TimeInterval
2487
+
2488
+    mutating func registerBatchDuration(_ duration: TimeInterval) {
2489
+        if duration >= severeBatchThresholdSeconds {
2490
+            currentWriteChunkSize = max(minimumWriteChunkSize, currentWriteChunkSize / 2)
2491
+            return
2492
+        }
2493
+        if duration >= slowBatchThresholdSeconds {
2494
+            currentWriteChunkSize = max(minimumWriteChunkSize, currentWriteChunkSize - max(minimumWriteChunkSize / 2, 50))
2495
+            return
2496
+        }
2497
+        if duration <= 0.35, currentWriteChunkSize < maximumWriteChunkSize {
2498
+            currentWriteChunkSize = min(maximumWriteChunkSize, currentWriteChunkSize + max(minimumWriteChunkSize / 2, 50))
2499
+        }
2500
+    }
2501
+}
2502
+
2503
+enum DistributionCaptureConfiguration {
2443 2504
     static let pageTimeoutSeconds: TimeInterval = 60
2444
-    static let archiveWriteChunkSize = 1_000
2505
+    static let incrementalStrategy = DistributionCaptureStrategy(
2506
+        queryPageLimit: 5_000,
2507
+        initialWriteChunkSize: 1_000,
2508
+        minimumWriteChunkSize: 250,
2509
+        slowBatchThresholdSeconds: 2,
2510
+        severeBatchThresholdSeconds: 6
2511
+    )
2512
+
2513
+    private static let highVolumeTypeIdentifiers: Set<String> = [
2514
+        HKQuantityTypeIdentifier.heartRate.rawValue,
2515
+        HKQuantityTypeIdentifier.stepCount.rawValue,
2516
+        HKQuantityTypeIdentifier.distanceWalkingRunning.rawValue,
2517
+        HKQuantityTypeIdentifier.activeEnergyBurned.rawValue,
2518
+        HKQuantityTypeIdentifier.appleExerciseTime.rawValue
2519
+    ]
2520
+
2521
+    static func initialImportStrategy(for typeIdentifier: String) -> DistributionCaptureStrategy {
2522
+        if highVolumeTypeIdentifiers.contains(typeIdentifier) {
2523
+            return DistributionCaptureStrategy(
2524
+                queryPageLimit: 1_000,
2525
+                initialWriteChunkSize: 250,
2526
+                minimumWriteChunkSize: 100,
2527
+                slowBatchThresholdSeconds: 1.25,
2528
+                severeBatchThresholdSeconds: 4
2529
+            )
2530
+        }
2531
+
2532
+        return DistributionCaptureStrategy(
2533
+            queryPageLimit: 2_000,
2534
+            initialWriteChunkSize: 500,
2535
+            minimumWriteChunkSize: 100,
2536
+            slowBatchThresholdSeconds: 1.5,
2537
+            severeBatchThresholdSeconds: 4
2538
+        )
2539
+    }
2445 2540
 }
+19 -8
HealthProbe/Services/SQLiteHealthArchiveStore.swift
@@ -1408,6 +1408,15 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1408 1408
             sqlite3_close(db)
1409 1409
             throw SQLiteHealthArchiveStoreError.openFailed(message)
1410 1410
         }
1411
+        sqlite3_busy_timeout(db, 5_000)
1412
+        do {
1413
+            try execute("PRAGMA synchronous = NORMAL", db: db)
1414
+            try execute("PRAGMA temp_store = MEMORY", db: db)
1415
+            try execute("PRAGMA wal_autocheckpoint = 4000", db: db)
1416
+        } catch {
1417
+            sqlite3_close(db)
1418
+            throw error
1419
+        }
1411 1420
         return db
1412 1421
     }
1413 1422
 
@@ -1810,19 +1819,21 @@ actor SQLiteHealthArchiveStore: HealthArchiveStore {
1810 1819
     ) throws -> HealthArchiveWriteSummary {
1811 1820
         let statementCache = SQLiteStatementCache(db: db)
1812 1821
         defer { statementCache.finalizeAll() }
1813
-        let rows = samples.map { ArchiveSampleRow(sample: $0, observedAt: observedAt) }
1814 1822
         var inserted = 0
1815 1823
         var updated = 0
1816 1824
         var unchanged = 0
1817 1825
         var typeEventCounts: [Int64: (inserted: Int, deleted: Int)] = [:]
1818 1826
 
1819
-        for row in rows {
1820
-            let result = try upsertArchiveV2Sample(
1821
-                row,
1822
-                observationID: observationID,
1823
-                db: db,
1824
-                statementCache: statementCache
1825
-            )
1827
+        for sample in samples {
1828
+            let result: ArchiveV2SampleWriteResult = try autoreleasepool {
1829
+                let row = ArchiveSampleRow(sample: sample, observedAt: observedAt)
1830
+                return try upsertArchiveV2Sample(
1831
+                    row,
1832
+                    observationID: observationID,
1833
+                    db: db,
1834
+                    statementCache: statementCache
1835
+                )
1836
+            }
1826 1837
             var counts = typeEventCounts[result.sampleTypeID, default: (inserted: 0, deleted: 0)]
1827 1838
             switch result.kind {
1828 1839
             case .inserted: