-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simple pipe reader for hdfs or other service #5282
Simple pipe reader for hdfs or other service #5282
Conversation
def pipe_reader(left_cmd, | ||
parser, | ||
bufsize=8192, | ||
file_type="plain", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we just need to support "plain", the user can decompress it outside of Paddle using pipe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thought it may be inconvenient for users to decompress stream data in their parsers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant the user can decompress the data using shell commands, not in the parsers, e.g.:
hadoop fs -cat /path/to/some/file | gzip -d
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, this is simpler, but I'm considering the pipe size using bash
is set by ulimit
, when in cluster trainer, users may not have control over every node's ulimit
configuration, but using python code can.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand bash very well, but does the pipe just "block" if it's full, and probably gzip can decode in a stream fashion, and will consume the pipe buffer, so it will be unblocked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By default, pipes can block both producer and consumer:
If a process attempts to read from an empty pipe, then read(2) will
block until data is available. If a process attempts to write to a
full pipe (see below), then write(2) blocks until sufficient data has
been read from the pipe to allow the write to complete.
Well, my point is, use pipes in python code, can let users to define pipe buffer size which is critical to the reader performance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, ok. Thanks!
@@ -323,3 +323,101 @@ def xreader(): | |||
yield sample | |||
|
|||
return xreader | |||
|
|||
|
|||
def _buf2lines(buf, line_break="\n"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line break won't work in binary data, maybe we should let parser decide when to output a new data item?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If cut_lines=False
the binary data will send to parser directly. Do you mean by should let user's parser generate data, and make pipe_reader
a decorator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I thought maybe pipe_reader should not cut the lines, since it does not have sufficient information, we might want leave it to the user's parser to do so (cut and generate data).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, will update.
fix pipe_reader unimport packages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Let's update https://github.com/PaddlePaddle/Paddle/pull/5282/files#r150348324 with a follow up commit.
Fix #5011