Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions mobile/lib/domain/services/background_worker.service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import 'package:immich_mobile/services/upload.service.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:immich_mobile/utils/debug_print.dart';
import 'package:immich_mobile/utils/http_ssl_options.dart';
import 'package:immich_mobile/wm_executor.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
import 'package:worker_manager/worker_manager.dart';

class BackgroundWorkerFgService {
final BackgroundWorkerFgHostApi _foregroundHostApi;
Expand Down Expand Up @@ -94,7 +94,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
await Future.wait(
[
loadTranslations(),
workerManager.init(dynamicSpawning: true),
workerManagerPatch.init(dynamicSpawning: true),
_ref?.read(authServiceProvider).setOpenApiServiceEndpoint(),
// Initialize the file downloader
FileDownloader().configure(
Expand Down Expand Up @@ -193,7 +193,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
_logger.info("Cleaning up background worker");
final cleanupFutures = [
nativeSyncApi?.cancelHashing(),
workerManager.dispose().catchError((_) async {
workerManagerPatch.dispose().catchError((_) async {
// Discard any errors on the dispose call
return;
}),
Expand Down
5 changes: 3 additions & 2 deletions mobile/lib/main.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:async';
import 'dart:io';
import 'dart:math';

import 'package:auto_route/auto_route.dart';
import 'package:background_downloader/background_downloader.dart';
Expand Down Expand Up @@ -40,10 +41,10 @@ import 'package:immich_mobile/utils/debug_print.dart';
import 'package:immich_mobile/utils/http_ssl_options.dart';
import 'package:immich_mobile/utils/licenses.dart';
import 'package:immich_mobile/utils/migration.dart';
import 'package:immich_mobile/wm_executor.dart';
import 'package:intl/date_symbol_data_local.dart';
import 'package:logging/logging.dart';
import 'package:timezone/data/latest.dart';
import 'package:worker_manager/worker_manager.dart';

void main() async {
ImmichWidgetsBinding();
Expand All @@ -52,7 +53,7 @@ void main() async {
await Bootstrap.initDomain(isar, drift, logDb);
await initApp();
// Warm-up isolate pool for worker manager
await workerManager.init(dynamicSpawning: true);
await workerManagerPatch.init(dynamicSpawning: true, isolatesCount: max(Platform.numberOfProcessors - 1, 5));
await migrateDatabaseIfNeeded(isar, drift);
HttpSSLOptions.apply();

Expand Down
3 changes: 2 additions & 1 deletion mobile/lib/utils/isolate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:immich_mobile/utils/debug_print.dart';
import 'package:immich_mobile/utils/http_ssl_options.dart';
import 'package:immich_mobile/wm_executor.dart';
import 'package:logging/logging.dart';
import 'package:worker_manager/worker_manager.dart';

Expand All @@ -31,7 +32,7 @@ Cancelable<T?> runInIsolateGentle<T>({
throw const InvalidIsolateUsageException();
}

return workerManager.executeGentle((cancelledChecker) async {
return workerManagerPatch.executeGentle((cancelledChecker) async {
T? result;
await runZonedGuarded(
() async {
Expand Down
251 changes: 251 additions & 0 deletions mobile/lib/wm_executor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// part of 'package:worker_manager/worker_manager.dart';
// ignore_for_file: implementation_imports, avoid_print

import 'dart:async';
import 'dart:math';

import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';
import 'package:worker_manager/src/number_of_processors/processors_io.dart';
import 'package:worker_manager/src/worker/worker.dart';
import 'package:worker_manager/worker_manager.dart';

final workerManagerPatch = _Executor();

// [-2^54; 2^53] is compatible with dart2js, see core.int doc
const _minId = -9007199254740992;
const _maxId = 9007199254740992;

class Mixinable<T> {
late final itSelf = this as T;
}

mixin _ExecutorLogger on Mixinable<_Executor> {
var log = false;

@mustCallSuper
void init() {
logMessage("${itSelf._isolatesCount} workers have been spawned and initialized");
}

void logTaskAdded<R>(String uid) {
logMessage("added task with number $uid");
}

@mustCallSuper
void dispose() {
logMessage("worker_manager have been disposed");
}

@mustCallSuper
void _cancel(Task task) {
logMessage("Task ${task.id} have been canceled");
}

void logMessage(String message) {
if (log) print(message);
}
}

class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
final _queue = PriorityQueue<Task>();
final _pool = <Worker>[];
var _nextTaskId = _minId;
var _dynamicSpawning = false;
var _isolatesCount = numberOfProcessors;

@override
Future<void> init({int? isolatesCount, bool? dynamicSpawning}) async {
if (_pool.isNotEmpty) {
print("worker_manager already warmed up, init is ignored. Dispose before init");
return;
}
if (isolatesCount != null) {
if (isolatesCount < 0) {
throw Exception("isolatesCount must be greater than 0");
}

_isolatesCount = isolatesCount;
}
_dynamicSpawning = dynamicSpawning ?? false;
await _ensureWorkersInitialized();
super.init();
}

@override
Future<void> dispose() async {
_queue.clear();
for (final worker in _pool) {
worker.kill();
}
_pool.clear();
super.dispose();
}

Cancelable<R> execute<R>(Execute<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
return _createCancelable<R>(execution: execution, priority: priority);
}

Cancelable<R> executeNow<R>(ExecuteGentle<R> execution) {
final task = TaskGentle<R>(
id: "",
workPriority: WorkPriority.immediately,
execution: execution,
completer: Completer<R>(),
);

Future<void> run() async {
try {
final result = await execution(() => task.canceled);
task.complete(result, null, null);
} catch (error, st) {
task.complete(null, error, st);
}
}

run();
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
}

Cancelable<R> executeWithPort<R, T>(
ExecuteWithPort<R> execution, {
WorkPriority priority = WorkPriority.immediately,
required void Function(T value) onMessage,
}) {
return _createCancelable<R>(
execution: execution,
priority: priority,
onMessage: (message) => onMessage(message as T),
);
}

Cancelable<R> executeGentle<R>(ExecuteGentle<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
return _createCancelable<R>(execution: execution, priority: priority);
}

Cancelable<R> executeGentleWithPort<R, T>(
ExecuteGentleWithPort<R> execution, {
WorkPriority priority = WorkPriority.immediately,
required void Function(T value) onMessage,
}) {
return _createCancelable<R>(
execution: execution,
priority: priority,
onMessage: (message) => onMessage(message as T),
);
}

void _createWorkers() {
for (var i = 0; i < _isolatesCount; i++) {
_pool.add(Worker());
}
}

Future<void> _initializeWorkers() async {
await Future.wait(_pool.map((e) => e.initialize()));
}

Cancelable<R> _createCancelable<R>({
required Function execution,
WorkPriority priority = WorkPriority.immediately,
void Function(Object value)? onMessage,
}) {
if (_nextTaskId + 1 == _maxId) {
_nextTaskId = _minId;
}
final id = _nextTaskId.toString();
_nextTaskId++;
late final Task<R> task;
final completer = Completer<R>();
if (execution is Execute<R>) {
task = TaskRegular<R>(id: id, workPriority: priority, execution: execution, completer: completer);
} else if (execution is ExecuteWithPort<R>) {
task = TaskWithPort<R>(
id: id,
workPriority: priority,
execution: execution,
completer: completer,
onMessage: onMessage!,
);
} else if (execution is ExecuteGentle<R>) {
task = TaskGentle<R>(id: id, workPriority: priority, execution: execution, completer: completer);
} else if (execution is ExecuteGentleWithPort<R>) {
task = TaskGentleWithPort<R>(
id: id,
workPriority: priority,
execution: execution,
completer: completer,
onMessage: onMessage!,
);
}
_queue.add(task);
_schedule();
logTaskAdded(task.id);
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
}

Future<void> _ensureWorkersInitialized() async {
if (_pool.isEmpty) {
_createWorkers();
if (!_dynamicSpawning) {
await _initializeWorkers();
final poolSize = _pool.length;
final queueSize = _queue.length;
for (int i = 0; i <= min(poolSize, queueSize); i++) {
_schedule();
}
}
}
if (_pool.every((worker) => worker.taskId != null)) {
return;
}
if (_dynamicSpawning) {
final freeWorker = _pool.firstWhereOrNull(
(worker) => worker.taskId == null && !worker.initialized && !worker.initializing,
);
await freeWorker?.initialize();
_schedule();
}
}

void _schedule() {
final availableWorker = _pool.firstWhereOrNull((worker) => worker.taskId == null && worker.initialized);
if (availableWorker == null) {
_ensureWorkersInitialized();
return;
}
if (_queue.isEmpty) return;
final task = _queue.removeFirst();

availableWorker
.work(task)
.then(
(value) {
//could be completed already by cancel and it is normal.
//Assuming that worker finished with error and cleaned gracefully
task.complete(value, null, null);
},
onError: (error, st) {
task.complete(null, error, st);
},
)
.whenComplete(() {
if (_dynamicSpawning && _queue.isEmpty) availableWorker.kill();
_schedule();
});
}

@override
void _cancel(Task task) {
task.cancel();
_queue.remove(task);
final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id);
if (task is Gentle) {
targetWorker?.cancelGentle();
} else {
targetWorker?.kill();
if (!_dynamicSpawning) targetWorker?.initialize();
}
super._cancel(task);
}
}
Loading