Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"code.gitea.io/gitea/modules/private"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"

"github.com/urfave/cli"
)
Expand Down Expand Up @@ -141,7 +140,7 @@ func (d *delayWriter) Close() error {
if d == nil {
return nil
}
stopped := util.StopTimer(d.timer)
stopped := d.timer.Stop()
if stopped || d.buf == nil {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions models/unittest/testdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func MainTest(m *testing.M, testOpts *TestOptions) {
}

if err = CreateTestEngine(opts); err != nil {
_, _ = fmt.Fprintln(os.Stderr, `sqlite3 requires: import _ "github.com/mattn/go-sqlite3" or -tags sqlite,sqlite_unlock_notify`)
fatalTestError("Error creating test engine: %v\n", err)
}

Expand Down
4 changes: 0 additions & 4 deletions modules/indexer/code/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,6 @@ func (b *BleveIndexer) Close() {
log.Info("PID: %d Repository Indexer closed", os.Getpid())
}

// SetAvailabilityChangeCallback does nothing
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
}

// Ping does nothing
func (b *BleveIndexer) Ping() bool {
return true
Expand Down
22 changes: 5 additions & 17 deletions modules/indexer/code/elastic_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ var _ Indexer = &ElasticSearchIndexer{}

// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
client *elastic.Client
indexerAliasName string
available bool
availabilityCallback func(bool)
stopTimer chan struct{}
lock sync.RWMutex
client *elastic.Client
indexerAliasName string
available bool
stopTimer chan struct{}
lock sync.RWMutex
}

type elasticLogger struct {
Expand Down Expand Up @@ -198,13 +197,6 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
return exists, nil
}

// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
b.lock.Lock()
defer b.lock.Unlock()
b.availabilityCallback = callback
}

// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
b.lock.RLock()
Expand Down Expand Up @@ -529,8 +521,4 @@ func (b *ElasticSearchIndexer) setAvailability(available bool) {
}

