Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions servers/cu/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ There are a few environment variables that you can set. Besides
- `EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD`: If a process uses this amount of
gas, then it will immediately create a Checkpoint at the end of the evaluation
stream.
- `EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD`: If a process has this amount of eval
time, then it will immediately create a Checkpoint at the end of the
evaluation stream.
- `MEM_MONITOR_INTERVAL`: The interval, in milliseconds, at which to log memory
usage on this CU.
- `BUSY_THRESHOLD`: The amount of time, in milliseconds, the CU should wait for
Expand Down
2 changes: 2 additions & 0 deletions servers/cu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ const CONFIG_ENVS = {
* catching up to a previous checkpoint.
*/
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: process.env.EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || 300_000_000_000_000,
EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD: process.env.EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD || ms('15m'),
PROCESS_WASM_MEMORY_MAX_LIMIT: process.env.PROCESS_WASM_MEMORY_MAX_LIMIT || bytes('1gb'),
PROCESS_WASM_COMPUTE_MAX_LIMIT: process.env.PROCESS_WASM_COMPUTE_MAX_LIMIT || 9_000_000_000_000, // 9t
PROCESS_WASM_SUPPORTED_FORMATS: process.env.PROCESS_WASM_SUPPORTED_FORMATS || DEFAULT_PROCESS_WASM_MODULE_FORMATS,
Expand Down Expand Up @@ -207,6 +208,7 @@ const CONFIG_ENVS = {
* This is the baseline for checkpointing as no process should need to spend more than two hours catching up to a previous checkpoint.
*/
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: process.env.EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || 300_000_000_000_000,
EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD: process.env.EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD || ms('15m'),
PROCESS_WASM_MEMORY_MAX_LIMIT: process.env.PROCESS_WASM_MEMORY_MAX_LIMIT || bytes('1gb'), // 1GB
PROCESS_WASM_COMPUTE_MAX_LIMIT: process.env.PROCESS_WASM_COMPUTE_MAX_LIMIT || 9_000_000_000_000, // 9t
PROCESS_WASM_SUPPORTED_FORMATS: process.env.PROCESS_WASM_SUPPORTED_FORMATS || DEFAULT_PROCESS_WASM_MODULE_FORMATS,
Expand Down
3 changes: 2 additions & 1 deletion servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ export const saveLatestProcessMemorySchema = z.function()
cron: z.string().nullish(),
blockHeight: z.coerce.number().nullish(),
Memory: bufferSchema,
gasUsed: z.bigint().nullish()
gasUsed: z.bigint().nullish(),
evalTime: z.number().nullish()
}))
.returns(z.promise(z.any()))

Expand Down
93 changes: 20 additions & 73 deletions servers/cu/src/domain/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import ms from 'ms'
import pMap from 'p-map'
import { fromPromise } from 'hyper-async'

import { applicationSchema } from './dal.js'
Expand Down Expand Up @@ -86,57 +84,6 @@ export const bootstrap = async ({ config, effects }) => {
findEvaluations: effects.findEvaluations
})

let checkpointP
const checkpointProcesses = fromPromise(async () => {
if (checkpointP) {
logger('Checkpointing of WASM Memory Cache already in progress. Nooping...')
return checkpointP
}

const pArgs = []
/**
* push a new object to keep references to original data intact
*/
effects.wasmMemoryCache.data.forEach((value) =>
pArgs.push({ Memory: value.Memory, File: value.File, evaluation: value.evaluation })
)

checkpointP = pMap(
pArgs,
(value) => effects.saveCheckpoint({ Memory: value.Memory, File: value.File, ...value.evaluation })
.catch((err) => {
logger(
'Error occurred when creating Checkpoint for evaluation "%j". Skipping...',
value.evaluation,
err
)
}),
{
/**
* TODO: allow to be configured on CU
*
* Helps prevent the gateway from being overwhelmed and then timing out
*/
concurrency: 10,
/**
* Prevent any one rejected promise from causing other invocations
* to not be attempted.
*
* The overall promise will still reject, which is why we have
* an empty catch below, which will allow all Promises to either resolve,
* or reject, then the final wrapping promise to always resolve.
*
* https://github.com/sindresorhus/p-map?tab=readme-ov-file#stoponerror
*/
stopOnError: false
}
)
.catch(() => {})

await checkpointP
checkpointP = undefined
})

const healthcheck = healthcheckWith({ walletAddress: effects.address })

