Skip to content

Commit

Permalink
* move the code that creates the DbContext and transaction from method `…
Browse files Browse the repository at this point in the history
…(Metadata|Matrix)Consumer.Consume()` to `ImagePipelineWorker.DoWork()` to ensure the transaction is committed only after all consumers finish

* make method `Consume()` static since no other class members exist @ MetadataConsumer.cs
- `DbContext` dependencies and ctor @ class `(Matrix|Hash|Ocr)Consumer`

- unnecessary invocation of `IQueryable.AsNoTracking()` on `DbSet` in server-side evaluated where expression @ `ImagePipelineWorker.ImageBatchGenerator()`
@ c#/imagePipeline
  • Loading branch information
n0099 committed May 15, 2023
1 parent ada5539 commit 91e617a
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 45 deletions.
15 changes: 7 additions & 8 deletions c#/imagePipeline/src/Consumer/HashConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ public class HashConsumer : MatrixConsumer, IDisposable
{
private readonly Dictionary<ImgHashBase, Action<ImageHash, byte[]>> _imageHashSettersKeyByAlgorithm;

public HashConsumer(ImagePipelineDbContext.New dbContextFactory) : base(dbContextFactory) =>
_imageHashSettersKeyByAlgorithm = new()
{
{PHash.Create(), (image, bytes) => image.PHash = BitConverter.ToUInt64(bytes)},
{AverageHash.Create(), (image, bytes) => image.AverageHash = BitConverter.ToUInt64(bytes)},
{BlockMeanHash.Create(), (image, bytes) => image.BlockMeanHash = bytes},
{MarrHildrethHash.Create(), (image, bytes) => image.MarrHildrethHash = bytes}
};
/// <summary>
/// Builds the lookup from each hash algorithm instance to the setter that stores
/// that algorithm's output bytes onto the matching property of an <c>ImageHash</c>.
/// </summary>
public HashConsumer() => _imageHashSettersKeyByAlgorithm = new()
{
// PHash and AverageHash results are converted with BitConverter.ToUInt64 before being stored
{PHash.Create(), (image, bytes) => image.PHash = BitConverter.ToUInt64(bytes)},
{AverageHash.Create(), (image, bytes) => image.AverageHash = BitConverter.ToUInt64(bytes)},
// BlockMeanHash and MarrHildrethHash results are stored as the byte array itself
{BlockMeanHash.Create(), (image, bytes) => image.BlockMeanHash = bytes},
{MarrHildrethHash.Create(), (image, bytes) => image.MarrHildrethHash = bytes}
};

/// <summary>Disposes every hash algorithm instance held as a key of <c>_imageHashSettersKeyByAlgorithm</c>.</summary>
public void Dispose() => _imageHashSettersKeyByAlgorithm.Keys.ForEach(hash => hash.Dispose());

Expand Down
14 changes: 1 addition & 13 deletions c#/imagePipeline/src/Consumer/MatrixConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,14 @@
using System.Data;

namespace tbm.ImagePipeline.Consumer;

public abstract class MatrixConsumer
{
private readonly ImagePipelineDbContext.New _dbContextFactory;
private readonly string _script;

protected MatrixConsumer(ImagePipelineDbContext.New dbContextFactory, string script = "") =>
(_dbContextFactory, _script) = (dbContextFactory, script);

public async Task Consume(Dictionary<ImageId, Mat> matricesKeyByImageId, CancellationToken stoppingToken)
public void Consume(ImagePipelineDbContext db, Dictionary<ImageId, Mat> matricesKeyByImageId, CancellationToken stoppingToken)
{
// defensive clone to prevent any consumer mutate the original matrix in param
var clonedMatricesKeyByImageId = matricesKeyByImageId.ToDictionary(pair => pair.Key, pair => pair.Value.Clone());
try
{
var db = _dbContextFactory(_script);
await using var transaction = await db.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, stoppingToken);
ConsumeInternal(db, clonedMatricesKeyByImageId, stoppingToken);
_ = await db.SaveChangesAsync(stoppingToken);
await transaction.CommitAsync(stoppingToken);
}
finally
{
Expand Down
17 changes: 3 additions & 14 deletions c#/imagePipeline/src/Consumer/MetadataConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
using System.Data;
using System.IO.Hashing;
using SixLabors.ImageSharp.Formats.Jpeg;

namespace tbm.ImagePipeline.Consumer;

public class MetadataConsumer
{
private readonly ImagePipelineDbContext.New _dbContextFactory;

public MetadataConsumer(ImagePipelineDbContext.New dbContextFactory) => _dbContextFactory = dbContextFactory;

public async Task Consume(Dictionary<ImageId, (TiebaImage Image, byte[] Bytes)> imageAndBytesKeyById, CancellationToken stoppingToken)
{
var db = _dbContextFactory("");
await using var transaction = await db.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, stoppingToken);

public static void Consume(ImagePipelineDbContext db,
Dictionary<ImageId, (TiebaImage Image, byte[] Bytes)> imageAndBytesKeyById,
CancellationToken stoppingToken) =>
db.ImageMetadata.AddRange(imageAndBytesKeyById.Select(pair =>
{
var (imageId, (image, imageBytes)) = pair;
Expand Down Expand Up @@ -44,8 +37,4 @@ public async Task Consume(Dictionary<ImageId, (TiebaImage Image, byte[] Bytes)>
XxHash3 = XxHash3.HashToUInt64(imageBytes)
};
}));

_ = await db.SaveChangesAsync(stoppingToken);
await transaction.CommitAsync(stoppingToken);
}
}
5 changes: 1 addition & 4 deletions c#/imagePipeline/src/Consumer/OcrConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ public class OcrConsumer : MatrixConsumer

public delegate OcrConsumer New(string script);

public OcrConsumer(ImagePipelineDbContext.New dbContextFactory,
JoinedRecognizer.New recognizerFactory,
string script
) : base(dbContextFactory, script) =>
public OcrConsumer(JoinedRecognizer.New recognizerFactory, string script) =>
_recognizer = recognizerFactory(script);

public Task InitializePaddleOcr(CancellationToken stoppingToken = default) =>
Expand Down
15 changes: 9 additions & 6 deletions c#/imagePipeline/src/ImagePipelineWorker.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Data;
using System.Runtime.CompilerServices;

namespace tbm.ImagePipeline;
Expand All @@ -23,26 +24,28 @@ protected override async Task DoWork(CancellationToken stoppingToken)
{
await using var scope1 = _scope0.BeginLifetimeScope();
var db = scope1.Resolve<ImagePipelineDbContext.New>()(script);
var metadataConsumer = scope1.Resolve<MetadataConsumer>();
var hashConsumer = scope1.Resolve<HashConsumer>();
var ocrConsumer = scope1.Resolve<OcrConsumer.New>()(script);
await ocrConsumer.InitializePaddleOcr(stoppingToken);

await foreach (var imageAndBytesKeyById in ImageBatchGenerator(db, stoppingToken))
{
await metadataConsumer.Consume(imageAndBytesKeyById, stoppingToken);
await using var transaction = await db.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, stoppingToken);
MetadataConsumer.Consume(db, imageAndBytesKeyById, stoppingToken);
var matricesKeyByImageId = imageAndBytesKeyById.ToDictionary(pair => pair.Key,
// preserve alpha channel if there's any, so the type of mat might be CV_8UC3 or CV_8UC4
pair => Cv2.ImDecode(pair.Value.Bytes, ImreadModes.Unchanged));
try
{
await hashConsumer.Consume(matricesKeyByImageId, stoppingToken);
await ocrConsumer.Consume(matricesKeyByImageId, stoppingToken);
hashConsumer.Consume(db, matricesKeyByImageId, stoppingToken);
ocrConsumer.Consume(db, matricesKeyByImageId, stoppingToken);
}
finally
{
matricesKeyByImageId.Values.ForEach(mat => mat.Dispose());
}
_ = await db.SaveChangesAsync(stoppingToken);
await transaction.CommitAsync(stoppingToken);
}
}
}
Expand All @@ -55,8 +58,8 @@ protected override async Task DoWork(CancellationToken stoppingToken)
{
var images = (from image in db.Images.AsNoTracking()
where image.ImageId > lastImageIdInPreviousBatch
&& !db.ImageOcrBoxes.AsNoTracking().Select(e => e.ImageId).Contains(image.ImageId)
&& !db.ImageOcrLines.AsNoTracking().Select(e => e.ImageId).Contains(image.ImageId)
&& !db.ImageOcrBoxes.Select(e => e.ImageId).Contains(image.ImageId)
&& !db.ImageOcrLines.Select(e => e.ImageId).Contains(image.ImageId)
orderby image.ImageId
select image).Take(_batchSize).ToList();
if (images.Any()) yield return new(
Expand Down

0 comments on commit 91e617a

Please sign in to comment.