Exposes num_parallel_reads and num_parallel_calls #1232
Merged
Changes from 5 commits (9 commits total):

- `21bc6c7` Exposes num_parallel_reads and num_parallel_calls (ashahab)
- `fb93e5c` Exposes num_parallel_reads and num_parallel_calls (ashahab)
- `e3c8742` Removes merge conflicts in file (i-ony)
- `5203f5d` Exposes num_parallel_reads and num_parallel_calls (ashahab)
- `fe683eb` Fixes Lint Issues (i-ony)
- `8c4973c` Removes Optional typing for method parameter (i-ony)
- `8de7138` Adds test method for _require() function (i-ony)
- `bbd426a` Uncomments skip for macOS pytests (i-ony)
- `787ddc7` Fixes Lint issues (i-ony)
```diff
@@ -16,11 +16,30 @@
 import tensorflow as tf
 from tensorflow_io.core.python.ops import core_ops
+from typing import Optional

 _DEFAULT_READER_BUFFER_SIZE_BYTES = 256 * 1024  # 256 KB
 _DEFAULT_READER_SCHEMA = ""
 # From https://github.com/tensorflow/tensorflow/blob/v2.0.0/tensorflow/python/data/ops/readers.py
+
+
+def _require(condition: bool, err_msg: Optional[str] = None) -> None:
+    """Checks if the specified condition is true else raises exception
+
+    Args:
+        condition: The condition to test
+        err_msg: If specified, it's the error message to use if condition is not true.
+
+    Raises:
+        ValueError: Raised when the condition is false
+
+    Returns:
+        None
+    """
+    if not condition:
+        raise ValueError(err_msg)
+
+
 # copied from https://github.com/tensorflow/tensorflow/blob/
 # 3095681b8649d9a828afb0a14538ace7a998504d/tensorflow/python/data/ops/readers.py#L36
 def _create_or_validate_filenames_dataset(filenames):
```
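The new `_require` helper is a plain guard function. A standalone sketch of how it behaves (duplicating the definition above, minus typing, so it runs on its own):

```python
def _require(condition, err_msg=None):
    # Raise ValueError with the supplied message when the condition fails.
    if not condition:
        raise ValueError(err_msg)

_require(3 > 0, "must be positive")   # passes silently
_require(-1 > 0, "must be positive")  # raises ValueError: must be positive
```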
````diff
@@ -52,21 +71,62 @@ def _create_or_validate_filenames_dataset(filenames):

 # copied from https://github.com/tensorflow/tensorflow/blob/
 # 3095681b8649d9a828afb0a14538ace7a998504d/tensorflow/python/data/ops/readers.py#L67
-def _create_dataset_reader(dataset_creator, filenames, num_parallel_reads=None):
-    """create_dataset_reader"""
-
-    def read_one_file(filename):
-        filename = tf.convert_to_tensor(filename, tf.string, name="filename")
-        return dataset_creator(filename)
-
-    if num_parallel_reads is None:
-        return filenames.flat_map(read_one_file)
-    if num_parallel_reads == tf.data.experimental.AUTOTUNE:
-        return filenames.interleave(
-            read_one_file, num_parallel_calls=num_parallel_reads
-        )
+def _create_dataset_reader(
+    dataset_creator,
+    filenames,
+    cycle_length=None,
+    num_parallel_calls=None,
+    deterministic=None,
+    block_length=1,
+):
+    """
+    This creates a dataset reader which reads records from multiple files and interleaves them together.
+    ```
+    dataset = Dataset.range(1, 6)  # ==> [ 1, 2, 3, 4, 5 ]
+    # NOTE: New lines indicate "block" boundaries.
+    dataset = dataset.interleave(
+        lambda x: Dataset.from_tensors(x).repeat(6),
+        cycle_length=2, block_length=4)
+    list(dataset.as_numpy_iterator())
+    ```
+    Results in the following output:
+    [1,1,1,1,
+     2,2,2,2,
+     1,1,
+     2,2,
+     3,3,3,3,
+     4,4,4,4,
+     3,3,
+     4,4,
+     5,5,5,5,
+     5,5,
+    ]
+    Args:
+        dataset_creator: Initializer for AvroDatasetRecord
+        filenames: A `tf.data.Dataset` iterator of filenames to read
+        cycle_length: The number of files to be processed in parallel. This is used by
+            `Dataset.interleave`. We set this equal to `block_length`, so that each time
+            n records are returned for each of the n files.
+        num_parallel_calls: Number of threads spawned by the interleave call.
+        deterministic: Sets whether the interleaved records are produced in deterministic
+            order. In `tf.data.Dataset.interleave` this defaults to `True`.
+        block_length: Sets the number of outputs produced from each input element before
+            cycling to the next input. Defaults to 1.
+    Returns:
+        A dataset iterator with an interleaved list of parsed avro records.
+    """
+
+    def read_many_files(filenames):
+        filenames = tf.convert_to_tensor(filenames, tf.string, name="filename")
+        return dataset_creator(filenames)
+
+    if cycle_length is None:
+        return filenames.flat_map(read_many_files)

     return filenames.interleave(
-        read_one_file, cycle_length=num_parallel_reads, block_length=1
+        read_many_files,
+        cycle_length=cycle_length,
+        num_parallel_calls=num_parallel_calls,
+        block_length=block_length,
+        deterministic=deterministic,
     )
````
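The docstring's interleave example can be verified directly with stock `tf.data`; a standalone sketch, independent of this PR:

```python
import tensorflow as tf

# cycle_length=2: two inner datasets are open at a time.
# block_length=4: four elements are taken from each before cycling.
dataset = tf.data.Dataset.range(1, 6)
dataset = dataset.interleave(
    lambda x: tf.data.Dataset.from_tensors(x).repeat(6),
    cycle_length=2,
    block_length=4,
)
print(list(dataset.as_numpy_iterator()))
# [1, 1, 1, 1, 2, 2, 2, 2, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 3, 3, 4, 4, 5, 5, 5, 5, 5, 5]
```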
```diff
@@ -128,10 +188,16 @@ class AvroRecordDataset(tf.data.Dataset):
     """A `Dataset` comprising records from one or more AvroRecord files."""

     def __init__(
-        self, filenames, buffer_size=None, num_parallel_reads=None, reader_schema=None
+        self,
+        filenames,
+        buffer_size=None,
+        num_parallel_reads=None,
+        num_parallel_calls=None,
+        reader_schema=None,
+        deterministic=True,
+        block_length=1,
     ):
         """Creates a `AvroRecordDataset` to read one or more AvroRecord files.

         Args:
             filenames: A `tf.string` tensor or `tf.data.Dataset` containing one or
                 more filenames.
```
```diff
@@ -144,25 +210,61 @@ def __init__(
                 files read in parallel are outputted in an interleaved order. If your
                 input pipeline is I/O bottlenecked, consider setting this parameter to a
                 value greater than one to parallelize the I/O. If `None`, files will be
-                read sequentially.
+                read sequentially. This must be set equal to or greater than
+                `num_parallel_calls`. This constraint exists because `num_parallel_reads`
+                becomes `cycle_length` in the underlying call to `tf.data.Dataset.interleave`,
+                and `cycle_length` is required to be equal to or higher than the number of
+                threads (`num_parallel_calls`). `cycle_length` in `tf.data.Dataset.interleave`
+                dictates how many input elements are picked up for processing at a time.
+            num_parallel_calls: (Optional.) Number of threads to spawn. This must be set to
+                `None` or greater than 0. Also this must be less than or equal to
+                `num_parallel_reads`. This defines the degree of parallelism in the
+                underlying Dataset.interleave call.
             reader_schema: (Optional.) A `tf.string` scalar representing the reader
                 schema or None.
+            deterministic: (Optional.) A boolean controlling whether determinism should be
+                traded for performance by allowing elements to be produced out of order.
+                Defaults to `True`.
+            block_length: Sets the number of outputs produced from each input element before
+                cycling to the next input. Defaults to 1.
         Raises:
             TypeError: If any argument does not have the expected type.
             ValueError: If any argument does not have the expected shape.
         """
+        _require(
+            num_parallel_calls is None
+            or num_parallel_calls == tf.data.experimental.AUTOTUNE
+            or num_parallel_calls > 0,
+            f"num_parallel_calls: {num_parallel_calls} must be set to None, "
+            f"tf.data.experimental.AUTOTUNE, or greater than 0",
+        )
+        if num_parallel_calls is not None:
+            _require(
+                num_parallel_reads is not None
+                and (
+                    num_parallel_reads >= num_parallel_calls
+                    or num_parallel_reads == tf.data.experimental.AUTOTUNE
+                ),
+                f"num_parallel_reads: {num_parallel_reads} must be greater than or equal to "
+                f"num_parallel_calls: {num_parallel_calls} or set to tf.data.experimental.AUTOTUNE",
+            )

         filenames = _create_or_validate_filenames_dataset(filenames)

         self._filenames = filenames
         self._buffer_size = buffer_size
         self._num_parallel_reads = num_parallel_reads
+        self._num_parallel_calls = num_parallel_calls
         self._reader_schema = reader_schema
+        self._block_length = block_length

-        def creator_fn(filename):
-            return _AvroRecordDataset(filename, buffer_size, reader_schema)
+        def read_multiple_files(filenames):
+            return _AvroRecordDataset(filenames, buffer_size, reader_schema)

-        self._impl = _create_dataset_reader(creator_fn, filenames, num_parallel_reads)
+        self._impl = _create_dataset_reader(
+            read_multiple_files,
+            filenames,
+            cycle_length=num_parallel_reads,
+            num_parallel_calls=num_parallel_calls,
+            deterministic=deterministic,
+            block_length=block_length,
+        )
         variant_tensor = self._impl._variant_tensor  # pylint: disable=protected-access
         super().__init__(variant_tensor)
```
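A minimal usage sketch of the new knobs. The filenames and schema below are placeholders, and it assumes `AvroRecordDataset` is importable from this module:

```python
# Hypothetical input files; substitute real Avro paths.
filenames = ["part-0.avro", "part-1.avro", "part-2.avro", "part-3.avro"]

dataset = AvroRecordDataset(
    filenames,
    num_parallel_reads=4,  # becomes cycle_length: 4 files open at once
    num_parallel_calls=2,  # interleave threads; must be <= num_parallel_reads
    reader_schema="",      # placeholder; module default is the empty string
    deterministic=True,    # keep output order reproducible
    block_length=1,        # one record per file before cycling
)
```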
```diff
@@ -171,13 +273,17 @@ def _clone(
         filenames=None,
         buffer_size=None,
         num_parallel_reads=None,
+        num_parallel_calls=None,
         reader_schema=None,
+        block_length=None,
     ):
         return AvroRecordDataset(
             filenames or self._filenames,
             buffer_size or self._buffer_size,
             num_parallel_reads or self._num_parallel_reads,
+            num_parallel_calls or self._num_parallel_calls,
             reader_schema or self._reader_schema,
+            block_length or self._block_length,
         )

     def _inputs(self):
```
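A sketch of the new validation firing on a bad combination of parameters (hypothetical call; the message text comes from the `_require` f-strings above):

```python
try:
    AvroRecordDataset(
        ["part-0.avro"],       # placeholder filename; validation runs first
        num_parallel_reads=1,
        num_parallel_calls=4,  # exceeds num_parallel_reads, so rejected
    )
except ValueError as err:
    print(err)
    # num_parallel_reads: 1 must be greater than or equal to
    # num_parallel_calls: 4 or set to tf.data.experimental.AUTOTUNE
```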
Review comment (kvignesh1420):

> Since we are not using any type checkers (like mypy) as of now, I feel this style is a bit out of place when compared with other modules in the codebase.

Reply: @kvignesh1420 Thanks for the comment. Updated, please review.

Reply (kvignesh1420): Thanks!
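For reference, commit 8c4973c ("Removes Optional typing for method parameter") resolves this by dropping the annotations; a sketch of the resulting style:

```python
def _require(condition, err_msg=None):
    """Raises ValueError with err_msg if the condition is false."""
    if not condition:
        raise ValueError(err_msg)
```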