55package issues
66
77import (
8- "fmt "
8+ "sync "
99
1010 "code.gitea.io/gitea/models"
1111 "code.gitea.io/gitea/modules/log"
@@ -46,77 +46,98 @@ type Indexer interface {
4646}
4747
4848var (
49+ issueIndexerChannel = make (chan * IndexerData , setting .Indexer .UpdateQueueLength )
4950 // issueIndexerQueue queue of issue ids to be updated
5051 issueIndexerQueue Queue
5152 issueIndexer Indexer
53+ wg sync.WaitGroup
5254)
5355
5456// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
5557// all issue index done.
56- func InitIssueIndexer (syncReindex bool ) error {
57- var populate bool
58- var dummyQueue bool
59- switch setting .Indexer .IssueType {
60- case "bleve" :
61- issueIndexer = NewBleveIndexer (setting .Indexer .IssuePath )
62- exist , err := issueIndexer .Init ()
63- if err != nil {
64- return err
58+ func InitIssueIndexer (syncReindex bool ) {
59+ wg .Add (1 )
60+ go func () {
61+ var populate bool
62+ var dummyQueue bool
63+ switch setting .Indexer .IssueType {
64+ case "bleve" :
65+ issueIndexer = NewBleveIndexer (setting .Indexer .IssuePath )
66+ exist , err := issueIndexer .Init ()
67+ if err != nil {
68+ log .Fatal ("Unable to initialise issueIndexer: %v" , err )
69+ }
70+ populate = ! exist
71+ case "db" :
72+ issueIndexer = & DBIndexer {}
73+ dummyQueue = true
74+ default :
75+ log .Fatal ("Unknown issue indexer type: %s" , setting .Indexer .IssueType )
6576 }
66- populate = ! exist
67- case "db" :
68- issueIndexer = & DBIndexer {}
69- dummyQueue = true
70- default :
71- return fmt .Errorf ("unknow issue indexer type: %s" , setting .Indexer .IssueType )
72- }
7377
74- if dummyQueue {
75- issueIndexerQueue = & DummyQueue {}
76- return nil
77- }
78+ if dummyQueue {
79+ issueIndexerQueue = & DummyQueue {}
80+ } else {
81+ var err error
82+ switch setting .Indexer .IssueQueueType {
83+ case setting .LevelQueueType :
84+ issueIndexerQueue , err = NewLevelQueue (
85+ issueIndexer ,
86+ setting .Indexer .IssueQueueDir ,
87+ setting .Indexer .IssueQueueBatchNumber )
88+ if err != nil {
89+ log .Fatal (
90+ "Unable create level queue for issue queue dir: %s batch number: %d : %v" ,
91+ setting .Indexer .IssueQueueDir ,
92+ setting .Indexer .IssueQueueBatchNumber ,
93+ err )
94+ }
95+ case setting .ChannelQueueType :
96+ issueIndexerQueue = NewChannelQueue (issueIndexer , setting .Indexer .IssueQueueBatchNumber )
97+ case setting .RedisQueueType :
98+ addrs , pass , idx , err := parseConnStr (setting .Indexer .IssueQueueConnStr )
99+ if err != nil {
100+ log .Fatal ("Unable to parse connection string for RedisQueueType: %s : %v" ,
101+ setting .Indexer .IssueQueueConnStr ,
102+ err )
103+ }
104+ issueIndexerQueue , err = NewRedisQueue (addrs , pass , idx , issueIndexer , setting .Indexer .IssueQueueBatchNumber )
105+ if err != nil {
106+ log .Fatal ("Unable to create RedisQueue: %s : %v" ,
107+ setting .Indexer .IssueQueueConnStr ,
108+ err )
109+ }
110+ default :
111+ log .Fatal ("Unsupported indexer queue type: %v" ,
112+ setting .Indexer .IssueQueueType )
113+ }
78114
79- var err error
80- switch setting .Indexer .IssueQueueType {
81- case setting .LevelQueueType :
82- issueIndexerQueue , err = NewLevelQueue (
83- issueIndexer ,
84- setting .Indexer .IssueQueueDir ,
85- setting .Indexer .IssueQueueBatchNumber )
86- if err != nil {
87- return err
88- }
89- case setting .ChannelQueueType :
90- issueIndexerQueue = NewChannelQueue (issueIndexer , setting .Indexer .IssueQueueBatchNumber )
91- case setting .RedisQueueType :
92- addrs , pass , idx , err := parseConnStr (setting .Indexer .IssueQueueConnStr )
93- if err != nil {
94- return err
95- }
96- issueIndexerQueue , err = NewRedisQueue (addrs , pass , idx , issueIndexer , setting .Indexer .IssueQueueBatchNumber )
97- if err != nil {
98- return err
115+ go func () {
116+ err = issueIndexerQueue .Run ()
117+ if err != nil {
118+ log .Error ("issueIndexerQueue.Run: %v" , err )
119+ }
120+ }()
99121 }
100- default :
101- return fmt .Errorf ("Unsupported indexer queue type: %v" , setting .Indexer .IssueQueueType )
102- }
103122
104- go func () {
105- err = issueIndexerQueue .Run ()
106- if err != nil {
107- log .Error ("issueIndexerQueue.Run: %v" , err )
108- }
109- }()
123+ go func () {
124+ for data := range issueIndexerChannel {
125+ _ = issueIndexerQueue .Push (data )
126+ }
127+ }()
110128
111- if populate {
112- if syncReindex {
113- populateIssueIndexer ()
114- } else {
115- go populateIssueIndexer ()
129+ if populate {
130+ if syncReindex {
131+ populateIssueIndexer ()
132+ } else {
133+ go populateIssueIndexer ()
134+ }
116135 }
136+ wg .Done ()
137+ }()
138+ if syncReindex {
139+ wg .Wait ()
117140 }
118-
119- return nil
120141}
121142
122143// populateIssueIndexer populate the issue indexer with issue data
@@ -166,13 +187,13 @@ func UpdateIssueIndexer(issue *models.Issue) {
166187 comments = append (comments , comment .Content )
167188 }
168189 }
169- _ = issueIndexerQueue . Push ( & IndexerData {
190+ issueIndexerChannel <- & IndexerData {
170191 ID : issue .ID ,
171192 RepoID : issue .RepoID ,
172193 Title : issue .Title ,
173194 Content : issue .Content ,
174195 Comments : comments ,
175- })
196+ }
176197}
177198
178199// DeleteRepoIssueIndexer deletes repo's all issues indexes
@@ -188,14 +209,15 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
188209 return
189210 }
190211
191- _ = issueIndexerQueue . Push ( & IndexerData {
212+ issueIndexerChannel <- & IndexerData {
192213 IDs : ids ,
193214 IsDelete : true ,
194- })
215+ }
195216}
196217
197218// SearchIssuesByKeyword search issue ids by keywords and repo id
198219func SearchIssuesByKeyword (repoID int64 , keyword string ) ([]int64 , error ) {
220+ wg .Wait ()
199221 var issueIDs []int64
200222 res , err := issueIndexer .Search (keyword , repoID , 1000 , 0 )
201223 if err != nil {
0 commit comments