Skip to content

Commit

Permalink
refactor(workspace): blob sync (#5037)
Browse files Browse the repository at this point in the history
This pr implements a blob engine.
It exposes a single `BlobStorage` to the `blocksuite`, and in it we sync blobs between multiple storages.

The implement still have few issues, but we can merge this pr first and fix them in future.

* BlobEngine currently **do nothing when delete**, because synchronization logic conflicts with deletion logic.
* BlobEngine sync between storages by querying the blob list at regular intervals. This will **cause many queries**, we can avoid this in the future by subscribing to remote changes.
  • Loading branch information
EYHN committed Nov 23, 2023
1 parent 1740e7e commit 23e0137
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 174 deletions.
35 changes: 35 additions & 0 deletions packages/frontend/core/src/layouts/workspace-layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
} from '@affine/component/workspace';
import { useAFFiNEI18N } from '@affine/i18n/hooks';
import { rootWorkspacesMetadataAtom } from '@affine/workspace/atom';
import { getBlobEngine } from '@affine/workspace/manager';
import { assertExists } from '@blocksuite/global/utils';
import type { DragEndEvent } from '@dnd-kit/core';
import {
Expand Down Expand Up @@ -116,10 +117,44 @@ type WorkspaceLayoutProps = {
migration?: MigrationPoint;
};

const useSyncWorkspaceBlob = () => {
// temporary solution for sync blob

const [currentWorkspace] = useCurrentWorkspace();

useEffect(() => {
const blobEngine = getBlobEngine(currentWorkspace.blockSuiteWorkspace);
let stopped = false;
function sync() {
if (stopped) {
return;
}

blobEngine
?.sync()
.catch(error => {
console.error('sync blob error', error);
})
.finally(() => {
// sync every 1 minute
setTimeout(sync, 60000);
});
}

// after currentWorkspace changed, wait 1 second to start sync
setTimeout(sync, 1000);

return () => {
stopped = true;
};
}, [currentWorkspace]);
};

export const WorkspaceLayout = function WorkspacesSuspense({
children,
migration,
}: PropsWithChildren<WorkspaceLayoutProps>) {
useSyncWorkspaceBlob();
return (
<AdapterProviderWrapper>
<CurrentWorkspaceContext>
Expand Down
1 change: 1 addition & 0 deletions packages/frontend/workspace/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@toeverything/y-indexeddb": "workspace:*",
"async-call-rpc": "^6.3.1",
"idb": "^7.1.1",
"idb-keyval": "^6.2.1",
"is-svg": "^5.0.0",
"jotai": "^2.5.1",
"js-base64": "^3.7.5",
Expand Down
79 changes: 0 additions & 79 deletions packages/frontend/workspace/src/blob/cloud-blob-storage.ts

This file was deleted.

139 changes: 139 additions & 0 deletions packages/frontend/workspace/src/blob/engine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { DebugLogger } from '@affine/debug';
import { difference } from 'lodash-es';

const logger = new DebugLogger('affine:blob-engine');

export class BlobEngine {
constructor(
private local: BlobStorage,
private remotes: BlobStorage[]
) {}

get storages() {
return [this.local, ...this.remotes];
}

async sync() {
if (this.local.readonly) {
return;
}
logger.debug('start syncing blob...');
for (const remote of this.remotes) {
let localList;
let remoteList;
try {
localList = await this.local.list();
remoteList = await remote.list();
} catch (err) {
logger.error(`error when sync`, err);
continue;
}

if (!remote.readonly) {
const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
const data = await this.local.get(key);
if (data) {
await remote.set(key, data);
}
} catch (err) {
logger.error(
`error when sync ${key} from [${this.local.name}] to [${remote.name}]`,
err
);
}
}
}

const needDownload = difference(remoteList, localList);

for (const key of needDownload) {
try {
const data = await remote.get(key);
if (data) {
await this.local.set(key, data);
}
} catch (err) {
logger.error(
`error when sync ${key} from [${remote.name}] to [${this.local.name}]`,
err
);
}
}
}

logger.debug('finish syncing blob');
}

async get(key: string) {
logger.debug('get blob', key);
for (const storage of this.storages) {
const data = await storage.get(key);
if (data) {
return data;
}
}
return undefined;
}

async set(key: string, value: Blob) {
if (this.local.readonly) {
throw new Error('local peer is readonly');
}

// await upload to the local peer
await this.local.set(key, value);

// uploads to other peers in the background
Promise.allSettled(
this.remotes
.filter(r => !r.readonly)
.map(peer =>
peer.set(key, value).catch(err => {
logger.error('error when upload to peer', err);
})
)
)
.then(result => {
if (result.some(({ status }) => status === 'rejected')) {
logger.error(
`blob ${key} update finish, but some peers failed to update`
);
} else {
logger.debug(`blob ${key} update finish`);
}
})
.catch(() => {
// Promise.allSettled never reject
});
}

async delete(_key: string) {
// not supported
}

async list() {
const blobList = new Set<string>();

for (const peer of this.storages) {
const list = await peer.list();
if (list) {
for (const blob of list) {
blobList.add(blob);
}
}
}

return Array.from(blobList);
}
}

export interface BlobStorage {
name: string;
readonly: boolean;
get: (key: string) => Promise<Blob | undefined>;
set: (key: string, value: Blob) => Promise<void>;
delete: (key: string) => Promise<void>;
list: () => Promise<string[]>;
}
37 changes: 37 additions & 0 deletions packages/frontend/workspace/src/blob/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { BlobEngine } from './engine';
import {
createAffineCloudBlobStorage,
createIndexeddbBlobStorage,
createSQLiteBlobStorage,
createStaticBlobStorage,
} from './storage';

export * from './engine';
export * from './storage';

export function createLocalBlobStorage(workspaceId: string) {
if (environment.isDesktop) {
return createSQLiteBlobStorage(workspaceId);
} else {
return createIndexeddbBlobStorage(workspaceId);
}
}

export function createLocalBlobEngine(workspaceId: string) {
return new BlobEngine(createLocalBlobStorage(workspaceId), [
createStaticBlobStorage(),
]);
}

export function createAffineCloudBlobEngine(workspaceId: string) {
return new BlobEngine(createLocalBlobStorage(workspaceId), [
createStaticBlobStorage(),
createAffineCloudBlobStorage(workspaceId),
]);
}

export function createAffinePublicBlobEngine(workspaceId: string) {
return new BlobEngine(createAffineCloudBlobStorage(workspaceId), [
createStaticBlobStorage(),
]);
}
34 changes: 0 additions & 34 deletions packages/frontend/workspace/src/blob/sqlite-blob-storage.ts

This file was deleted.

Loading

0 comments on commit 23e0137

Please sign in to comment.