Support bulk load of JSON data. #2799
… code requires schema.
dgraph/cmd/bulk/loader.go
// The map should be followed by either the ',' between array elements, or the ']'
// at the end of the array.
skipSpace(r)
Error return value of skipSpace is not checked (from errcheck)
dgraph/cmd/bulk/loader.go
if ch == ']' {
	err = io.EOF
} else if ch != ',' {
	r.UnreadRune()
Error return value of r.UnreadRune is not checked (from errcheck)
edgraph/nquads_from_json.go
func NquadsFromJson(b []byte) ([]*api.NQuad, error) {
	return nquadsFromJson(b, set)
}
File is not goimports-ed (from goimports)
dgraph/cmd/bulk/loader.go
@@ -22,6 +22,7 @@ import (
	"compress/gzip"
	"context"
	"fmt"
	"github.com/pkg/errors"
File is not goimports-ed (from goimports)
dgraph/cmd/bulk/mapper.go
@@ -20,6 +20,7 @@ import (
	"bytes"
	"encoding/binary"
	"fmt"
	"github.com/dgraph-io/dgraph/edgraph"
File is not goimports-ed (from goimports)
Reviewable status: 0 of 5 files reviewed, 11 unresolved discussions (waiting on @golangcibot, @codexnull, and @manishrjain)
dgraph/cmd/bulk/loader.go, line 105 at r4 (raw file):
shards: newShardMap(opt.MapShards),
// Lots of gz readers, so not much channel buffer needed.
readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
I love these small touches. They make the code more readable. Thanks for that.
dgraph/cmd/bulk/loader.go, line 206 at r4 (raw file):
}
ch, _, err := r.ReadRune()
lex/lexer.go is designed to help with this. Though, is there no Go streaming JSON parsing lib?
dgraph/cmd/bulk/loader.go, line 209 at r4 (raw file):
if err != nil && err != io.EOF {
	return err
} else if ch != '[' {
lex.AcceptUntil(...) can replace this function.
dgraph/cmd/bulk/loader.go, line 228 at r4 (raw file):
}
ch, _, err := r.ReadRune()
Can use the lexer here as well.
dgraph/cmd/bulk/loader.go, line 266 at r4 (raw file):
}
case '"':
	if !quoted || (quoted && pch != '\\') {
The LexQuotedString func can slurp everything until the end quote.
dgraph/cmd/bulk/mapper.go, line 148 at r4 (raw file):
// process JSON chunk
str, err := chunkBuf.ReadBytes(0)
nquads, err := edgraph.NquadsFromJson(str)
I think this code could benefit from a small interface abstraction. Takes in a chunkBuf and spits out the NQuads. Then we can create one for RDF, one for JSON, and test separately.
dgraph/cmd/bulk/loader.go, line 206 at r4 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
lex/lexer.go is designed to help with this. Though, is there no Go streaming JSON parsing lib?
I'll take a look at lexer.go.
The streaming JSON parsers that I found parse it into Go data structures, but the nquadsFromJSON code expects a byte array. I couldn't find any that returned just a slice of the text.
dgraph/cmd/bulk/loader.go, line 206 at r4 (raw file):
Previously, codexnull (Javier Alvarado) wrote…
I'll take a look at lexer.go.
The streaming JSON parsers that I found parse it into Go data structures, but the nquadsFromJSON code expects a byte array. I couldn't find any that returned just a slice of the text.
Am I correct that lex/lexer also requires the entire input to be in memory? It looks like the input is just a string.
Reviewable status: 0 of 5 files reviewed, 12 unresolved discussions (waiting on @golangcibot, @codexnull, and @manishrjain)
dgraph/cmd/bulk/loader.go, line 195 at r4 (raw file):
}
return nil
It looks like this function can never return a non-nil error.
Maybe the err returned from ReadRune should be checked within and after the for loop.
edgraph/nquads_from_json.go, line 426 at r2 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
File is not goimports-ed (from goimports)
Instead of creating this new function, do you think it's better to expose the current one by capitalizing it, and also exposing the Set and Delete operations?
dgraph/cmd/bulk/mapper.go
}

str := make([]byte, chunkBuf.Len())
chunkBuf.Read(str)
Error return value of chunkBuf.Read is not checked (from errcheck)
dgraph/cmd/bulk/mapper.go
	return nil, errors.Wrapf(err, "while parsing line %q", str), done
}