/**
Expand All @@ -154,7 +101,6 @@ export const bootstrap = async ({ config, effects }) => {
readResult,
readResults,
readCronResults,
checkpointProcesses,
healthcheck
}
}
Expand All @@ -174,32 +120,33 @@ export const bootstrap = async ({ config, effects }) => {
}, config.MEM_MONITOR_INTERVAL)
memMonitor.unref()

if (config.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL) {
logger('Setting up Interval to Checkpoint all Processes every %s', ms(domainConfig.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL))
const cacheCheckpointInterval = setInterval(async () => {
logger('Checkpoint Interval Reached. Attempting to Checkpoint all Processes currently in WASM heap cache...')
await domain.apis.checkpointProcesses().toPromise()
logger('Interval Checkpoint Done. Done checkpointing all processes in WASM heap cache.')
}, config.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL)
cacheCheckpointInterval.unref()
}

process.on('SIGUSR2', async () => {
logger('Received SIGUSR2. Manually Attempting to Checkpoint all Processes currently in WASM heap cache...')
await domain.apis.checkpointProcesses().toPromise()
logger('SIGUSR2 Done. Done checkpointing all processes in WASM heap cache.')
})
// DEPRECATED: Checkpointing is no longer done on an interval, as we are using the eager checkpointing feature
// if (config.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL) {
// logger('Setting up Interval to Checkpoint all Processes every %s', ms(domainConfig.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL))
// const cacheCheckpointInterval = setInterval(async () => {
// logger('Checkpoint Interval Reached. Attempting to Checkpoint all Processes currently in WASM heap cache...')
// await domain.apis.checkpointProcesses().toPromise()
// logger('Interval Checkpoint Done. Done checkpointing all processes in WASM heap cache.')
// }, config.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL)
// cacheCheckpointInterval.unref()
// }

// process.on('SIGUSR2', async () => {
// logger('Received SIGUSR2. Manually Attempting to Checkpoint all Processes currently in WASM heap cache...')
// await domain.apis.checkpointProcesses().toPromise()
// logger('SIGUSR2 Done. Done checkpointing all processes in WASM heap cache.')
// })

process.on('SIGTERM', async () => {
logger('Received SIGTERM. Gracefully shutting down server...')
logger('Received SIGTERM. Attempting to Checkpoint all Processes currently in WASM heap cache...')
// logger('Received SIGTERM. Attempting to Checkpoint all Processes currently in WASM heap cache...')

await Promise.all([
cu.stop(),
domain.apis.checkpointWasmMemoryCache().toPromise()
cu.stop()
// domain.apis.checkpointWasmMemoryCache().toPromise()
])

logger('Done checkpointing all processes. Exiting...')
// logger('Done checkpointing all processes. Exiting...')
process.exit()
})

Expand Down
7 changes: 6 additions & 1 deletion servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ export function evaluateWith (env) {
const { noSave, cron, ordinate, message } = prev

if (!noSave && prev.Memory) {
const now = new Date()
// If there is no startTime, then we use the current time which will make evalTime = 0
const startTime = pathOr(now, ['stats', 'startTime'], ctx)

await saveLatestProcessMemory({
processId: ctx.id,
moduleId: ctx.moduleId,
Expand All @@ -332,7 +336,8 @@ export function evaluateWith (env) {
ordinate,
cron,
Memory: prev.Memory,
gasUsed: totalGasUsed
gasUsed: totalGasUsed,
evalTime: now.getTime() - startTime.getTime() // The eval time in ms: currTime - startTime
})
}

Expand Down
6 changes: 6 additions & 0 deletions servers/cu/src/domain/lib/evaluate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ describe('evaluate', () => {
moduleId: 'foo-module',
moduleOptions,
stats: {
startTime: new Date(),
messages: {
scheduled: 0,
cron: 0,
Expand Down Expand Up @@ -198,6 +199,7 @@ describe('evaluate', () => {
moduleId: 'foo-module',
moduleOptions,
stats: {
startTime: new Date(),
messages: {
scheduled: 0,
cron: 0,
Expand Down Expand Up @@ -313,6 +315,7 @@ describe('evaluate', () => {
moduleId: 'foo-module',
moduleOptions,
stats: {
startTime: new Date(),
messages: {
scheduled: 0,
cron: 0,
Expand Down Expand Up @@ -460,6 +463,7 @@ describe('evaluate', () => {
moduleId: 'foo-module',
moduleOptions,
stats: {
startTime: new Date(),
messages: {
scheduled: 0,
cron: 0,
Expand Down Expand Up @@ -570,6 +574,7 @@ describe('evaluate', () => {
moduleId: 'foo-module',
moduleOptions,
stats: {
startTime: new Date(),
messages: {
scheduled: 0,
cron: 0,
Expand Down Expand Up @@ -708,6 +713,7 @@ describe('evaluate', () => {
mostRecentHashChain: 'init-hashchain-123',
moduleOptions,
stats: {
startTime: new Date(),
messages: {
scheduled: 0,
cron: 0,
Expand Down
10 changes: 6 additions & 4 deletions servers/cu/src/domain/lib/loadMessageMeta.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ describe('loadMessageMeta', () => {
assert.deepStrictEqual(args, {
suUrl: 'https://foo.bar',
processId: 'process-123',
messageUid: 'message-tx-123'
messageUid: 'message-tx-123',
body: undefined
})
return { processId: 'process-123', timestamp: 1697574792000, nonce: 1 }
},
Expand All @@ -40,7 +41,7 @@ describe('loadMessageMeta', () => {
logger
})

const res = await loadMessageMeta({ processId: 'process-123', messageUid: 'message-tx-123' })
const res = await loadMessageMeta({ processId: 'process-123', messageUid: 'message-tx-123', body: undefined })
.toPromise()

assert.deepStrictEqual(res, { processId: 'process-123', timestamp: 1697574792000, nonce: 1 })
Expand Down Expand Up @@ -69,7 +70,8 @@ describe('loadMessageMeta', () => {
assert.deepStrictEqual(args, {
suUrl: 'https://from.cache',
processId: 'process-123',
messageUid: 'message-tx-123'
messageUid: 'message-tx-123',
body: undefined
})
return { processId: 'process-123', timestamp: 1697574792000, nonce: 1 }
},
Expand All @@ -88,7 +90,7 @@ describe('loadMessageMeta', () => {
logger
})

await loadMessageMeta({ processId: 'process-123', messageUid: 'message-tx-123' })
await loadMessageMeta({ processId: 'process-123', messageUid: 'message-tx-123', body: undefined })
.toPromise()
})

Expand Down
5 changes: 5 additions & 0 deletions servers/cu/src/domain/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ export const domainConfigSchema = z.object({
* evaluation stream.
*/
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: positiveIntSchema,
/**
* If a process has this amount of eval time, then it will immediately create a Checkpoint at the end of the
* evaluation stream.
*/
EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD: positiveIntSchema,
/**
* The number of workers to use for evaluating messages
*/
Expand Down
52 changes: 36 additions & 16 deletions servers/cu/src/effects/ao-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -1312,9 +1312,11 @@ export function findLatestProcessMemoryWith ({
}
}

