Showing 2 changed files with 49 additions and 14 deletions
+48 -14
HealthProbe/Services/HealthKitService.swift
@@ -1381,6 +1381,28 @@ final class HealthKitService {
1381 1381
         var archiveWriter = HealthRecordArchive.makeCompactWriter(typeIdentifier: typeIdentifier)
1382 1382
         var hashBuilder = HashService.TypeHashBuilder(typeIdentifier: typeIdentifier)
1383 1383
         var shouldFetchNextPage = true
1384
+        var pendingArchiveSamples: [HKSample] = []
1385
+        pendingArchiveSamples.reserveCapacity(importStrategy.initialArchiveFlushSampleLimit)
1386
+
1387
+        func flushPendingArchiveSamples(pageNumber: Int) async throws {
1388
+            guard !pendingArchiveSamples.isEmpty else { return }
1389
+            let samplesToArchive = pendingArchiveSamples
1390
+            pendingArchiveSamples.removeAll(keepingCapacity: true)
1391
+            let archiveResult = try await archivePage(
1392
+                SampleDistributionPage(samples: samplesToArchive, deletedObjects: [], anchor: nil),
1393
+                sampleType: sampleType,
1394
+                observationID: archiveObservationID,
1395
+                progress: progress,
1396
+                typeIdentifier: typeIdentifier,
1397
+                progressDetail: "Persisting import pages through \(pageNumber)",
1398
+                recordCount: nil,
1399
+                progressStarted: progressStarted,
1400
+                processedEventCount: processedEventCount,
1401
+                persistenceState: persistenceState
1402
+            )
1403
+            persistenceState = archiveResult.persistenceState
1404
+            captureTimings.insertElapsedSeconds += archiveResult.insertElapsedSeconds
1405
+        }
1384 1406
 
1385 1407
         while shouldFetchNextPage {
1386 1408
             pageNumber += 1
@@ -1410,21 +1432,25 @@ final class HealthKitService {
1410 1432
                 )
1411 1433
             }
1412 1434
             captureTimings.fetchElapsedSeconds += Date().timeIntervalSince(pageFetchStartedAt)
1413
-            let archiveResult = try await archivePage(
1414
-                page,
1415
-                sampleType: sampleType,
1416
-                observationID: archiveObservationID,
1417
-                progress: progress,
1418
-                typeIdentifier: typeIdentifier,
1419
-                progressDetail: "Persisting import page \(pageNumber)",
1420
-                recordCount: recordCount,
1421
-                progressStarted: progressStarted,
1422
-                processedEventCount: processedEventCount,
1423
-                persistenceState: persistenceState
1424
-            )
1425
-            persistenceState = archiveResult.persistenceState
1426
-            captureTimings.insertElapsedSeconds += archiveResult.insertElapsedSeconds
1427 1435
             anchor = page.anchor
1436
+            pendingArchiveSamples.append(contentsOf: page.samples)
1437
+            if !page.deletedObjects.isEmpty {
1438
+                try await flushPendingArchiveSamples(pageNumber: pageNumber)
1439
+                let archiveResult = try await archivePage(
1440
+                    SampleDistributionPage(samples: [], deletedObjects: page.deletedObjects, anchor: page.anchor),
1441
+                    sampleType: sampleType,
1442
+                    observationID: archiveObservationID,
1443
+                    progress: progress,
1444
+                    typeIdentifier: typeIdentifier,
1445
+                    progressDetail: "Persisting import deletes page \(pageNumber)",
1446
+                    recordCount: nil,
1447
+                    progressStarted: progressStarted,
1448
+                    processedEventCount: processedEventCount,
1449
+                    persistenceState: persistenceState
1450
+                )
1451
+                persistenceState = archiveResult.persistenceState
1452
+                captureTimings.insertElapsedSeconds += archiveResult.insertElapsedSeconds
1453
+            }
1428 1454
 
1429 1455
             let processStartedAt = Date()
1430 1456
             for sample in page.samples {
@@ -1442,6 +1468,9 @@ final class HealthKitService {
1442 1468
 
1443 1469
             processedEventCount += pageEventCount(page)
1444 1470
             shouldFetchNextPage = page.samples.count + page.deletedObjects.count >= importStrategy.queryPageLimit
1471
+            if pendingArchiveSamples.count >= importStrategy.initialArchiveFlushSampleLimit || !shouldFetchNextPage {
1472
+                try await flushPendingArchiveSamples(pageNumber: pageNumber)
1473
+            }
1445 1474
             let elapsedAfterPage = Date().timeIntervalSince(progressStarted)
1446 1475
             progress?.updateBlockProgress(
1447 1476
                 typeIdentifier,
@@ -2630,6 +2659,7 @@ private extension SnapshotFetchProgress {
2630 2659
 struct DistributionCaptureStrategy: Equatable {
2631 2660
     let queryPageLimit: Int
2632 2661
     let initialWriteChunkSize: Int
2662
+    let initialArchiveFlushSampleLimit: Int
2633 2663
     let minimumWriteChunkSize: Int
2634 2664
     let slowBatchThresholdSeconds: TimeInterval
2635 2665
     let severeBatchThresholdSeconds: TimeInterval
@@ -2697,6 +2727,7 @@ enum DistributionCaptureConfiguration {
2697 2727
     static let incrementalStrategy = DistributionCaptureStrategy(
2698 2728
         queryPageLimit: 10_000,
2699 2729
         initialWriteChunkSize: 2_000,
2730
+        initialArchiveFlushSampleLimit: 2_000,
2700 2731
         minimumWriteChunkSize: 500,
2701 2732
         slowBatchThresholdSeconds: 2.5,
2702 2733
         severeBatchThresholdSeconds: 6
@@ -2721,6 +2752,7 @@ enum DistributionCaptureConfiguration {
2721 2752
             return DistributionCaptureStrategy(
2722 2753
                 queryPageLimit: 2_000,
2723 2754
                 initialWriteChunkSize: 2_000,
2755
+                initialArchiveFlushSampleLimit: 10_000,
2724 2756
                 minimumWriteChunkSize: 500,
2725 2757
                 slowBatchThresholdSeconds: 2.5,
2726 2758
                 severeBatchThresholdSeconds: 6
@@ -2731,6 +2763,7 @@ enum DistributionCaptureConfiguration {
2731 2763
             return DistributionCaptureStrategy(
2732 2764
                 queryPageLimit: 5_000,
2733 2765
                 initialWriteChunkSize: 1_000,
2766
+                initialArchiveFlushSampleLimit: 10_000,
2734 2767
                 minimumWriteChunkSize: 250,
2735 2768
                 slowBatchThresholdSeconds: 1.75,
2736 2769
                 severeBatchThresholdSeconds: 4.5
@@ -2740,6 +2773,7 @@ enum DistributionCaptureConfiguration {
2740 2773
         return DistributionCaptureStrategy(
2741 2774
             queryPageLimit: 20_000,
2742 2775
             initialWriteChunkSize: 5_000,
2776
+            initialArchiveFlushSampleLimit: 5_000,
2743 2777
             minimumWriteChunkSize: 500,
2744 2778
             slowBatchThresholdSeconds: 2.5,
2745 2779
             severeBatchThresholdSeconds: 6
+1 -0
HealthProbeTests/DistributionCaptureConfigurationTests.swift
@@ -10,6 +10,7 @@ final class DistributionCaptureConfigurationTests: XCTestCase {
10 10
 
11 11
         XCTAssertEqual(strategy.queryPageLimit, 2_000)
12 12
         XCTAssertEqual(strategy.initialWriteChunkSize, 2_000)
13
+        XCTAssertEqual(strategy.initialArchiveFlushSampleLimit, 10_000)
13 14
         XCTAssertEqual(strategy.minimumWriteChunkSize, 500)
14 15
         XCTAssertEqual(strategy.slowBatchThresholdSeconds, 2.5)
15 16
         XCTAssertEqual(strategy.severeBatchThresholdSeconds, 6)