@@ -102,15 +102,15 @@ var (
102102func InitIssueIndexer (syncReindex bool ) {
103103 ctx , _ , finished := process .GetManager ().AddTypedContext (context .Background (), "Service: IssueIndexer" , process .SystemProcessType , false )
104104
105- waitChannel := make (chan time.Duration , 1 )
105+ indexerInitWaitChannel := make (chan time.Duration , 1 )
106106
107107 // Create the Queue
108108 switch setting .Indexer .IssueType {
109109 case "bleve" , "elasticsearch" , "meilisearch" :
110110 handler := func (items ... * IndexerData ) (unhandled []* IndexerData ) {
111111 indexer := holder .get ()
112112 if indexer == nil {
113- log .Error ("Issue indexer handler: unable to get indexer ." )
113+ log .Warn ("Issue indexer handler: indexer is not ready, retry later ." )
114114 return items
115115 }
116116 toIndex := make ([]* IndexerData , 0 , len (items ))
@@ -138,15 +138,17 @@ func InitIssueIndexer(syncReindex bool) {
138138 return unhandled
139139 }
140140
141- issueIndexerQueue = queue .CreateSimpleQueue ("issue_indexer" , handler )
141+ issueIndexerQueue = queue .CreateSimpleQueue (ctx , "issue_indexer" , handler )
142142
143143 if issueIndexerQueue == nil {
144144 log .Fatal ("Unable to create issue indexer queue" )
145145 }
146146 default :
147- issueIndexerQueue = queue .CreateSimpleQueue [* IndexerData ]("issue_indexer" , nil )
147+ issueIndexerQueue = queue .CreateSimpleQueue [* IndexerData ](ctx , "issue_indexer" , nil )
148148 }
149149
150+ graceful .GetManager ().RunAtTerminate (finished )
151+
150152 // Create the Indexer
151153 go func () {
152154 pprof .SetGoroutineLabels (ctx )
@@ -178,51 +180,41 @@ func InitIssueIndexer(syncReindex bool) {
178180 if issueIndexer != nil {
179181 issueIndexer .Close ()
180182 }
181- finished ()
182183 log .Info ("PID: %d Issue Indexer closed" , os .Getpid ())
183184 })
184185 log .Debug ("Created Bleve Indexer" )
185186 case "elasticsearch" :
186- graceful .GetManager ().RunWithShutdownFns (func (_ , atTerminate func (func ())) {
187- pprof .SetGoroutineLabels (ctx )
188- issueIndexer , err := NewElasticSearchIndexer (setting .Indexer .IssueConnStr , setting .Indexer .IssueIndexerName )
189- if err != nil {
190- log .Fatal ("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v" , setting .Indexer .IssueConnStr , err )
191- }
192- exist , err := issueIndexer .Init ()
193- if err != nil {
194- log .Fatal ("Unable to issueIndexer.Init with connection %s Error: %v" , setting .Indexer .IssueConnStr , err )
195- }
196- populate = ! exist
197- holder .set (issueIndexer )
198- atTerminate (finished )
199- })
187+ issueIndexer , err := NewElasticSearchIndexer (setting .Indexer .IssueConnStr , setting .Indexer .IssueIndexerName )
188+ if err != nil {
189+ log .Fatal ("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v" , setting .Indexer .IssueConnStr , err )
190+ }
191+ exist , err := issueIndexer .Init ()
192+ if err != nil {
193+ log .Fatal ("Unable to issueIndexer.Init with connection %s Error: %v" , setting .Indexer .IssueConnStr , err )
194+ }
195+ populate = ! exist
196+ holder .set (issueIndexer )
200197 case "db" :
201198 issueIndexer := & DBIndexer {}
202199 holder .set (issueIndexer )
203- graceful .GetManager ().RunAtTerminate (finished )
204200 case "meilisearch" :
205- graceful .GetManager ().RunWithShutdownFns (func (_ , atTerminate func (func ())) {
206- pprof .SetGoroutineLabels (ctx )
207- issueIndexer , err := NewMeilisearchIndexer (setting .Indexer .IssueConnStr , setting .Indexer .IssueConnAuth , setting .Indexer .IssueIndexerName )
208- if err != nil {
209- log .Fatal ("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v" , setting .Indexer .IssueConnStr , err )
210- }
211- exist , err := issueIndexer .Init ()
212- if err != nil {
213- log .Fatal ("Unable to issueIndexer.Init with connection %s Error: %v" , setting .Indexer .IssueConnStr , err )
214- }
215- populate = ! exist
216- holder .set (issueIndexer )
217- atTerminate (finished )
218- })
201+ issueIndexer , err := NewMeilisearchIndexer (setting .Indexer .IssueConnStr , setting .Indexer .IssueConnAuth , setting .Indexer .IssueIndexerName )
202+ if err != nil {
203+ log .Fatal ("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v" , setting .Indexer .IssueConnStr , err )
204+ }
205+ exist , err := issueIndexer .Init ()
206+ if err != nil {
207+ log .Fatal ("Unable to issueIndexer.Init with connection %s Error: %v" , setting .Indexer .IssueConnStr , err )
208+ }
209+ populate = ! exist
210+ holder .set (issueIndexer )
219211 default :
220212 holder .cancel ()
221213 log .Fatal ("Unknown issue indexer type: %s" , setting .Indexer .IssueType )
222214 }
223215
224216 // Start processing the queue
225- go graceful .GetManager ().RunWithShutdownFns (issueIndexerQueue . Run )
217+ go graceful .GetManager ().RunWithCancel (issueIndexerQueue )
226218
227219 // Populate the index
228220 if populate {
@@ -232,13 +224,14 @@ func InitIssueIndexer(syncReindex bool) {
232224 go graceful .GetManager ().RunWithShutdownContext (populateIssueIndexer )
233225 }
234226 }
235- waitChannel <- time .Since (start )
236- close (waitChannel )
227+
228+ indexerInitWaitChannel <- time .Since (start )
229+ close (indexerInitWaitChannel )
237230 }()
238231
239232 if syncReindex {
240233 select {
241- case <- waitChannel :
234+ case <- indexerInitWaitChannel :
242235 case <- graceful .GetManager ().IsShutdown ():
243236 }
244237 } else if setting .Indexer .StartupTimeout > 0 {
@@ -249,7 +242,7 @@ func InitIssueIndexer(syncReindex bool) {
249242 timeout += setting .GracefulHammerTime
250243 }
251244 select {
252- case duration := <- waitChannel :
245+ case duration := <- indexerInitWaitChannel :
253246 log .Info ("Issue Indexer Initialization took %v" , duration )
254247 case <- graceful .GetManager ().IsShutdown ():
255248 log .Warn ("Shutdown occurred before issue index initialisation was complete" )
0 commit comments