export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD }) {
return async ({ processId, moduleId, assignmentId, messageId, hashChain, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, gasUsed }) => {
export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD, EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD }) {
return async ({ processId, moduleId, assignmentId, messageId, hashChain, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, gasUsed, evalTime }) => {
const cached = cache.get(processId)
let incrementedGasUsed = pathOr(BigInt(0), ['evaluation', 'gasUsed'], cached)
let incrementedEvalTime = pathOr(0, ['evaluation', 'evalTime'], cached)

/**
* Ensure that we are always caching a Buffer and not a TypedArray
Expand All @@ -1331,7 +1333,6 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA
* the value currently cached, so overwrite it
*/

let incrementedGasUsed = pathOr(BigInt(0), ['evaluation', 'gasUsed'], cached)
/**
* The cache is being reseeded ie. Memory was reloaded from a file
*/
Expand All @@ -1353,12 +1354,22 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA
processId,
{ messageId, timestamp, ordinate, cron, blockHeight }
)

incrementedGasUsed = pipe(
pathOr(BigInt(0), ['evaluation', 'gasUsed']),
add(gasUsed || BigInt(0))
)(cached)

incrementedEvalTime = pipe(
pathOr(0, ['evaluation', 'evalTime']),
add(evalTime || 0)
)(cached)
}

const gasThresholdReached = incrementedGasUsed && EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD && incrementedGasUsed >= EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD
const evalTimeThresholdReached = incrementedEvalTime && EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD && incrementedEvalTime >= EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD
const eitherThresholdReached = gasThresholdReached || evalTimeThresholdReached

const evaluation = {
processId,
moduleId,
Expand All @@ -1378,37 +1389,46 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA
*/
encoding: undefined,
cron,
gasUsed: incrementedGasUsed < EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD ? incrementedGasUsed : 0
gasUsed: eitherThresholdReached ? 0 : incrementedGasUsed,
evalTime: eitherThresholdReached ? 0 : incrementedEvalTime
}
// cache.set(processId, { Memory: zipped, evaluation })
cache.set(processId, { Memory, evaluation })

if (!incrementedGasUsed || !EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || incrementedGasUsed < EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD) return
if (!eitherThresholdReached) return
/**
* Eagerly create the Checkpoint on the next event queue drain
*/
setImmediate(() => {
if (gasThresholdReached) {
logger(
'Eager Checkpoint Accumulated Gas Threshold of "%d" gas used met when evaluating process "%s" up to "%j" -- "%d" gas used. Eagerly creating a Checkpoint...',
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD,
processId,
{ messageId, timestamp, ordinate, cron, blockHeight },
incrementedGasUsed
)
} else {
logger(
'Eager Checkpoint Accumulated Eval Time Threshold of "%d" ms eval time met when evaluating process "%s" up to "%j" -- "%d" ms eval time. Eagerly creating a Checkpoint...',
EAGER_CHECKPOINT_EVAL_TIME_THRESHOLD,
processId,
{ messageId, timestamp, ordinate, cron, blockHeight },
incrementedEvalTime
)
}

/**
/**
* Memory will always be defined at this point, so no reason
* to pass File
*/
return saveCheckpoint({ Memory, ...evaluation })
.catch((err) => {
logger(
'Error occurred when creating Eager Checkpoint for evaluation "%j". Skipping...',
evaluation,
err
)
})
})
return saveCheckpoint({ Memory, ...evaluation })
.catch((err) => {
logger(
'Error occurred when creating Eager Checkpoint for evaluation "%j". Skipping...',
evaluation,
err
)
})
}
}

Expand Down
Loading