-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add command for dumping an Elasticsearch instance in the PaaS #188
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
935f210
feat: add command for dumping an Elasticsearch instance in the PaaS
Kuruyia 2692da9
docs: add README section for the PaaS Elasticsearch dump command
Kuruyia 77bae1e
chore: rename PaaS ES dump class
Kuruyia edd3440
feat: add flag to the PaaS ES dump command to configure the document …
Kuruyia 16fad0f
fix: attempt to cleanly finish dumping PaaS ES documents if dumping f…
Kuruyia 8b0cd2c
feat: store dumped PaaS ES documents in a single JSONL file
Kuruyia 1eb4b2c
docs: update README
Kuruyia f3126d8
chore: merge 1-dev and fix conflicts
Kuruyia 0c64151
feat: check the batch size
Kuruyia File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
import path from "path"; | ||
import fs from "node:fs/promises"; | ||
|
||
import ndjson from "ndjson"; | ||
import { flags } from "@oclif/command"; | ||
|
||
import { PaasKommand } from "../../../support/PaasKommand"; | ||
|
||
/** | ||
* Results of the document dump action. | ||
*/ | ||
/**
 * Result of one document dump batch, as returned by the
 * "application/storage":"dumpDocuments" PaaS API action.
 */
type DocumentDump = {
  // ID of the Elasticsearch point-in-time (PIT) backing the dump session;
  // passed back on the next batch and to "finishDumpDocuments".
  pit_id: string;
  hits: DocumentDumpHits;
};

/** Search hits carried by a document dump result. */
type DocumentDumpHits = {
  total: DocumentDumpHitsTotal;
  hits: DocumentDumpHit[];
};

/** Total number of documents matched by the dump search. */
type DocumentDumpHitsTotal = {
  value: number;
};

/**
 * A single dumped document hit.
 * NOTE(review): only "sort" is declared here; the hit presumably also carries
 * the document source, which is written to the JSONL file as-is — confirm
 * against the PaaS API response shape.
 */
