@@ -10,12 +10,18 @@ import { Credential, CredentialStore } from './credential_store';
1010import { reindexActionsFactory } from './reindex_actions' ;
1111import { ReindexService , reindexServiceFactory } from './reindex_service' ;
1212import { LicensingPluginSetup } from '../../../../licensing/server' ;
13- import { sortAndOrderReindexOperations , queuedOpHasStarted } from './op_utils' ;
13+ import { sortAndOrderReindexOperations , queuedOpHasStarted , isQueuedOp } from './op_utils' ;
1414
1515const POLL_INTERVAL = 30000 ;
1616// If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused.
1717const PAUSE_WINDOW = POLL_INTERVAL * 4 ;
1818
19+ /**
20+ * To avoid running the worker loop very tightly and causing a CPU bottleneck we use this
21+ * padding to simulate an asynchronous sleep. See the description of the tight loop below.
22+ */
23+ const WORKER_PADDING_MS = 1000 ;
24+
1925/**
2026 * A singleton worker that will coordinate two polling loops:
2127 * (1) A longer loop that polls for reindex operations that are in progress. If any are found, loop (2) is started.
@@ -102,16 +108,25 @@ export class ReindexWorker {
102108 /**
103109 * Runs an async loop until all inProgress jobs are complete or failed.
104110 */
105- private startUpdateOperationLoop = async ( ) => {
111+ private startUpdateOperationLoop = async ( ) : Promise < void > => {
106112 this . updateOperationLoopRunning = true ;
107-
108113 try {
109114 while ( this . inProgressOps . length > 0 ) {
110115 this . log . debug ( `Updating ${ this . inProgressOps . length } reindex operations` ) ;
111116
112117 // Push each operation through the state machine and refresh.
113118 await Promise . all ( this . inProgressOps . map ( this . processNextStep ) ) ;
119+
114120 await this . refresh ( ) ;
121+
122+ if (
123+ this . inProgressOps . length &&
124+ this . inProgressOps . every ( op => ! this . credentialStore . get ( op ) )
125+ ) {
126+ // TODO: This tight loop needs something to relax potentially high CPU demands so this padding is added.
127+ // This scheduler should be revisited in future.
128+ await new Promise ( resolve => setTimeout ( resolve , WORKER_PADDING_MS ) ) ;
129+ }
115130 }
116131 } finally {
117132 this . updateOperationLoopRunning = false ;
@@ -173,20 +188,32 @@ export class ReindexWorker {
173188 }
174189 } ;
175190
176- private processNextStep = async ( reindexOp : ReindexSavedObject ) => {
191+ private lastCheckedQueuedOpId : string | undefined ;
192+ private processNextStep = async ( reindexOp : ReindexSavedObject ) : Promise < void > => {
177193 const credential = this . credentialStore . get ( reindexOp ) ;
178194
179195 if ( ! credential ) {
180- // Set to paused state if the job hasn't been updated in PAUSE_WINDOW.
196+ // If this is a queued reindex op, and we know there can only ever be one in progress at a
197+ // given time, there is a small chance it may have just reached the front of the queue so
198+ // we give it a chance to be updated by another worker with credentials by making this a
199+ // noop once. If it has not been updated by the next loop we will mark it paused if it
200+ // falls outside of PAUSE_WINDOW.
201+ if ( isQueuedOp ( reindexOp ) ) {
202+ if ( this . lastCheckedQueuedOpId !== reindexOp . id ) {
203+ this . lastCheckedQueuedOpId = reindexOp . id ;
204+ return ;
205+ }
206+ }
181207 // This indicates that no Kibana nodes currently have credentials to update this job.
182208 const now = moment ( ) ;
183209 const updatedAt = moment ( reindexOp . updated_at ) ;
184210 if ( updatedAt < now . subtract ( PAUSE_WINDOW ) ) {
185- return this . reindexService . pauseReindexOperation ( reindexOp . attributes . indexName ) ;
211+ await this . reindexService . pauseReindexOperation ( reindexOp . attributes . indexName ) ;
212+ return ;
186213 } else {
187214 // If it has been updated recently, we assume another node has the necessary credentials,
188215 // and this becomes a noop.
189- return reindexOp ;
216+ return ;
190217 }
191218 }
192219
0 commit comments