From 5f8d88dd308b58b1f44d9f5a9c15b46ab3d0676d Mon Sep 17 00:00:00 2001 From: yuranich Date: Thu, 12 Feb 2026 20:29:42 +0800 Subject: [PATCH 1/4] Add indexer quorum management with integration tests and deployment docs Co-authored-by: Cursor --- .../docs/DEPLOYMENT_GUIDE.md | 128 ++++++- .../qvac-lib-registry-server/lib/config.js | 10 + .../lib/registry-service.js | 202 +++++++---- .../indexer-quorum.integration.test.js | 314 ++++++++++++++++++ 4 files changed, 574 insertions(+), 80 deletions(-) create mode 100644 packages/qvac-lib-registry-server/tests/integration/indexer-quorum.integration.test.js diff --git a/packages/qvac-lib-registry-server/docs/DEPLOYMENT_GUIDE.md b/packages/qvac-lib-registry-server/docs/DEPLOYMENT_GUIDE.md index 2a46a59e2a..4f606b47b6 100644 --- a/packages/qvac-lib-registry-server/docs/DEPLOYMENT_GUIDE.md +++ b/packages/qvac-lib-registry-server/docs/DEPLOYMENT_GUIDE.md @@ -275,20 +275,125 @@ After all writers are indexers: ## Operations -### Adding a Writer +### Adding an Indexer -1. Start new writer with `--bootstrap ` -2. Copy its local key from logs -3. Add to `QVAC_ADDITIONAL_INDEXERS` on primary indexer -4. Restart primary indexer -5. Wait for `I have become an indexer` on new writer +Full walkthrough: add **Server 2** to a running **Server 1** cluster. -### Removing a Writer +**Server 1** (existing primary indexer, already running): -1. Stop the writer to be removed -2. Remove its key from `QVAC_ADDITIONAL_INDEXERS` -3. Restart an existing indexer -4. Optionally start a replacement writer +```bash +# 1. Note the Autobase key and writer local key from logs +# (also in .env as QVAC_AUTOBASE_KEY) +pm2 logs registry | grep "Autobase key" +``` + +**Server 2** (new machine): + +```bash +# 2. Start the new indexer, joining server 1's autobase +pm2 start scripts/bin.js --name registry -- run \ + --storage ./corestore \ + --bootstrap + +# 3. Note the "Writer local key" from server 2's logs — this is +# the key you will promote to indexer +pm2 logs registry | grep "Writer local key" +``` + +**Back on Server 1** — promote server 2: + +```bash +# 4. Add server 2's local key to .env on server 1 +# (z-base-32 format, as printed in server 2's logs) +echo 'QVAC_ADDITIONAL_INDEXERS=' >> .env + +# 5. Restart server 1 — it promotes the key on startup +pm2 restart registry +``` + +Server 2 logs should print: `RegistryService: I have become an indexer` + +**Optional: authorize RPC writer clients** + +If both servers need to accept `add-model` RPC calls, add the writer key(s) to each server's `.env` and restart: + +```bash +# On each server that should accept RPC writes: +# QVAC_ALLOWED_WRITER_KEYS=, +pm2 restart registry +``` + +Writer keypairs are created once (see [Step 3](#step-3-authorize-writer-rpc-clients)). Only the hex key needs to be copied to each server's `.env`. + +**Adding a third (or Nth) indexer** follows the same pattern. Append extra keys to `QVAC_ADDITIONAL_INDEXERS` (comma-separated) and restart the promoting indexer: + +```bash +QVAC_ADDITIONAL_INDEXERS=, +``` + +Already-promoted keys are skipped automatically. + +### Removing an Indexer + +Removing an indexer is a two-part operation: the key must be removed from the Autobase quorum via `QVAC_REMOVE_INDEXERS`, and cleaned from `QVAC_ADDITIONAL_INDEXERS` so it is not re-promoted on the next restart. + +**On any active indexer** (not the one being removed): + +```bash +# 1. Add the key of the indexer to remove +echo 'QVAC_REMOVE_INDEXERS=' >> .env + +# 2. Also remove the key from QVAC_ADDITIONAL_INDEXERS if present +# (prevents re-promotion on next restart) + +# 3. Restart — removal is executed during startup +pm2 restart registry +``` + +The removed node fires `is-non-indexer` and can no longer write to the log. Existing data remains intact. + +**After removal completes**, clean up `.env`: + +```bash +# 4. Remove QVAC_REMOVE_INDEXERS (one-shot operation, not needed after restart) +sed -i '' '/QVAC_REMOVE_INDEXERS/d' .env +``` + +**Constraints:** + +- A node cannot remove itself — self-removal is rejected with a warning +- The last indexer cannot be removed — Autobase enforces at least one indexer +- A removed node can be re-added later via `QVAC_ADDITIONAL_INDEXERS` + +### Example: Two-Server Setup with pm2 + +```bash +# ── Server 1 ────────────────────────────────────────────── +# Start the primary indexer +pm2 start scripts/bin.js --name registry -- run --storage ./corestore +# → .env gets QVAC_AUTOBASE_KEY and QVAC_REGISTRY_CORE_KEY +# → Logs print Autobase key, writer local key, RPC server key + +# Add pre-existing writer key(s) to .env for RPC authorization +# QVAC_ALLOWED_WRITER_KEYS= +pm2 restart registry + +# ── Server 2 ────────────────────────────────────────────── +# Start, joining server 1's autobase +pm2 start scripts/bin.js --name registry -- run \ + --storage ./corestore \ + --bootstrap +# → Logs print writer local key + +# Add the same writer key(s) to server 2's .env +# QVAC_ALLOWED_WRITER_KEYS= +pm2 restart registry + +# ── Back on Server 1 ───────────────────────────────────── +# Promote server 2 to indexer +# QVAC_ADDITIONAL_INDEXERS= +pm2 restart registry +``` ### Sync Models from JSON Config @@ -419,6 +524,7 @@ node scripts/bin.js run --storage ./new-writer --bootstrap --skip-storage- | `QVAC_AUTOBASE_KEY` | Autobase bootstrap key (auto-generated on first run) | | `QVAC_REGISTRY_CORE_KEY` | Registry view key (auto-generated on first run) | | `QVAC_ADDITIONAL_INDEXERS` | Comma-separated writer local keys to promote to indexers | +| `QVAC_REMOVE_INDEXERS` | Comma-separated writer local keys to remove from quorum (one-shot, clean up after restart) | | `QVAC_ALLOWED_WRITER_KEYS` | Comma-separated hex keys allowed to call add-model RPC | | `QVAC_BLIND_PEER_KEYS` | Comma-separated blind peer public keys for replication | | `QVAC_PRIMARY_KEY` | Optional: Deterministic key generation (testing only) | diff --git a/packages/qvac-lib-registry-server/lib/config.js b/packages/qvac-lib-registry-server/lib/config.js index bf3723f4cc..1d7ce1b63c 100644 --- a/packages/qvac-lib-registry-server/lib/config.js +++ b/packages/qvac-lib-registry-server/lib/config.js @@ -238,6 +238,16 @@ class RegistryConfig { .filter(Boolean) } + getRemoveIndexers () { + const rawKeys = getEnv('QVAC_REMOVE_INDEXERS', '') + if (!rawKeys) return [] + + return rawKeys + .split(',') + .map(key => key.trim()) + .filter(Boolean) + } + /** * Optionally load writer keypair from env (CI use-case) */ diff --git a/packages/qvac-lib-registry-server/lib/registry-service.js b/packages/qvac-lib-registry-server/lib/registry-service.js index f0e5d8ad3c..b94c8c3e60 100644 --- a/packages/qvac-lib-registry-server/lib/registry-service.js +++ b/packages/qvac-lib-registry-server/lib/registry-service.js @@ -140,7 +140,7 @@ class RegistryService extends ReadyResource { await this.view.ready() this._logAvailableModels().catch(err => { - this.logger.error('RegistryService: Failed to log available models', err) + this.logger.error({ err }, 'RegistryService: Failed to log available models') }) this.swarm.on('connection', (conn, peerInfo) => { @@ -188,13 +188,32 @@ class RegistryService extends ReadyResource { this.swarm.join(this._rpcDiscoveryKey, { server: true, client: false }) await this.swarm.flush() - this.logger.info('Swarm joined', { + this.logger.info({ autobaseKey: IdEnc.normalize(this.base.discoveryKey), viewKey: IdEnc.normalize(this.view.discoveryKey), rpcKey: IdEnc.normalize(this._rpcDiscoveryKey) - }) + }, 'Swarm joined') if (this.base.isIndexer) { + const linearizerIndexers = (this.base.linearizer?.indexers || []) + .map(idx => idx.core?.key ? IdEnc.normalize(idx.core.key) : 'unknown') + const pendingIndexers = (this.base.system?.pendingIndexers || []) + .map(k => IdEnc.normalize(k)) + + this.logger.info({ + isIndexer: this.base.isIndexer, + localKey: this.base.localWriter ? IdEnc.normalize(this.base.localWriter.core.key) : null, + systemMembers: this.base.system?.members ?? 0, + systemLength: this.base.system?.core?.length ?? -1, + signedLength: this.base.system?.core?.signedLength ?? -1, + linearizerIndexers, + pendingIndexers, + advancing: this.base._advancing !== null, + writable: this.base.writable, + viewLength: this.view?.core?.length ?? -1 + }, 'Pre-indexer-management diagnostics') + + await this._removeIndexers() await this._addAdditionalIndexers() } @@ -203,11 +222,9 @@ class RegistryService extends ReadyResource { const hasExistingData = this.view.core.length > 0 if (memberCount <= 1 && !hasExistingData) { - this.logger.error(`Configuration error: QVAC_AUTOBASE_KEY is set but storage appears fresh or mismatched - Solutions: - 1. Remove QVAC_AUTOBASE_KEY from .env for a fresh start - 2. Use the original storage directory that matches this autobase key - Current bootstrap key: ${IdEnc.normalize(this.autobaseBootstrap)}`) + this.logger.error({ + bootstrapKey: IdEnc.normalize(this.autobaseBootstrap) + }, 'Configuration error: QVAC_AUTOBASE_KEY is set but storage appears fresh or mismatched. Solutions: 1) Remove QVAC_AUTOBASE_KEY from .env for a fresh start, 2) Use the original storage directory that matches this autobase key') throw new Error('Storage/bootstrap key mismatch - cannot initialize as indexer') } } @@ -278,7 +295,7 @@ class RegistryService extends ReadyResource { if (this.blindPeering) { await this.blindPeering.close().catch(err => { - this.logger.warn('Failed to close blind peering', { error: err.message }) + this.logger.warn({ error: err.message }, 'Failed to close blind peering') }) this.blindPeering = null } @@ -290,10 +307,10 @@ class RegistryService extends ReadyResource { for (const { blobs, core } of this.blobsCores.values()) { await blobs.close().catch(err => { - this.logger.warn('Failed to close blob store', { error: err.message }) + this.logger.warn({ error: err.message }, 'Failed to close blob store') }) await core.close().catch(err => { - this.logger.warn('Failed to close blob core', { error: err.message }) + this.logger.warn({ error: err.message }, 'Failed to close blob core') }) } this.blobsCores.clear() @@ -305,13 +322,13 @@ class RegistryService extends ReadyResource { _startCompactionInterval () { if (this._compactionInterval) return - this.logger.info('RegistryService: starting periodic compaction', { + this.logger.info({ intervalMs: this.compactionIntervalMs - }) + }, 'RegistryService: starting periodic compaction') this._compactionInterval = setInterval(() => { this.compactStorage().catch(err => { - this.logger.error('Periodic compaction failed', { error: err.message }) + this.logger.error({ error: err.message }, 'Periodic compaction failed') }) }, this.compactionIntervalMs) } @@ -324,16 +341,16 @@ class RegistryService extends ReadyResource { if (this.store?.storage && typeof this.store.storage.compact === 'function') { await this.store.storage.compact() const duration = Date.now() - startTime - this.logger.info('RegistryService: storage compaction completed', { + this.logger.info({ durationMs: duration - }) + }, 'RegistryService: storage compaction completed') } else { this.logger.warn('RegistryService: storage compaction not available') } } catch (err) { - this.logger.error('RegistryService: storage compaction failed', { + this.logger.error({ error: err.message - }) + }, 'RegistryService: storage compaction failed') } } @@ -361,9 +378,9 @@ class RegistryService extends ReadyResource { const { core } = await this._getOrCreateBlobsCore(BLOB_CORE_NAME) await this._mirrorBlobCore(core) } catch (err) { - this.logger.warn('RegistryService: failed to initialize blob core for blind peers', { + this.logger.warn({ error: err.message - }) + }, 'RegistryService: failed to initialize blob core for blind peers') } } @@ -383,9 +400,9 @@ class RegistryService extends ReadyResource { if (this.blindPeering) { await this.blindPeering.close().catch(err => { - this.logger.warn('RegistryService: failed to close existing blind peering before reseed', { + this.logger.warn({ error: err.message - }) + }, 'RegistryService: failed to close existing blind peering before reseed') }) this.blindPeering = null } @@ -441,9 +458,9 @@ class RegistryService extends ReadyResource { const ensureWriterAccess = () => { if (this._isWriterAuthorized(remoteKeyHex)) return - this.logger.warn('RPC: unauthorized writer request', { + this.logger.warn({ remoteKey: remoteKeyZ32 || remoteKeyHex || 'unknown' - }) + }, 'RPC: unauthorized writer request') const err = new Error('Unauthorized writer RPC request') err.code = 'ERR_WRITER_UNAUTHORIZED' @@ -469,10 +486,10 @@ class RegistryService extends ReadyResource { const result = await this.addModel(modelEntry, { skipExisting }) - this.logger.info('RPC: add-model completed', { + this.logger.info({ path: result.path, source: result.source - }) + }, 'RPC: add-model completed') return { success: true, @@ -493,9 +510,9 @@ class RegistryService extends ReadyResource { await this._ensureIndexer() await this.putLicense(licenseRecord) - this.logger.info('RPC: put-license completed', { + this.logger.info({ spdxId: licenseRecord.spdxId - }) + }, 'RPC: put-license completed') return { success: true, @@ -541,7 +558,7 @@ class RegistryService extends ReadyResource { const viewLength = this.view?.core?.length ?? 0 const viewContiguous = this.view?.core?.contiguousLength ?? 0 const viewSigned = this.view?.core?.signedLength ?? 0 - this.logger.info(`RPC: update-model-metadata completed path=${data.path} L=${viewLength} C=${viewContiguous} S=${viewSigned}`) + this.logger.info({ path: data.path, viewLength, viewContiguous, viewSigned }, 'RPC: update-model-metadata completed') return { success: true, @@ -563,10 +580,10 @@ class RegistryService extends ReadyResource { const result = await this.deleteModel({ path: data.path, source: data.source }) - this.logger.info('RPC: delete-model completed', { + this.logger.info({ path: data.path, source: data.source - }) + }, 'RPC: delete-model completed') return result } @@ -639,15 +656,15 @@ class RegistryService extends ReadyResource { source: sourceInfo.protocol }) if (existing) { - this.logger.info('addModel: skipping existing model', { path: sourceInfo.path }) + this.logger.info({ path: sourceInfo.path }, 'addModel: skipping existing model') return existing } } - this.logger.info('addModel: starting', { + this.logger.info({ source: sourceInfo.canonicalUrl, path: sourceInfo.path - }) + }, 'addModel: starting') const tempBase = this.config.getTempStorage() const pathHash = crypto.createHash('sha256') @@ -667,9 +684,9 @@ class RegistryService extends ReadyResource { if (isGGUFSource(sourceInfo.canonicalUrl) && isFirstShard(sourceInfo.canonicalUrl)) { ggufMetadata = await extractGGUFMetadata(localPath) } else if (isGGUFSource(sourceInfo.canonicalUrl) && !isFirstShard(sourceInfo.canonicalUrl)) { - this.logger.info('Skipping GGUF metadata extraction for non-first shard', { + this.logger.info({ source: sourceInfo.canonicalUrl - }) + }, 'Skipping GGUF metadata extraction for non-first shard') } const { blobs, core } = await this._getOrCreateBlobsCore(BLOB_CORE_NAME) @@ -683,10 +700,10 @@ class RegistryService extends ReadyResource { if (this.reseedTracker) { await this.reseedTracker.waitForComplete() - this.logger.info('Blob core replicated to blind peers', { + this.logger.info({ core: core.key.toString('hex').substring(0, 16) + '...', blocks: core.length - }) + }, 'Blob core replicated to blind peers') if (this.clearAfterReseed && core.length > 0) { const blockCount = core.length @@ -696,16 +713,16 @@ class RegistryService extends ReadyResource { // Force a small delay to let storage update await new Promise(resolve => setTimeout(resolve, 100)) - this.logger.info('Cleared blob blocks after reseed', { + this.logger.info({ blocks: blockCount - }) + }, 'Cleared blob blocks after reseed') } } - this.logger.info('addModel: completed', { + this.logger.info({ path: modelData.path, source: modelData.source - }) + }, 'addModel: completed') return modelData } finally { @@ -747,12 +764,61 @@ class RegistryService extends ReadyResource { } if (added > 0 || skipped > 0) { - this.logger.info('Additional indexers processed', { + this.logger.info({ total: additionalIndexers.length, added, skipped, errors: errors.length - }) + }, 'Additional indexers processed') + } + } + + async _removeIndexers () { + const removeKeys = this.config.getRemoveIndexers() + if (removeKeys.length === 0) return + + let removed = 0 + let skipped = 0 + const errors = [] + + for (const keyZ32 of removeKeys) { + try { + const key = IdEnc.decode(keyZ32) + + const existingIndexers = this.base.linearizer?.indexers || [] + const alreadyRemoved = !existingIndexers.some(idx => + idx.core?.key && idx.core.key.equals(key) + ) + + if (alreadyRemoved) { + skipped++ + continue + } + + if (this.base.local?.key && this.base.local.key.equals(key)) { + this.logger.warn({ key: keyZ32 }, 'Cannot remove self as indexer, skipping') + skipped++ + continue + } + + await this._appendOperation(DISPATCH_REMOVE_INDEXER, { key }) + removed++ + } catch (err) { + errors.push({ key: keyZ32, error: err.message }) + this.logger.error({ + key: keyZ32, + error: err.message + }, 'Failed to remove indexer') + } + } + + if (removed > 0 || skipped > 0) { + this.logger.info({ + total: removeKeys.length, + removed, + skipped, + errors: errors.length + }, 'Remove indexers processed') } } @@ -801,7 +867,7 @@ class RegistryService extends ReadyResource { } async _downloadFromHuggingFace (hfUrl, localPath) { - this.logger.info('Downloading from HuggingFace', { url: hfUrl }) + this.logger.info({ url: hfUrl }, 'Downloading from HuggingFace') const hfToken = this.config.getHuggingFaceToken() const parsed = this._parseHfDownloadUrl(hfUrl) @@ -844,7 +910,7 @@ class RegistryService extends ReadyResource { } async _downloadFromS3 (bucket, key, localPath) { - this.logger.info('Downloading from S3', { bucket, key }) + this.logger.info({ bucket, key }, 'Downloading from S3') const s3 = await this._createS3Client() await this._ensureLocalPath(localPath) @@ -884,10 +950,10 @@ class RegistryService extends ReadyResource { await fsPromises.unlink(localPath) } catch (err) { if (err.code !== 'ENOENT') { - this.logger.warn('Failed to unlink existing file', { + this.logger.warn({ localPath, error: err.message - }) + }, 'Failed to unlink existing file') } } } @@ -895,12 +961,12 @@ class RegistryService extends ReadyResource { async _cleanupPath (targetPath) { try { await fsPromises.rm(targetPath, { recursive: true, force: true }) - this.logger.debug('Cleaned up temp path', { targetPath }) + this.logger.debug({ targetPath }, 'Cleaned up temp path') } catch (cleanupErr) { - this.logger.warn('Failed to clean up temp path', { + this.logger.warn({ targetPath, error: cleanupErr.message - }) + }, 'Failed to clean up temp path') } } @@ -909,7 +975,7 @@ class RegistryService extends ReadyResource { return this.blobsCores.get(label) } - this.logger.debug('Getting or creating Hyperblobs core', { label }) + this.logger.debug({ label }, 'Getting or creating Hyperblobs core') const core = this.blobsStore.get({ name: `blobs-${label}`, writable: true }) await core.ready() @@ -920,11 +986,11 @@ class RegistryService extends ReadyResource { const entry = { blobs, core } this.blobsCores.set(label, entry) - this.logger.debug('Hyperblobs core ready', { + this.logger.debug({ label, key: core.key.toString('hex'), discoveryKey: core.discoveryKey.toString('hex') - }) + }, 'Hyperblobs core ready') // Caller is responsible for mirroring after data is added return entry @@ -937,10 +1003,10 @@ class RegistryService extends ReadyResource { await pipeline(readStream, writeStream) - this.logger.debug('Uploaded file to Hyperblobs', { + this.logger.debug({ size: stat.size, path: localPath - }) + }, 'Uploaded file to Hyperblobs') return writeStream.id } @@ -1031,7 +1097,7 @@ class RegistryService extends ReadyResource { await this._appendOperation(DISPATCH_DELETE_MODEL, { path, source }) - this.logger.info('deleteModel: completed', { path, source }) + this.logger.info({ path, source }, 'deleteModel: completed') return { success: true, path, source } } @@ -1065,9 +1131,9 @@ class RegistryService extends ReadyResource { await this._appendOperation(DISPATCH_PUT_LICENSE, licenseRecord) - this.logger.info('putLicense: license operation appended', { + this.logger.info({ spdxId: licenseRecord.spdxId - }) + }, 'putLicense: license operation appended') return licenseRecord } @@ -1094,9 +1160,9 @@ class RegistryService extends ReadyResource { url: licenseMeta.url, text }) - this.logger.info('Auto-created license', { spdxId: licenseId }) + this.logger.info({ spdxId: licenseId }, 'Auto-created license') } catch (err) { - this.logger.error('Failed to load license', { licenseId, error: err.message }) + this.logger.error({ licenseId, error: err.message }, 'Failed to load license') throw new Error(`License ${licenseId} not available: ${err.message}`) } } @@ -1111,17 +1177,15 @@ class RegistryService extends ReadyResource { if (models.length === 0) { this.logger.info('RegistryService: No models in registry yet') } else { - this.logger.info(`RegistryService: ${models.length} model(s) available:`) const modelsToLog = models.length > 5 ? models.slice(-5) : models - if (models.length > 5) { - this.logger.info(` ... showing last 5 of ${models.length} models`) - } - for (const model of modelsToLog) { - this.logger.info(` - ${model.path} [${model.engine}]`) - } + this.logger.info({ + count: models.length, + showing: modelsToLog.length, + models: modelsToLog.map(m => `${m.path} [${m.engine}]`) + }, 'RegistryService: models available') } } catch (err) { - this.logger.error('RegistryService: Failed to log models', err) + this.logger.error({ err }, 'RegistryService: Failed to log models') } } diff --git a/packages/qvac-lib-registry-server/tests/integration/indexer-quorum.integration.test.js b/packages/qvac-lib-registry-server/tests/integration/indexer-quorum.integration.test.js new file mode 100644 index 0000000000..76b0577b61 --- /dev/null +++ b/packages/qvac-lib-registry-server/tests/integration/indexer-quorum.integration.test.js @@ -0,0 +1,314 @@ +'use strict' + +const test = require('brittle') +const Corestore = require('corestore') +const Hyperswarm = require('hyperswarm') +const createTestnet = require('hyperdht/testnet') + +const RegistryService = require('../../lib/registry-service') +const RegistryConfig = require('../../lib/config') +const { AUTOBASE_NAMESPACE, QVAC_MAIN_REGISTRY } = require('../../shared/constants') +const { createTempStorage, waitFor } = require('../helpers/test-utils') + +const DISPATCH_ADD_INDEXER = `@${QVAC_MAIN_REGISTRY}/add-indexer` +const DISPATCH_REMOVE_INDEXER = `@${QVAC_MAIN_REGISTRY}/remove-indexer` + +const noopLogger = { + info () {}, + debug () {}, + error () {}, + warn () {} +} + +async function createService (t, { storage, bootstrap, swarmBootstrap } = {}) { + const basePath = storage || await createTempStorage(t) + const store = new Corestore(basePath) + await store.ready() + + const swarm = new Hyperswarm({ bootstrap: swarmBootstrap || [] }) + const config = new RegistryConfig({ logger: noopLogger }) + + const service = new RegistryService( + store.namespace(AUTOBASE_NAMESPACE), + swarm, + config, + { + logger: noopLogger, + ackInterval: 5, + autobaseBootstrap: bootstrap || null, + skipStorageCheck: true + } + ) + + return { service, store, swarm, config, storage: basePath } +} + +async function cleanupService ({ service, store, swarm }) { + if (service && service.opened) { + await service.close() + } + if (swarm) { + await swarm.destroy().catch(() => {}) + } + if (store) { + await store.close().catch(() => {}) + } +} + +async function waitForConnection (swarm1, swarm2) { + await swarm1.flush() + await swarm2.flush() + await waitFor(async () => { + return swarm1.connections.size > 0 && swarm2.connections.size > 0 + }, 10000) +} + +async function flushAutobases (...bases) { + for (let i = 0; i < 3; i++) { + for (const base of bases) { + await base.update() + } + for (const base of bases) { + try { + if (base.localWriter && base.localWriter.core.length > 0) { + await base.ack() + } + } catch (_) { + // Writer may have been removed from quorum + } + } + await new Promise(resolve => setTimeout(resolve, 200)) + } +} + +async function ensureIndexer (service) { + if (service.base.isIndexer) return + await service._appendOperation(DISPATCH_ADD_INDEXER, { key: service.base.local.key }) + await waitFor(async () => service.base.isIndexer === true, 15000) +} + +test('Add indexer to quorum and verify data replication', async (t) => { + const { bootstrap } = await createTestnet(3, t.teardown) + + const writer1 = await createService(t, { swarmBootstrap: bootstrap }) + await writer1.service.ready() + await ensureIndexer(writer1.service) + + const writer2 = await createService(t, { + bootstrap: writer1.service.base.key, + swarmBootstrap: bootstrap + }) + await writer2.service.ready() + + try { + await waitForConnection(writer1.swarm, writer2.swarm) + + await writer1.service._appendOperation(DISPATCH_ADD_INDEXER, { key: writer2.service.base.local.key }) + await flushAutobases(writer1.service.base, writer2.service.base) + await waitFor(async () => writer2.service.base.isIndexer === true, 15000) + + t.ok(writer1.service.base.isIndexer, 'writer1 is indexer') + t.ok(writer2.service.base.isIndexer, 'writer2 is indexer') + + await writer1.service.putLicense({ + spdxId: 'MIT', + name: 'MIT License', + url: 'https://opensource.org/licenses/MIT', + text: 'MIT License text' + }) + await flushAutobases(writer1.service.base, writer2.service.base) + + await waitFor(async () => { + const l = await writer2.service.getLicenseByKey({ spdxId: 'MIT' }) + return !!l + }, 15000) + + const license1 = await writer1.service.getLicenseByKey({ spdxId: 'MIT' }) + const license2 = await writer2.service.getLicenseByKey({ spdxId: 'MIT' }) + + t.ok(license1, 'writer1 sees license') + t.ok(license2, 'writer2 sees license after replication') + t.is(license2.spdxId, 'MIT', 'replicated data matches') + } finally { + await cleanupService(writer2) + await cleanupService(writer1) + } +}) + +test('Remove indexer from quorum preserves data and remaining indexer', async (t) => { + const { bootstrap } = await createTestnet(3, t.teardown) + + const writer1 = await createService(t, { swarmBootstrap: bootstrap }) + await writer1.service.ready() + await ensureIndexer(writer1.service) + + const writer2 = await createService(t, { + bootstrap: writer1.service.base.key, + swarmBootstrap: bootstrap + }) + await writer2.service.ready() + + try { + await waitForConnection(writer1.swarm, writer2.swarm) + + // Build 2-indexer quorum + await writer1.service._appendOperation(DISPATCH_ADD_INDEXER, { key: writer2.service.base.local.key }) + await flushAutobases(writer1.service.base, writer2.service.base) + await waitFor(async () => writer2.service.base.isIndexer === true, 15000) + + // Seed data before removal + await writer1.service.putLicense({ + spdxId: 'MIT', + name: 'MIT License', + url: 'https://opensource.org/licenses/MIT', + text: 'MIT License text' + }) + await flushAutobases(writer1.service.base, writer2.service.base) + await waitFor(async () => { + const l = await writer2.service.getLicenseByKey({ spdxId: 'MIT' }) + return !!l + }, 15000) + + // Remove writer2 from quorum + await writer1.service._appendOperation(DISPATCH_REMOVE_INDEXER, { key: writer2.service.base.local.key }) + await flushAutobases(writer1.service.base, writer2.service.base) + await waitFor(async () => writer2.service.base.isIndexer === false, 15000) + + t.ok(writer1.service.base.isIndexer, 'writer1 still indexer after removal') + t.is(writer2.service.base.isIndexer, false, 'writer2 no longer indexer') + + // Remaining indexer can still write + await writer1.service.putLicense({ + spdxId: 'Apache-2.0', + name: 'Apache License 2.0', + url: 'https://opensource.org/licenses/Apache-2.0', + text: 'Apache License text' + }) + await flushAutobases(writer1.service.base) + + const apache = await writer1.service.getLicenseByKey({ spdxId: 'Apache-2.0' }) + t.ok(apache, 'remaining indexer can write after removal') + + // Data written before removal is still accessible + const mit = await writer1.service.getLicenseByKey({ spdxId: 'MIT' }) + t.ok(mit, 'pre-removal data preserved') + } finally { + await cleanupService(writer2) + await cleanupService(writer1) + } +}) + +test('Full indexer lifecycle: add, remove, re-add', async (t) => { + const { bootstrap } = await createTestnet(3, t.teardown) + + const writer1 = await createService(t, { swarmBootstrap: bootstrap }) + await writer1.service.ready() + await ensureIndexer(writer1.service) + + const writer2 = await createService(t, { + bootstrap: writer1.service.base.key, + swarmBootstrap: bootstrap + }) + await writer2.service.ready() + + const writer3 = await createService(t, { + bootstrap: writer1.service.base.key, + swarmBootstrap: bootstrap + }) + await writer3.service.ready() + + try { + await waitForConnection(writer1.swarm, writer2.swarm) + await waitForConnection(writer1.swarm, writer3.swarm) + + // Phase 1: Build 3-indexer quorum + await writer1.service._appendOperation(DISPATCH_ADD_INDEXER, { key: writer2.service.base.local.key }) + await flushAutobases(writer1.service.base, writer2.service.base) + await waitFor(async () => writer2.service.base.isIndexer === true, 15000) + + await writer1.service._appendOperation(DISPATCH_ADD_INDEXER, { key: writer3.service.base.local.key }) + await flushAutobases(writer1.service.base, writer2.service.base, writer3.service.base) + await waitFor(async () => writer3.service.base.isIndexer === true, 15000) + + t.ok(writer1.service.base.isIndexer, 'phase1: writer1 is indexer') + t.ok(writer2.service.base.isIndexer, 'phase1: writer2 is indexer') + t.ok(writer3.service.base.isIndexer, 'phase1: writer3 is indexer') + + // Seed data with full quorum + await writer1.service.putLicense({ + spdxId: 'MIT', + name: 'MIT License', + url: 'https://opensource.org/licenses/MIT', + text: 'MIT License text' + }) + await flushAutobases(writer1.service.base, writer2.service.base, writer3.service.base) + + await waitFor(async () => { + const [a, b, c] = await Promise.all([ + writer1.service.getLicenseByKey({ spdxId: 'MIT' }), + writer2.service.getLicenseByKey({ spdxId: 'MIT' }), + writer3.service.getLicenseByKey({ spdxId: 'MIT' }) + ]) + return !!a && !!b && !!c + }, 15000) + + // Phase 2: Remove writer3 from quorum + await writer1.service._appendOperation(DISPATCH_REMOVE_INDEXER, { key: writer3.service.base.local.key }) + await flushAutobases(writer1.service.base, writer2.service.base, writer3.service.base) + await waitFor(async () => writer3.service.base.isIndexer === false, 15000) + + t.ok(writer1.service.base.isIndexer, 'phase2: writer1 still indexer') + t.ok(writer2.service.base.isIndexer, 'phase2: writer2 still indexer') + t.is(writer3.service.base.isIndexer, false, 'phase2: writer3 removed') + + // Verify 2-indexer quorum still works + await writer1.service.putLicense({ + spdxId: 'Apache-2.0', + name: 'Apache License 2.0', + url: 'https://opensource.org/licenses/Apache-2.0', + text: 'Apache License text' + }) + await flushAutobases(writer1.service.base, writer2.service.base) + + await waitFor(async () => { + const l = await writer2.service.getLicenseByKey({ spdxId: 'Apache-2.0' }) + return !!l + }, 15000) + + t.ok(await writer2.service.getLicenseByKey({ spdxId: 'Apache-2.0' }), 'phase2: 2-indexer quorum writes') + + // Phase 3: Re-add writer3 as indexer + await writer1.service._appendOperation(DISPATCH_ADD_INDEXER, { key: writer3.service.base.local.key }) + await flushAutobases(writer1.service.base, writer2.service.base, writer3.service.base) + await waitFor(async () => writer3.service.base.isIndexer === true, 15000) + + t.ok(writer3.service.base.isIndexer, 'phase3: writer3 re-added as indexer') + + // Verify restored quorum sees all data + await writer1.service.putLicense({ + spdxId: 'BSD-3-Clause', + name: 'BSD 3-Clause', + url: 'https://opensource.org/licenses/BSD-3-Clause', + text: 'BSD 3-Clause text' + }) + await flushAutobases(writer1.service.base, writer2.service.base, writer3.service.base) + + await waitFor(async () => { + const l = await writer3.service.getLicenseByKey({ spdxId: 'BSD-3-Clause' }) + return !!l + }, 15000) + + const bsd = await writer3.service.getLicenseByKey({ spdxId: 'BSD-3-Clause' }) + t.ok(bsd, 'phase3: restored quorum replicates new data') + + const apache = await writer3.service.getLicenseByKey({ spdxId: 'Apache-2.0' }) + t.ok(apache, 'phase3: data written during removal is accessible') + + const mit = await writer3.service.getLicenseByKey({ spdxId: 'MIT' }) + t.ok(mit, 'phase3: original data preserved through full lifecycle') + } finally { + await cleanupService(writer3) + await cleanupService(writer2) + await cleanupService(writer1) + } +}) From 3307d5ffd6aa3a35bfca5f0a62b19dad70d44cfb Mon Sep 17 00:00:00 2001 From: yuranich Date: Thu, 12 Feb 2026 20:39:36 +0800 Subject: [PATCH 2/4] updated schema dep --- packages/qvac-lib-registry-server/package-lock.json | 8 ++++---- packages/qvac-lib-registry-server/package.json | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/qvac-lib-registry-server/package-lock.json b/packages/qvac-lib-registry-server/package-lock.json index 8dcb01c43b..9af0209ac6 100644 --- a/packages/qvac-lib-registry-server/package-lock.json +++ b/packages/qvac-lib-registry-server/package-lock.json @@ -12,7 +12,7 @@ "@aws-sdk/client-s3": "^3.864.0", "@huggingface/gguf": "^0.3.3", "@huggingface/hub": "^2.4.1", - "@tetherto/qvac-registry-schema-mono": "^0.1.0", + "@tetherto/qvac-registry-schema-mono": "^0.2.1", "autobase": "^7.18.1", "b4a": "^1.6.7", "blind-peer": "^2.9.4", @@ -1684,9 +1684,9 @@ } }, "node_modules/@tetherto/qvac-registry-schema-mono": { - "version": "0.1.0", - "resolved": "https://npm.pkg.github.com/download/@tetherto/qvac-registry-schema-mono/0.1.0/d3ada530a48ea6fe75203fd7bc1a01fed0e81508", - "integrity": "sha512-k6bx2cMggKoGXBtVuk1eixl3y8zwXAFEVT6564SOZZBsWXaZyuIDTmmK8nSSXKsyPQ8q6lsTmfh89t1aALEY3g==", + "version": "0.2.1", + "resolved": "https://npm.pkg.github.com/download/@tetherto/qvac-registry-schema-mono/0.2.1/479fa2834c1206be35189b335fcc6a0f5dd3b08c", + "integrity": "sha512-0V8F2ymtHJUUOFf3kI8f6a83U1sjWuEIAN44Xlv6Eb6sl3M7T+WH3zSmY10/PE99rWTD6ZrNtmOvvNLcAaVYBQ==", "license": "Apache-2.0", "dependencies": { "hyperdb": "^4.16.0", diff --git a/packages/qvac-lib-registry-server/package.json b/packages/qvac-lib-registry-server/package.json index a7ecc7df6f..a1223429fb 100644 --- a/packages/qvac-lib-registry-server/package.json +++ b/packages/qvac-lib-registry-server/package.json @@ -53,7 +53,7 @@ "@aws-sdk/client-s3": "^3.864.0", "@huggingface/gguf": "^0.3.3", "@huggingface/hub": "^2.4.1", - "@tetherto/qvac-registry-schema-mono": "^0.1.0", + "@tetherto/qvac-registry-schema-mono": "^0.2.1", "autobase": "^7.18.1", "b4a": "^1.6.7", "blind-peer": "^2.9.4", From e2c81a8498bbc0a56c1fe79672ee124861111ed1 Mon Sep 17 00:00:00 2001 From: yuranich Date: Thu, 12 Feb 2026 21:16:34 +0800 Subject: [PATCH 3/4] Increase timeout for license fetch integration test on CI Co-authored-by: Cursor --- .../tests/integration/registry-service.integration.test.js | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/qvac-lib-registry-server/tests/integration/registry-service.integration.test.js b/packages/qvac-lib-registry-server/tests/integration/registry-service.integration.test.js index 0a772aaf79..92c3c97bae 100644 --- a/packages/qvac-lib-registry-server/tests/integration/registry-service.integration.test.js +++ b/packages/qvac-lib-registry-server/tests/integration/registry-service.integration.test.js @@ -182,6 +182,7 @@ test('License collection CRUD operations', async (t) => { }) test('Model with licenseId can fetch license info', async (t) => { + t.timeout(60000) const { bootstrap } = await createTestnet(3, t.teardown) const ctx = await createService(t, { swarmBootstrap: bootstrap }) From 70cb12f0ebbc1978464a2ec66cb160d1f8d1f0d7 Mon Sep 17 00:00:00 2001 From: yuranich Date: Thu, 12 Feb 2026 21:19:59 +0800 Subject: [PATCH 4/4] Add QVAC_REMOVE_INDEXERS to ENV_KEYS in shared constants Co-authored-by: Cursor --- packages/qvac-lib-registry-server/lib/config.js | 2 +- packages/qvac-lib-registry-server/shared/constants.js | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/qvac-lib-registry-server/lib/config.js b/packages/qvac-lib-registry-server/lib/config.js index 1d7ce1b63c..b4ebc1529d 100644 --- a/packages/qvac-lib-registry-server/lib/config.js +++ b/packages/qvac-lib-registry-server/lib/config.js @@ -239,7 +239,7 @@ class RegistryConfig { } getRemoveIndexers () { - const rawKeys = getEnv('QVAC_REMOVE_INDEXERS', '') + const rawKeys = getEnv(ENV_KEYS.QVAC_REMOVE_INDEXERS, '') if (!rawKeys) return [] return rawKeys diff --git a/packages/qvac-lib-registry-server/shared/constants.js b/packages/qvac-lib-registry-server/shared/constants.js index 3dc1756675..3c7915977c 100644 --- a/packages/qvac-lib-registry-server/shared/constants.js +++ b/packages/qvac-lib-registry-server/shared/constants.js @@ -44,6 +44,8 @@ const ENV_KEYS = { QVAC_WRITER_SECRET_KEY: 'QVAC_WRITER_SECRET_KEY', // Additional indexer writer local keys (comma-separated z-base-32 keys) QVAC_ADDITIONAL_INDEXERS: 'QVAC_ADDITIONAL_INDEXERS', + // Indexer keys to remove from quorum on startup (comma-separated z-base-32, one-shot) + QVAC_REMOVE_INDEXERS: 'QVAC_REMOVE_INDEXERS', // AWS S3 credentials AWS_ACCESS_KEY_ID: 'AWS_ACCESS_KEY_ID',