diff --git a/c#/imagePipeline/src/Consumer/HashConsumer.cs b/c#/imagePipeline/src/Consumer/HashConsumer.cs index 51ec972f..fed73021 100644 --- a/c#/imagePipeline/src/Consumer/HashConsumer.cs +++ b/c#/imagePipeline/src/Consumer/HashConsumer.cs @@ -7,14 +7,13 @@ public class HashConsumer : MatrixConsumer, IDisposable { private readonly Dictionary> _imageHashSettersKeyByAlgorithm; - public HashConsumer(ImagePipelineDbContext.New dbContextFactory) : base(dbContextFactory) => - _imageHashSettersKeyByAlgorithm = new() - { - {PHash.Create(), (image, bytes) => image.PHash = BitConverter.ToUInt64(bytes)}, - {AverageHash.Create(), (image, bytes) => image.AverageHash = BitConverter.ToUInt64(bytes)}, - {BlockMeanHash.Create(), (image, bytes) => image.BlockMeanHash = bytes}, - {MarrHildrethHash.Create(), (image, bytes) => image.MarrHildrethHash = bytes} - }; + public HashConsumer() => _imageHashSettersKeyByAlgorithm = new() + { + {PHash.Create(), (image, bytes) => image.PHash = BitConverter.ToUInt64(bytes)}, + {AverageHash.Create(), (image, bytes) => image.AverageHash = BitConverter.ToUInt64(bytes)}, + {BlockMeanHash.Create(), (image, bytes) => image.BlockMeanHash = bytes}, + {MarrHildrethHash.Create(), (image, bytes) => image.MarrHildrethHash = bytes} + }; public void Dispose() => _imageHashSettersKeyByAlgorithm.Keys.ForEach(hash => hash.Dispose()); diff --git a/c#/imagePipeline/src/Consumer/MatrixConsumer.cs b/c#/imagePipeline/src/Consumer/MatrixConsumer.cs index 79b37c3a..03de1a30 100644 --- a/c#/imagePipeline/src/Consumer/MatrixConsumer.cs +++ b/c#/imagePipeline/src/Consumer/MatrixConsumer.cs @@ -1,26 +1,14 @@ -using System.Data; - namespace tbm.ImagePipeline.Consumer; public abstract class MatrixConsumer { - private readonly ImagePipelineDbContext.New _dbContextFactory; - private readonly string _script; - - protected MatrixConsumer(ImagePipelineDbContext.New dbContextFactory, string script = "") => - (_dbContextFactory, _script) = (dbContextFactory, script); - - public async Task Consume(Dictionary matricesKeyByImageId, CancellationToken stoppingToken) + public void Consume(ImagePipelineDbContext db, Dictionary matricesKeyByImageId, CancellationToken stoppingToken) { // defensive clone to prevent any consumer mutate the original matrix in param var clonedMatricesKeyByImageId = matricesKeyByImageId.ToDictionary(pair => pair.Key, pair => pair.Value.Clone()); try { - var db = _dbContextFactory(_script); - await using var transaction = await db.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, stoppingToken); ConsumeInternal(db, clonedMatricesKeyByImageId, stoppingToken); - _ = await db.SaveChangesAsync(stoppingToken); - await transaction.CommitAsync(stoppingToken); } finally { diff --git a/c#/imagePipeline/src/Consumer/MetadataConsumer.cs b/c#/imagePipeline/src/Consumer/MetadataConsumer.cs index a1b08ce5..7180168e 100644 --- a/c#/imagePipeline/src/Consumer/MetadataConsumer.cs +++ b/c#/imagePipeline/src/Consumer/MetadataConsumer.cs @@ -1,4 +1,3 @@ -using System.Data; using System.IO.Hashing; using SixLabors.ImageSharp.Formats.Jpeg; @@ -6,15 +5,9 @@ namespace tbm.ImagePipeline.Consumer; public class MetadataConsumer { - private readonly ImagePipelineDbContext.New _dbContextFactory; - - public MetadataConsumer(ImagePipelineDbContext.New dbContextFactory) => _dbContextFactory = dbContextFactory; - - public async Task Consume(Dictionary imageAndBytesKeyById, CancellationToken stoppingToken) - { - var db = _dbContextFactory(""); - await using var transaction = await db.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, stoppingToken); - + public static void Consume(ImagePipelineDbContext db, + Dictionary imageAndBytesKeyById, + CancellationToken stoppingToken) => db.ImageMetadata.AddRange(imageAndBytesKeyById.Select(pair => { var (imageId, (image, imageBytes)) = pair; @@ -44,8 +37,4 @@ public async Task Consume(Dictionary XxHash3 = XxHash3.HashToUInt64(imageBytes) }; })); - - _ = await db.SaveChangesAsync(stoppingToken); - await transaction.CommitAsync(stoppingToken); - } } diff --git a/c#/imagePipeline/src/Consumer/OcrConsumer.cs b/c#/imagePipeline/src/Consumer/OcrConsumer.cs index 3831efba..0174f55a 100644 --- a/c#/imagePipeline/src/Consumer/OcrConsumer.cs +++ b/c#/imagePipeline/src/Consumer/OcrConsumer.cs @@ -6,10 +6,7 @@ public class OcrConsumer : MatrixConsumer public delegate OcrConsumer New(string script); - public OcrConsumer(ImagePipelineDbContext.New dbContextFactory, - JoinedRecognizer.New recognizerFactory, - string script - ) : base(dbContextFactory, script) => + public OcrConsumer(JoinedRecognizer.New recognizerFactory, string script) => _recognizer = recognizerFactory(script); public Task InitializePaddleOcr(CancellationToken stoppingToken = default) => diff --git a/c#/imagePipeline/src/ImagePipelineWorker.cs b/c#/imagePipeline/src/ImagePipelineWorker.cs index d79e1781..1d223c2b 100644 --- a/c#/imagePipeline/src/ImagePipelineWorker.cs +++ b/c#/imagePipeline/src/ImagePipelineWorker.cs @@ -1,3 +1,4 @@ +using System.Data; using System.Runtime.CompilerServices; namespace tbm.ImagePipeline; @@ -23,26 +24,28 @@ protected override async Task DoWork(CancellationToken stoppingToken) { await using var scope1 = _scope0.BeginLifetimeScope(); var db = scope1.Resolve()(script); - var metadataConsumer = scope1.Resolve(); var hashConsumer = scope1.Resolve(); var ocrConsumer = scope1.Resolve()(script); await ocrConsumer.InitializePaddleOcr(stoppingToken); await foreach (var imageAndBytesKeyById in ImageBatchGenerator(db, stoppingToken)) { - await metadataConsumer.Consume(imageAndBytesKeyById, stoppingToken); + await using var transaction = await db.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, stoppingToken); + MetadataConsumer.Consume(db, imageAndBytesKeyById, stoppingToken); var matricesKeyByImageId = imageAndBytesKeyById.ToDictionary(pair => pair.Key, // preserve alpha channel if there's any, so the type of mat might be CV_8UC3 or CV_8UC4 pair => Cv2.ImDecode(pair.Value.Bytes, ImreadModes.Unchanged)); try { - await hashConsumer.Consume(matricesKeyByImageId, stoppingToken); - await ocrConsumer.Consume(matricesKeyByImageId, stoppingToken); + hashConsumer.Consume(db, matricesKeyByImageId, stoppingToken); + ocrConsumer.Consume(db, matricesKeyByImageId, stoppingToken); } finally { matricesKeyByImageId.Values.ForEach(mat => mat.Dispose()); } + _ = await db.SaveChangesAsync(stoppingToken); + await transaction.CommitAsync(stoppingToken); } } } @@ -55,8 +58,8 @@ protected override async Task DoWork(CancellationToken stoppingToken) { var images = (from image in db.Images.AsNoTracking() where image.ImageId > lastImageIdInPreviousBatch - && !db.ImageOcrBoxes.AsNoTracking().Select(e => e.ImageId).Contains(image.ImageId) - && !db.ImageOcrLines.AsNoTracking().Select(e => e.ImageId).Contains(image.ImageId) + && !db.ImageOcrBoxes.Select(e => e.ImageId).Contains(image.ImageId) + && !db.ImageOcrLines.Select(e => e.ImageId).Contains(image.ImageId) orderby image.ImageId select image).Take(_batchSize).ToList(); if (images.Any()) yield return new(