type DocumentDumpHit = {
  // Sort values of the hit, used as the "search_after" cursor for the next batch.
  sort: string[];
};
|
||
class PaasEsDump extends PaasKommand { | ||
public static description = "Dump data from the Elasticsearch of a PaaS application"; | ||
|
||
public static flags = { | ||
help: flags.help(), | ||
project: flags.string({ | ||
description: "Current PaaS project", | ||
}), | ||
"batch-size": flags.integer({ | ||
description: "Maximum batch size", | ||
default: 2000, | ||
}), | ||
}; | ||
|
||
static args = [ | ||
{ | ||
name: "environment", | ||
description: "Project environment name", | ||
required: true, | ||
}, | ||
{ | ||
name: "applicationId", | ||
description: "Application Identifier", | ||
required: true, | ||
}, | ||
{ | ||
name: "dumpDirectory", | ||
description: "Directory where to store dump files", | ||
required: true, | ||
} | ||
]; | ||
|
||
async runSafe() { | ||
// Check that the batch size is positive | ||
if (this.flags["batch-size"] <= 0) { | ||
this.logKo(`The batch size must be greater than zero. (Specified batch size: ${this.flags["batch-size"]})`); | ||
process.exit(1); | ||
} | ||
|
||
// Log in to the PaaS | ||
const apiKey = await this.getCredentials(); | ||
|
||
await this.initPaasClient({ apiKey }); | ||
|
||
const user = await this.paas.auth.getCurrentUser(); | ||
this.logInfo( | ||
`Logged as "${user._id}" for project "${this.flags.project || this.getProject() | ||
}"` | ||
); | ||
|
||
// Create the dump directory | ||
await fs.mkdir(this.args.dumpDirectory, { recursive: true }); | ||
|
||
// Dump the indexes | ||
this.logInfo("Dumping Elasticsearch indexes..."); | ||
|
||
const indexesResult = await this.getAllIndexes(); | ||
await fs.writeFile(path.join(this.args.dumpDirectory, "indexes.json"), JSON.stringify(indexesResult)); | ||
|
||
this.logOk("Elasticsearch indexes dumped!"); | ||
|
||
// Dump all the documents | ||
this.logInfo("Dumping Elasticsearch documents..."); | ||
await this.dumpAllDocuments(); | ||
|
||
this.logOk("Elasticsearch documents dumped!"); | ||
this.logOk(`The dumped files are available under "${path.resolve(this.args.dumpDirectory)}"`) | ||
} | ||
|
||
/** | ||
* @description Get all indexes from the Elasticsearch of the PaaS application. | ||
* @returns The indexes. | ||
*/ | ||
private async getAllIndexes() { | ||
const { result }: any = await this.paas.query({ | ||
controller: "application/storage", | ||
action: "getIndexes", | ||
environmentId: this.args.environment, | ||
projectId: this.flags.project || this.getProject(), | ||
applicationId: this.args.applicationId, | ||
body: {}, | ||
}); | ||
|
||
return result; | ||
} | ||
|
||
/** | ||
* @description Dump documents from the Elasticsearch of the PaaS application. | ||
* @param pitId ID of the PIT opened on Elasticsearch. | ||
* @param searchAfter Cursor for dumping documents after a certain one. | ||
* @returns The dumped documents. | ||
*/ | ||
private async dumpDocuments(pitId: string, searchAfter: string[]): Promise<DocumentDump> { | ||
const { result }: any = await this.paas.query({ | ||
controller: "application/storage", | ||
action: "dumpDocuments", | ||
environmentId: this.args.environment, | ||
projectId: this.flags.project || this.getProject(), | ||
applicationId: this.args.applicationId, | ||
body: { | ||
pitId, | ||
searchAfter: JSON.stringify(searchAfter), | ||
size: this.flags["batch-size"], | ||
}, | ||
}); | ||
|
||
return result; | ||
} | ||
|
||
private async dumpAllDocuments() { | ||
// Prepare dumping all documents | ||
let pitId = ""; | ||
let searchAfter: string[] = []; | ||
|
||
let dumpedDocuments = 0; | ||
let totalDocuments = 0; | ||
|
||
const fd = await fs.open(path.join(this.args.dumpDirectory, "documents.jsonl"), "w"); | ||
const writeStream = fd.createWriteStream(); | ||
const ndjsonStream = ndjson.stringify(); | ||
|
||
writeStream.on("error", (error) => { | ||
throw error; | ||
}); | ||
|
||
ndjsonStream.on("data", (line: string) => { | ||
writeStream.write(line); | ||
}); | ||
|
||
const teardown = async () => { | ||
// Finish the dump session if a PIT ID is set | ||
if (pitId.length > 0) { | ||
await this.finishDump(pitId); | ||
} | ||
|
||
// Close the open streams/file | ||
writeStream.close(); | ||
await fd.close(); | ||
}; | ||
|
||
try { | ||
// Dump the first batch | ||
let result = await this.dumpDocuments(pitId, searchAfter); | ||
let hits = result.hits.hits; | ||
|
||
while (hits.length > 0) { | ||
// Update the PIT ID and the cursor for the next dump | ||
pitId = result.pit_id; | ||
searchAfter = hits[hits.length - 1].sort; | ||
|
||
// Save the documents | ||
for (let i = 0; i < hits.length; ++i) { | ||
ndjsonStream.write(hits[i]); | ||
} | ||
|
||
dumpedDocuments += hits.length; | ||
totalDocuments = result.hits.total.value; | ||
this.logInfo(`Dumping Elasticsearch documents: ${Math.floor(dumpedDocuments / totalDocuments * 100)}% (${dumpedDocuments}/${totalDocuments})`); | ||
|
||
// Dump the next batch | ||
result = await this.dumpDocuments(pitId, searchAfter); | ||
hits = result.hits.hits; | ||
} | ||
} catch (error: any) { | ||
teardown(); | ||
|
||
this.logKo(`Error while dumping the documents: ${error}`); | ||
process.exit(1); | ||
} | ||
|
||
// Finish the dump | ||
teardown(); | ||
} | ||
|
||
/** | ||
* @description Finish the document dumping session. | ||
* @param pitId ID of the PIT opened on Elasticsearch. | ||
*/ | ||
private async finishDump(pitId: string) { | ||
try { | ||
await this.paas.query({ | ||
controller: "application/storage", | ||
action: "finishDumpDocuments", | ||
environmentId: this.args.environment, | ||
projectId: this.flags.project || this.getProject(), | ||
applicationId: this.args.applicationId, | ||
body: { | ||
pitId, | ||
}, | ||
}); | ||
} catch (error: any) { | ||
this.logInfo(`Unable to cleanly finish the dump session: ${error}`); | ||
} | ||
} | ||
} | ||
|
||
export default PaasEsDump; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the option `{ recursive: true }` work like `mkdirp`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it works the same as
`mkdir -p`.