b.available = available
if b.availabilityCallback != nil {
// Call the callback from within the lock to ensure that the ordering remains correct
b.availabilityCallback(b.available)
}
}
56 changes: 26 additions & 30 deletions modules/indexer/code/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type SearchResultLanguages struct {
// Indexer defines an interface to index and search code contents
type Indexer interface {
Ping() bool
SetAvailabilityChangeCallback(callback func(bool))
Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
Expand Down Expand Up @@ -81,7 +80,7 @@ type IndexerData struct {
RepoID int64
}

var indexerQueue queue.UniqueQueue
var indexerQueue *queue.WorkerPoolQueue[*IndexerData]

func index(ctx context.Context, indexer Indexer, repoID int64) error {
repo, err := repo_model.GetRepositoryByID(ctx, repoID)
Expand Down Expand Up @@ -137,37 +136,46 @@ func Init() {
// Create the Queue
switch setting.Indexer.RepoType {
case "bleve", "elasticsearch":
handler := func(data ...queue.Data) []queue.Data {
handler := func(items ...*IndexerData) []*IndexerData {
idx, err := indexer.get()
if idx == nil || err != nil {
log.Error("Codes indexer handler: unable to get indexer!")
return data
return items
}
if !idx.Ping() {
log.Error("Code indexer handler: indexer is unavailable.")
return items
}

unhandled := make([]queue.Data, 0, len(data))
for _, datum := range data {
indexerData, ok := datum.(*IndexerData)
if !ok {
log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
continue
}
// the old logic did: if indexer.Ping() { return nil }, skip all failed items

for _, indexerData := range items {
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)

// FIXME: it seems there is a bug in `CatFileBatch` or `nio.Pipe`, which will cause the process to hang forever in rare cases
/*
sync.(*Cond).Wait(cond.go:70)
github.com/djherbis/nio/v3.(*PipeReader).Read(sync.go:106)
bufio.(*Reader).fill(bufio.go:106)
bufio.(*Reader).ReadSlice(bufio.go:372)
bufio.(*Reader).collectFragments(bufio.go:447)
bufio.(*Reader).ReadString(bufio.go:494)
code.gitea.io/gitea/modules/git.ReadBatchLine(batch_reader.go:149)
code.gitea.io/gitea/modules/indexer/code.(*BleveIndexer).addUpdate(bleve.go:214)
code.gitea.io/gitea/modules/indexer/code.(*BleveIndexer).Index(bleve.go:296)
code.gitea.io/gitea/modules/indexer/code.(*wrappedIndexer).Index(wrapped.go:74)
code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105)
*/
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
if !setting.IsInTesting {
log.Error("indexer index error for repo %v: %v", indexerData.RepoID, err)
}
if indexer.Ping() {
continue
}
// Add back to queue
unhandled = append(unhandled, datum)
}
}
return unhandled
return nil
}

indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler)
if indexerQueue == nil {
log.Fatal("Unable to create codes indexer queue")
}
Expand Down Expand Up @@ -224,18 +232,6 @@ func Init() {

indexer.set(rIndexer)

if queue, ok := indexerQueue.(queue.Pausable); ok {
rIndexer.SetAvailabilityChangeCallback(func(available bool) {
if !available {
log.Info("Code index queue paused")
queue.Pause()
} else {
log.Info("Code index queue resumed")
queue.Resume()
}
})
}

// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)

Expand Down
10 changes: 0 additions & 10 deletions modules/indexer/code/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,6 @@ func (w *wrappedIndexer) get() (Indexer, error) {
return w.internal, nil
}

// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
indexer, err := w.get()
if err != nil {
log.Error("Failed to get indexer: %v", err)
return
}
indexer.SetAvailabilityChangeCallback(callback)
}

// Ping checks if elastic is available
func (w *wrappedIndexer) Ping() bool {
indexer, err := w.get()
Expand Down
4 changes: 0 additions & 4 deletions modules/indexer/issues/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,6 @@ func (b *BleveIndexer) Init() (bool, error) {
return false, err
}

// SetAvailabilityChangeCallback does nothing
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
}

// Ping does nothing
func (b *BleveIndexer) Ping() bool {
return true
Expand Down
4 changes: 0 additions & 4 deletions modules/indexer/issues/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ func (i *DBIndexer) Init() (bool, error) {
return false, nil
}

// SetAvailabilityChangeCallback dummy function
func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
}

// Ping checks if database is available
func (i *DBIndexer) Ping() bool {
return db.GetEngine(db.DefaultContext).Ping() != nil
Expand Down
22 changes: 5 additions & 17 deletions modules/indexer/issues/elastic_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ var _ Indexer = &ElasticSearchIndexer{}

// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
client *elastic.Client
indexerName string
available bool
availabilityCallback func(bool)
stopTimer chan struct{}
lock sync.RWMutex
client *elastic.Client
indexerName string
available bool
stopTimer chan struct{}
lock sync.RWMutex
}

type elasticLogger struct {
Expand Down Expand Up @@ -138,13 +137,6 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
return true, nil
}

// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
b.lock.Lock()
defer b.lock.Unlock()
b.availabilityCallback = callback
}

// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
b.lock.RLock()
Expand Down Expand Up @@ -305,8 +297,4 @@ func (b *ElasticSearchIndexer) setAvailability(available bool) {
}

b.available = available
if b.availabilityCallback != nil {
// Call the callback from within the lock to ensure that the ordering remains correct
b.availabilityCallback(b.available)
}
}
69 changes: 17 additions & 52 deletions modules/indexer/issues/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type SearchResult struct {
type Indexer interface {
Init() (bool, error)
Ping() bool
SetAvailabilityChangeCallback(callback func(bool))
Index(issue []*IndexerData) error
Delete(ids ...int64) error
Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
Expand Down Expand Up @@ -94,7 +93,7 @@ func (h *indexerHolder) get() Indexer {

var (
// issueIndexerQueue queue of issue ids to be updated
issueIndexerQueue queue.Queue
issueIndexerQueue *queue.WorkerPoolQueue[*IndexerData]
holder = newIndexerHolder()
)

Expand All @@ -108,62 +107,43 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Queue
switch setting.Indexer.IssueType {
case "bleve", "elasticsearch", "meilisearch":
handler := func(data ...queue.Data) []queue.Data {
handler := func(items ...*IndexerData) []*IndexerData {
indexer := holder.get()
if indexer == nil {
log.Error("Issue indexer handler: unable to get indexer!")
return data
log.Error("Issue indexer handler: unable to get indexer.")
return items
}
if !indexer.Ping() {
log.Error("Issue indexer handler: indexer is unavailable.")
return items
}

iData := make([]*IndexerData, 0, len(data))
unhandled := make([]queue.Data, 0, len(data))
for _, datum := range data {
indexerData, ok := datum.(*IndexerData)
if !ok {
log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
continue
}
// the old logic did: if indexer.Ping() { return nil }, skip all failed items

toIndex := make([]*IndexerData, 0, len(items))
for _, indexerData := range items {
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
if indexerData.IsDelete {
if err := indexer.Delete(indexerData.IDs...); err != nil {
log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
if indexer.Ping() {
continue
}
// Add back to queue
unhandled = append(unhandled, datum)
}
continue
}
iData = append(iData, indexerData)
toIndex = append(toIndex, indexerData)
}
if len(unhandled) > 0 {
for _, indexerData := range iData {
unhandled = append(unhandled, indexerData)
}
return unhandled
}
if err := indexer.Index(iData); err != nil {
log.Error("Error whilst indexing: %v Error: %v", iData, err)
if indexer.Ping() {
return nil
}
// Add back to queue
for _, indexerData := range iData {
unhandled = append(unhandled, indexerData)
}
return unhandled
if err := indexer.Index(toIndex); err != nil {
log.Error("Error whilst indexing: %v Error: %v", toIndex, err)
}
return nil
}

issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)

if issueIndexerQueue == nil {
log.Fatal("Unable to create issue indexer queue")
}
default:
issueIndexerQueue = &queue.DummyQueue{}
issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
}

// Create the Indexer
Expand Down Expand Up @@ -240,18 +220,6 @@ func InitIssueIndexer(syncReindex bool) {
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}

if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
holder.get().SetAvailabilityChangeCallback(func(available bool) {
if !available {
log.Info("Issue index queue paused")
queue.Pause()
} else {
log.Info("Issue index queue resumed")
queue.Resume()
}
})
}

// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)

Expand Down Expand Up @@ -285,9 +253,6 @@ func InitIssueIndexer(syncReindex bool) {
case <-graceful.GetManager().IsShutdown():
log.Warn("Shutdown occurred before issue index initialisation was complete")
case <-time.After(timeout):
if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
shutdownable.Terminate()
}
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
}
}()
Expand Down
Loading