From f36daf538d71baed9d23c4b1c187c01115e73634 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 25 Nov 2020 17:31:49 +0530 Subject: [PATCH 1/2] fix(stream): Stop produceKVs on error (#1604) The Orchestrate function would block forever if the send function returned an error. The produceKVs goroutines would also block, because the error channel had a capacity of only 1. This change cancels the context when streamKVs returns an error and sizes the error channel to st.NumGo. --- stream.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index 63e949166..e857324c0 100644 --- a/stream.go +++ b/stream.go @@ -398,6 +398,8 @@ outer: // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and // return that error. Orchestrate can be called multiple times, but in serial order. func (st *Stream) Orchestrate(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() defer func() { for _, a := range st.allocators { // Using AllocatorFrom is better because if the allocator is already freed up, it would @@ -421,7 +423,7 @@ func (st *Stream) Orchestrate(ctx context.Context) error { // Picks up ranges from Badger, and sends them to rangeCh. go st.produceRanges(ctx) - errCh := make(chan error, 1) // Stores error by consumeKeys. + errCh := make(chan error, st.NumGo) // Stores error by consumeKeys. var wg sync.WaitGroup for i := 0; i < st.NumGo; i++ { wg.Add(1) @@ -442,7 +444,11 @@ func (st *Stream) Orchestrate(ctx context.Context) error { kvErr := make(chan error, 1) go func() { // Picks up KV lists from kvChan, and sends them to Output. - kvErr <- st.streamKVs(ctx) + err := st.streamKVs(ctx) + if err != nil { + cancel() // Stop all the go routines. + } + kvErr <- err }() wg.Wait() // Wait for produceKVs to be over. close(st.kvChan) // Now we can close kvChan. 
From 925e15b8c467f8555dd08b9d4ce3ab9a513a1e1a Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 25 Nov 2020 17:59:19 +0530 Subject: [PATCH 2/2] Fix race condition in l0stallsMs variable (#1605) This fixes two issues: - The atomic variable l0stallsMs was read in db.close without an atomic load. - A 64-bit atomic variable should sit at the start of the struct (next to the other 64-bit atomic field) to ensure proper alignment. Failure to do so can cause a segmentation fault on 32-bit platforms. Fixes DGRAPH-2773 --- db.go | 2 +- levels.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index eb8a3b85b..3065837f4 100644 --- a/db.go +++ b/db.go @@ -477,7 +477,7 @@ func (db *DB) IsClosed() bool { func (db *DB) close() (err error) { db.opt.Debugf("Closing database") - db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs)) + db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(atomic.LoadInt64(&db.lc.l0stallsMs))) atomic.StoreInt32(&db.blockWrites, 1) diff --git a/levels.go b/levels.go index 0b3950817..49f7ccbe8 100644 --- a/levels.go +++ b/levels.go @@ -41,13 +41,13 @@ import ( type levelsController struct { nextFileID uint64 // Atomic + l0stallsMs int64 // Atomic // The following are initialized once and const. levels []*levelHandler kv *DB - cstatus compactStatus - l0stallsMs int64 + cstatus compactStatus } // revertToManifest checks that all necessary table files exist and removes all table files not