Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce StreamDone in Stream Writer #1061

Merged
merged 15 commits into from
Oct 18, 2019
Merged

Conversation

ashish-goswami
Copy link
Contributor

@ashish-goswami ashish-goswami commented Oct 3, 2019

This PR introduces, a way to tell StreamWriter to close a stream. Previously Streams were always open until Flush is called on StreamWriter. This resulted in memory utilisation, because of underlying TableBuilder to a sortedWriter.

I used below program to test the change.

package main

import (
	"fmt"
	"io/ioutil"
	"os"

	badger "github.com/dgraph-io/badger"
	"github.com/dgraph-io/badger/pb"
)

func main() {
	dir, err := ioutil.TempDir(".", "badger-test")
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := os.RemoveAll(dir); err != nil {
			panic(err)
		}
	}()
	opts := badger.DefaultOptions(dir)
	db, err := badger.Open(opts)
	if err != nil {
		panic("unable to open db " + err.Error())
	}
	defer func() {
		if err := db.Close(); err != nil {
			panic("unable to close db " + err.Error())
		}
	}()

	sw := db.NewStreamWriter()
	if err := sw.Prepare(); err != nil {
		panic("sw.Prepare() failed " + err.Error())
	}

	var val [10]byte
	for i := 0; i < 50000; i++ {
		list := &pb.KVList{}
		kv1 := &pb.KV{
			Key:      []byte(fmt.Sprintf("%d", i)),
			Value:    val[:],
			Version:  1,
			StreamId: uint32(i),
		}
		kv2 := &pb.KV{
			StreamId:   uint32(i),
			StreamDone: true,
		}
		list.Kv = append(list.Kv, kv1, kv2)
		if err := sw.Write(list); err != nil {
			panic("sw.Write() failed " + err.Error())
		}
		fmt.Println("Stream Done: ", i)
	}

	if err := sw.Flush(); err != nil {
		panic("sw.Flush() failed " + err.Error())
	}
}

Memory usage spikes(RES reach >25GB) in case of master branch. While this PR limits memory usage to very low.

I left above program running for a day(on this PR), it completed around ~800K streams without OOM.


This change is Reviewable

Copy link

@pullrequest pullrequest bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ A review job has been created and sent to the PullRequest network.


@ashish-goswami you can click here to see the review status or cancel the code review job.

@ashish-goswami ashish-goswami requested a review from poonai October 3, 2019 09:44
@coveralls
Copy link

coveralls commented Oct 3, 2019

Coverage Status

Coverage increased (+0.2%) to 77.62% when pulling ffb416a on ashish/stream-writer into b20e8b6 on master.

Copy link
Contributor

@jarifibrahim jarifibrahim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few comments.

Reviewable status: 0 of 4 files reviewed, 3 unresolved discussions (waiting on @ashish-goswami, @balajijinnah, @jarifibrahim, and @manishrjain)


stream_writer.go, line 33 at r1 (raw file):