return []gql.NQuad{ gql.NQuad{NQuad: &nq} }, nil, done
File is not goimports-ed (from goimports)
dgraph/cmd/bulk/run.go
@@ -89,6 +92,7 @@ func init() {
func run() {
	opt := options{
		RDFDir: Bulk.Conf.GetString("rdfs"),
		JSONDir: Bulk.Conf.GetString("jsons"),
File is not goimports-ed (from goimports)
This code needs more work. Can you encapsulate all the lexing code within a struct which parses JSON, and have another struct which parses the RDFs. What you're doing with enums can be achieved with interfaces. https://gobyexample.com/interfaces
My other concern about this PR is the lexing code. If lexer class can't be used, then maybe copy over the relevant parts and write a lot of tests around edge cases. I'd need those tests to gain confidence in how the lexing is happening.
@srfrog : Could you help review this PR?
Reviewable status: 0 of 6 files reviewed, 23 unresolved discussions (waiting on @golangcibot and @codexnull)
.gitignore, line 16 at r4 (raw file):
# vim session backups
.*.swp
lol. I set vim to noswap. I don't see the point of swap files (maybe I don't know what I'm missing).
dgraph/cmd/bulk/loader.go, line 206 at r4 (raw file):
Previously, codexnull (Javier Alvarado) wrote…
Am I correct that lex/lexer also requires the entire input to be in memory? It looks like the input is just a string.
Hmm... Yeah. Looks like that won't be directly applicable.
dgraph/cmd/bulk/loader.go, line 379 at r5 (raw file):
defer thr.Done()
if loaderType == jsonLoader {
	if err := readJSONPreChunk(r); err != nil {
Do we still need this?
dgraph/cmd/bulk/mapper.go, line 122 at r5 (raw file):
}

func parseRDFToNquads(chunkBuf *bytes.Buffer) ([]gql.NQuad, error, bool) {
No need for both an error and a bool. done is, in fact, io.EOF and can be returned as such.
if err == io.EOF {
	return err
}
dgraph/cmd/bulk/mapper.go, line 126 at r5 (raw file):
str, err := chunkBuf.ReadString('\n')
if err != nil {
You don't need the outer if err != nil.
dgraph/cmd/bulk/mapper.go, line 136 at r5 (raw file):
nq, err := rdf.Parse(strings.TrimSpace(str))
if err == rdf.ErrEmpty {
	return []gql.NQuad{}, nil, done
return nil, io.EOF
Slice can be nil.
dgraph/cmd/bulk/mapper.go, line 144 at r5 (raw file):
}

func parseJSONToNquads(chunkBuf *bytes.Buffer) ([]gql.NQuad, error, bool) {
Same changes as above here.
dgraph/cmd/bulk/mapper.go, line 151 at r5 (raw file):
}
str := make([]byte, chunkBuf.Len())
bytes.NewBuffer(with string input)
dgraph/cmd/bulk/mapper.go, line 179 at r5 (raw file):
switch loaderType {
case rdfLoader:
	nqs, err, done = parseRDFToNquads(chunkBuf)
This is alright. No need to change this, but worth learning the Go interface pattern.
Oh, I think I see what you're saying now. I'll take a look at interfaces to see how to do that.
The lexing shouldn't be such a problem, though. All the loader code needs to do is find an opening brace '{' and extract it plus everything up to the matching closing brace '}'. The actual parsing of the JSON between those two braces is done in the mapper using Go's standard encoding/json package, and the conversion of parsed JSON to NQuads is done by the existing edgraph/nquads_from_json code, which is already tested in edgraph/server_test.go.
I thought I could just use an existing go library to read "balanced" text but was not able to find one.
Reviewable status: 0 of 6 files reviewed, 23 unresolved discussions (waiting on @golangcibot, @codexnull, @manishrjain, and @srfrog)
.gitignore, line 16 at r4 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
lol. I set vim to noswap. I don't see the point of swap files (maybe I don't know what I'm missing).
Swap files have saved me once or twice by letting me recover work after a crash, but I see your point. I'll set it to no swap or to a dedicated swap directory outside the git tree.
dgraph/cmd/bulk/loader.go, line 206 at r4 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Hmm... Yeah. Looks like that won't be directly applicable.
I tried to think of a way to use it by feeding it chunks of the JSON stream (e.g. tens or hundreds of kilobytes) as the Input string, but it didn't seem worth the complexity and the cost of copying byte arrays around so much.
dgraph/cmd/bulk/loader.go, line 379 at r5 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Do we still need this?
I don't think we will after I rewrite to use interfaces.
Reviewable status: 0 of 6 files reviewed, 24 unresolved discussions (waiting on @golangcibot, @codexnull, @manishrjain, and @srfrog)
dgraph/cmd/bulk/loader.go, line 228 at r4 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Can use the lexer here as well.
You could use a json decoder for this. The pkg already supports stream decoding.
See: https://golang.org/pkg/encoding/json/#example_Decoder_Decode_stream
In our case we would need to skip the "set" container and try to loop through the nquads values.
dgraph/cmd/bulk/run.go, line 95 at r5 (raw file):
opt := options{
	RDFDir: Bulk.Conf.GetString("rdfs"),
	JSONDir: Bulk.Conf.GetString("jsons"),
Could we just check the file extension and use the appropriate reader? Then we could have a single --datadir option.
dgraph/cmd/bulk/loader.go, line 228 at r4 (raw file):
Previously, srfrog (Gus) wrote…
You could use a json decoder for this. The pkg already supports stream decoding.
See: https://golang.org/pkg/encoding/json/#example_Decoder_Decode_stream
In our case we would need to skip the "set" container and try to loop through the nquads values.
I did look at json.Decoder, but it didn't seem like I could use it because, as the docs for NewDecoder state, "The decoder introduces its own buffering and may read data from r beyond the JSON values requested," so it could consume more from the stream than we want. It's possible to get the extra data consumed with Buffered(), but I don't see a way to put that back in the original reader.
Also, I was trying to avoid actually parsing the JSON here. The mapper expects to get a bytes.Buffer, and it seemed inefficient for the loader to unmarshal as it reads only to marshal it back to text to hand off to the mapper. I'll look at the jsoniter package to see if that would work.
dgraph/cmd/bulk/run.go, line 95 at r5 (raw file):
Previously, srfrog (Gus) wrote…
Could we just check the file extension and use the appropriate reader? Then we could have a single --datadir option.
We can do that. I didn't because I was trying not to change existing functionality, but I suppose we could add a new option that handles multiple input types, and keep the -r/--rdf options but deprecate them.
Although, I prefer not to rely on the file name if I don't have to. I would like to be able to stream data from a pipe or a socket as well, so we should have a way to let the user specify the format explicitly or even auto-detect it.
Reviewable status: 0 of 6 files reviewed, 24 unresolved discussions (waiting on @golangcibot, @manishrjain, @codexnull, @gitlw, and @srfrog)
dgraph/cmd/bulk/loader.go, line 25 at r2 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
File is not goimports-ed (from goimports)
Fixed.
dgraph/cmd/bulk/loader.go, line 105 at r4 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
I love these small touches. They make the code more readable. Thanks for that.
Done.
dgraph/cmd/bulk/loader.go, line 195 at r4 (raw file):
Previously, gitlw (Lucas Wang) wrote…
It looks like this function can never return a non-nil error.
Maybe the err returned from ReadRune should be checked within and after the for loop.
Fixed.
dgraph/cmd/bulk/mapper.go, line 23 at r2 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
File is not goimports-ed (from goimports)
Done.
edgraph/nquads_from_json.go, line 426 at r2 (raw file):
Previously, gitlw (Lucas Wang) wrote…
Instead of creating this new function, do you think it's better to expose the current one by capitalizing it, and also exposing the Set and Delete operations?
Not sure. I did it this way to avoid changing calls to this function by existing code and because I expect a bulk load would only use Set operations and not Delete, so it didn't seem necessary to expose it.
.gitignore, line 16 at r4 (raw file):
Previously, codexnull (Javier Alvarado) wrote…
Swap files have saved me once or twice by letting me recover work after a crash, but I see your point. I'll set it to no swap or to a dedicated swap directory outside the git tree.
Removed.
dgraph/cmd/bulk/chunk.go
	return err
}
if !unicode.IsSpace(ch) {
	r.UnreadRune()
Error return value of r.UnreadRune is not checked (from errcheck)
Alright. Looks a lot better now with interfaces. I made a few commits. The changes were around simplifying the code and ensuring that all errors are being handled. Have a look at the changes, so you can take a note of these for future reference. If the tests pass, the PR is good to go.
Thanks for your work, and congrats on the first major change!
Reviewable status: 0 of 7 files reviewed, 14 unresolved discussions (waiting on @golangcibot, @codexnull, @gitlw, @manishrjain, and @srfrog)
* Only output open file limit info if there is a problem.
* Refactor RDF-to-NQuad and JSON-to-NQuad code to allow testing each independently.
* Add some bulk loading tests.
* Refactor bulk loader chunking to use interfaces. Add more tests.
* Refactor bulk loader mapping to use interfaces.
* Simplify chunker code.
* Consolidate all chunker code into one interface, one file and corresponding test file.
* Manish final review