diff --git a/src/ctia/task/migration/migrate_es_stores.clj b/src/ctia/task/migration/migrate_es_stores.clj index c5fc461b8..4dd896622 100644 --- a/src/ctia/task/migration/migrate_es_stores.clj +++ b/src/ctia/task/migration/migrate_es_stores.clj @@ -124,12 +124,14 @@ :documents data :search_after next-search-after))))) -(s/defn read-source ;; WARNING: defining schema output breaks lazyness +(s/defn read-source "returns a lazy-seq of batch from source store" [read-params :- (s/maybe BatchParams)] - (lazy-seq - (when-let [batch (read-source-batch read-params)] - (cons batch (read-source (dissoc batch :documents)))))) + (iteration read-source-batch + :vf identity + :initk read-params + :kf #(dissoc % :documents) + :somef seq)) (s/defn write-target :- s/Int "This function writes a batch of documents which are (1) modified with `migrations` functions, diff --git a/src/ctia/task/migration/store.clj b/src/ctia/task/migration/store.clj index 607a6b3d4..9e8bc714c 100644 --- a/src/ctia/task/migration/store.clj +++ b/src/ctia/task/migration/store.clj @@ -430,7 +430,8 @@ Rollover requires refresh so we cannot just call ES with condition since refresh {"id" sort-order}) params (merge - {:offset (or offset 0) + {:track_total_hits true + :offset (or offset 0) :limit batch-size} (when sort-order {:sort sort-by})