var (
	// ErrStreamClosed is returned when a sent is performed on closed stream.

... when data is sent on a closed stream.


stream_writer.go, line 145 at r1 (raw file):

	defer sw.writeLock.Unlock()

	// Check if any of the stream we got records for are closed.

// Check if we've received data on a closed stream.


stream_writer.go, line 171 at r1 (raw file):

	// close any streams if required.
	for streamID := range closedStreams {
		writer, ok := sw.writers[streamID]

If we have a streamID in closedStreamsthat implies that we've recieved some data on that stream. Which also means there should be a writer for it. I would suggest we either log an error or return an error here if we don't find the streamID in sw.writers map.

Copy link

@pullrequest pullrequest bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall changes here look pretty good and thanks for showing the tests that you did to validate. I left some feedback and questions inline.


Reviewed with ❤️ by PullRequest

stream_writer.go Outdated
@@ -85,8 +90,23 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {
if len(kvs.GetKv()) == 0 {
return nil
}

closedStreams := make(map[uint32]bool)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nitpick: Since you are just using these as basically a set you could use make(map[uint32]struct{}) since struct{} is zero-byte.

stream_writer.go Outdated
return err
}

sw.closedStreams[streamID] = true
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to double check, should these be marked as closed directly after the SignalAndWait() or after the wait.Done()?

stream_writer.go Outdated
// Check if any of the stream we got records for are closed.
for streamID := range streamWithRecords {
if _, ok := sw.closedStreams[streamID]; ok {
return ErrStreamClosed
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there may be a race where two threads are calling Write at the same time and one takes the lock first that is closing the stream and the other one still has a streamWithRecords version of that streamID since the above code is run concurrently and unlocked. I just wanted to check if this is okay to return an error in this case? My guess is that its probably fine, but it may be a slight change in behavior from the current behavior in some cases.

stream_writer.go Outdated
const headStreamID uint32 = math.MaxUint32

var (
// ErrStreamClosed is returned when a sent is performed on closed stream.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this possibly say "when a write is performed" instead of "when a sent is performed"?

stream_writer.go Outdated
@@ -121,6 +141,14 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error {

sw.writeLock.Lock()
defer sw.writeLock.Unlock()

// Check if any of the stream we got records for are closed.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream -> streams

Copy link

@pullrequest pullrequest bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this looks good, but it might be worth checking the script from the commit message in as well for future testing and verification


Reviewed with ❤️ by PullRequest


// Now we can close any streams if required. We will make writer for
// the closed streams as nil.
for streamId := range closedStreams {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

range var streamId should be streamID (from golint)

Copy link
Contributor Author

@ashish-goswami ashish-goswami left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 4 files reviewed, 8 unresolved discussions (waiting on @ashish-goswami, @balajijinnah, @jarifibrahim, @manishrjain, and @pullrequest[bot])


stream_writer.go, line 33 at r1 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

... when data is sent on a closed stream.

I have removed the error.


stream_writer.go, line 94 at r1 (raw file):

Previously, pullrequest[bot] wrote…

minor nitpick: Since you are just using these as basically a set you could use make(map[uint32]struct{}) since struct{} is zero-byte.

Done.


stream_writer.go, line 145 at r1 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

// Check if we've received data on a closed stream.

changed the comment.


stream_writer.go, line 145 at r1 (raw file):

Previously, pullrequest[bot] wrote…

stream -> streams

changed the comment.


stream_writer.go, line 148 at r1 (raw file):

Previously, pullrequest[bot] wrote…

It seems like there may be a race where two threads are calling Write at the same time and one takes the lock first that is closing the stream and the other one still has a streamWithRecords version of that streamID since the above code is run concurrently and unlocked. I just wanted to check if this is okay to return an error in this case? My guess is that its probably fine, but it may be a slight change in behavior from the current behavior in some cases.

Right now, we are panicking if sent is done on closed stream. Normally it is expected that single stream is handled by a same thread.


stream_writer.go, line 171 at r1 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

If we have a streamID in closedStreamsthat implies that we've recieved some data on that stream. Which also means there should be a writer for it. I would suggest we either log an error or return an error here if we don't find the streamID in sw.writers map.

Not necessarily. We can just get close(StreamDone) without any data also. If it is with data, we will definitely have writer for stream(if not code panics). If no data has been received(just StreamDone) and we don't have any writer for it, we are just logging it.


stream_writer.go, line 181 at r1 (raw file):

Previously, pullrequest[bot] wrote…

Just to double check, should these be marked as closed directly after the SignalAndWait() or after the wait.Done()?

SignalAndWait already waits for it.

Copy link
Contributor

@manishrjain manishrjain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the above program part of the tests. And a comment.

:lgtm_strong:

Reviewed 2 of 4 files at r1, 1 of 2 files at r2, 1 of 1 files at r3.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @ashish-goswami, @balajijinnah, @jarifibrahim, and @pullrequest[bot])


stream_writer.go, line 140 at r3 (raw file):

	// We are writing all requests to vlog even if some request belongs to already closed stream.
	// It is safe to do because we are panicing while writing to sorted writer, which will be nil

panicking


stream_writer.go, line 282 at r3 (raw file):

		builder:  table.NewTableBuilder(bopts),
		reqCh:    make(chan *request, 3),
		closer:   closer,

closer: y.NewCloser(1)

Copy link
Contributor

@jarifibrahim jarifibrahim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @ashish-goswami, @balajijinnah, @jarifibrahim, @manishrjain, and @pullrequest[bot])


stream_writer.go, line 140 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

panicking

This needs to be fixed.

Copy link
Contributor Author

@ashish-goswami ashish-goswami left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 2 of 4 files reviewed, 8 unresolved discussions (waiting on @ashish-goswami, @balajijinnah, @jarifibrahim, @manishrjain, and @pullrequest[bot])


stream_writer.go, line 140 at r3 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

This needs to be fixed.

Done.


stream_writer.go, line 282 at r3 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

closer: y.NewCloser(1)

Done.

Copy link
Contributor

@jarifibrahim jarifibrahim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewable status: 2 of 5 files reviewed, 7 unresolved discussions (waiting on @ashish-goswami, @balajijinnah, @manishrjain, and @pullrequest[bot])

@ashish-goswami ashish-goswami merged commit 385da91 into master Oct 18, 2019
@ashish-goswami ashish-goswami deleted the ashish/stream-writer branch October 18, 2019 13:35
jarifibrahim pushed a commit that referenced this pull request Mar 12, 2020
This PR introduces, a way to tell StreamWriter to close a stream.
Previously Streams were always open until Flush is called on
StreamWriter. This resulted in memory utilisation, because of underlying
TableBuilder to a sortedWriter. Also closing all sorted writer in single
call resulted in more memory allocation(during Flush()). This can be
useful in some case such as bulk loader in
Dgraph, where only one stream is active at a time.

(cherry picked from commit 385da91)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

5 participants