From 21bc6c7c31f81700b5059cf0ecdfffe22471f82c Mon Sep 17 00:00:00 2001 From: Abin Shahab Date: Tue, 29 Sep 2020 21:22:43 -0700 Subject: [PATCH 1/8] Exposes num_parallel_reads and num_parallel_calls -Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset` -Adds parameter constraints -Fixes lint issues --- .../experimental/avro_record_dataset_ops.py | 147 +++++++++++++++--- .../experimental/make_avro_record_dataset.py | 29 ++-- 2 files changed, 144 insertions(+), 32 deletions(-) diff --git a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py index b84a97cf4..5c4c48835 100644 --- a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py +++ b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py @@ -16,11 +16,28 @@ import tensorflow as tf from tensorflow_io.core.python.ops import core_ops +from typing import Optional _DEFAULT_READER_BUFFER_SIZE_BYTES = 256 * 1024 # 256 KB _DEFAULT_READER_SCHEMA = "" # From https://github.com/tensorflow/tensorflow/blob/v2.0.0/tensorflow/python/data/ops/readers.py + +def _require(condition: bool, err_msg: Optional[str] = None) -> None: + """Checks if the specified condition is true else raises exception + + :param condition: The condition to test + + :param err_msg: If specified, it's the error message to use if condition is not true. + + :raises ValueError: Raised when the condition is false + + :return: None. + """ + if not condition: + raise ValueError(err_msg) + + # copied from https://github.com/tensorflow/tensorflow/blob/ # 3095681b8649d9a828afb0a14538ace7a998504d/tensorflow/python/data/ops/readers.py#L36 def _create_or_validate_filenames_dataset(filenames): @@ -52,21 +69,62 @@ def _create_or_validate_filenames_dataset(filenames): # copied from https://github.com/tensorflow/tensorflow/blob/ # 3095681b8649d9a828afb0a14538ace7a998504d/tensorflow/python/data/ops/readers.py#L67 -def _create_dataset_reader(dataset_creator, filenames, num_parallel_reads=None): - """create_dataset_reader""" - - def read_one_file(filename): - filename = tf.convert_to_tensor(filename, tf.string, name="filename") - return dataset_creator(filename) - - if num_parallel_reads is None: - return filenames.flat_map(read_one_file) - if num_parallel_reads == tf.data.experimental.AUTOTUNE: - return filenames.interleave( - read_one_file, num_parallel_calls=num_parallel_reads - ) +def _create_dataset_reader( + dataset_creator, + filenames, + cycle_length=None, + num_parallel_calls=None, + deterministic=None, + block_length=1, +): + """ + This creates a dataset reader which reads records from multiple files and interleaves them together +``` +dataset = Dataset.range(1, 6) # ==> [ 1, 2, 3, 4, 5 ] +# NOTE: New lines indicate "block" boundaries. +dataset = dataset.interleave( + lambda x: Dataset.from_tensors(x).repeat(6), + cycle_length=2, block_length=4) +list(dataset.as_numpy_iterator()) +``` +Results in the following output: +[1,1,1,1, + 2,2,2,2, + 1,1, + 2,2, + 3,3,3,3, + 4,4,4,4, + 3,4, + 5,5,5,5, + 5,5, +] + Args: + dataset_creator: Initializer for AvroDatasetRecord + filenames: A `tf.data.Dataset` iterator of filenames to read + cycle_length: The number of files to be processed in parallel. This is used by `Dataset.Interleave`. + We set this equal to `block_length`, so that each time n number of records are returned for each of the n + files. + num_parallel_calls: Number of threads spawned by the interleave call. 
+        deterministic: Sets whether the interleaved records are written in deterministic order. in tf.interleave thi sis default true
+        block_length: Sets the number of output on the output tensor. Defaults to 1
+    Returns:
+      A dataset iterator with an interleaved list of parsed avro records.
+
+    """
+
+    def read_many_files(filenames):
+        filenames = tf.convert_to_tensor(filenames, tf.string, name="filename")
+        return dataset_creator(filenames)
+
+    if cycle_length is None:
+        return filenames.flat_map(read_many_files)
+
     return filenames.interleave(
-        read_one_file, cycle_length=num_parallel_reads, block_length=1
+        read_many_files,
+        cycle_length=cycle_length,
+        num_parallel_calls=num_parallel_calls,
+        block_length=block_length,
+        deterministic=deterministic,
     )


@@ -128,7 +186,14 @@ class AvroRecordDataset(tf.data.Dataset):
     """A `Dataset` comprising records from one or more AvroRecord files."""

     def __init__(
-        self, filenames, buffer_size=None, num_parallel_reads=None, reader_schema=None
+        self,
+        filenames,
+        buffer_size=None,
+        num_parallel_reads=None,
+        num_parallel_calls=None,
+        reader_schema=None,
+        deterministic=True,
+        block_length=1,
     ):
         """Creates a `AvroRecordDataset` to read one or more AvroRecord files.

@@ -144,25 +209,65 @@ def __init__(
            files read in parallel are outputted in an interleaved order. If your
            input pipeline is I/O bottlenecked, consider setting this
            parameter to a value greater than one to parallelize the I/O. If `None`, files will be
-            read sequentially.
+            read sequentially. This must be equal to or greater than `num_parallel_calls`.
+            This constraint exists because `num_parallel_reads` becomes `cycle_length` in the
+            underlying call to `tf.data.Dataset.interleave`, and `cycle_length` must be
+            equal to or greater than the number of threads (`num_parallel_calls`);
+            `cycle_length` dictates how many files the interleave picks up to process in parallel.
+          num_parallel_calls: (Optional.) Number of threads to spawn. This must be set to `None`,
+            `tf.data.experimental.AUTOTUNE`, or a value greater than 0, and it must be less than
+            or equal to `num_parallel_reads`. This defines the degree of parallelism of the
+            underlying `Dataset.interleave` call.
           reader_schema: (Optional.) A `tf.string` scalar representing the reader
             schema or None.
+<<<<<<< HEAD
+=======
+          deterministic: (Optional.) A boolean controlling whether determinism should be traded
+            for performance by allowing elements to be produced out of order. Defaults to `True`.
+          block_length: The number of consecutive records to fetch from each file before
+            switching to another file. Defaults to 1.
+>>>>>>> d41d946... Added parameter constraints

        Raises:
          TypeError: If any argument does not have the expected type.
          ValueError: If any argument does not have the expected shape.
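+
+        Example: a minimal sketch of a valid parameter combination; the
+        filenames below are placeholders, not files that ship with this patch.
+
+        ```
+        dataset = AvroRecordDataset(
+            filenames=["part-0.avro", "part-1.avro"],
+            num_parallel_reads=2,  # becomes cycle_length of the underlying interleave
+            num_parallel_calls=2,  # must be <= num_parallel_reads
+            block_length=1,
+        )
+        ```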
""" + _require( + num_parallel_calls is None + or num_parallel_calls == tf.data.experimental.AUTOTUNE + or num_parallel_calls > 0, + f"num_parallel_calls: {num_parallel_calls} must be set to None, " + f"tf.data.experimental.AUTOTUNE, or greater than 0", + ) + if num_parallel_calls is not None: + _require( + num_parallel_reads is not None + and ( + num_parallel_reads >= num_parallel_calls + or num_parallel_reads == tf.data.experimental.AUTOTUNE + ), + f"num_parallel_reads: {num_parallel_reads} must be greater than or equal to " + f"num_parallel_calls: {num_parallel_calls} or set to tf.data.experimental.AUTOTUNE", + ) + filenames = _create_or_validate_filenames_dataset(filenames) self._filenames = filenames self._buffer_size = buffer_size self._num_parallel_reads = num_parallel_reads + self._num_parallel_calls = num_parallel_calls self._reader_schema = reader_schema + self._block_length = block_length - def creator_fn(filename): - return _AvroRecordDataset(filename, buffer_size, reader_schema) + def read_multiple_files(filenames): + return _AvroRecordDataset(filenames, buffer_size, reader_schema) - self._impl = _create_dataset_reader(creator_fn, filenames, num_parallel_reads) + self._impl = _create_dataset_reader( + read_multiple_files, + filenames, + cycle_length=num_parallel_reads, + num_parallel_calls=num_parallel_calls, + deterministic=deterministic, + block_length=block_length, + ) variant_tensor = self._impl._variant_tensor # pylint: disable=protected-access super().__init__(variant_tensor) @@ -171,13 +276,17 @@ def _clone( filenames=None, buffer_size=None, num_parallel_reads=None, + num_parallel_calls=None, reader_schema=None, + block_length=None, ): return AvroRecordDataset( filenames or self._filenames, buffer_size or self._buffer_size, num_parallel_reads or self._num_parallel_reads, + num_parallel_calls or self._num_parallel_calls, reader_schema or self._reader_schema, + block_length or self._block_length, ) def _inputs(self): diff --git a/tensorflow_io/core/python/experimental/make_avro_record_dataset.py b/tensorflow_io/core/python/experimental/make_avro_record_dataset.py index 11934175b..0a9624f37 100644 --- a/tensorflow_io/core/python/experimental/make_avro_record_dataset.py +++ b/tensorflow_io/core/python/experimental/make_avro_record_dataset.py @@ -37,7 +37,6 @@ def make_avro_record_dataset( shuffle_seed=None, prefetch_buffer_size=tf.data.experimental.AUTOTUNE, num_parallel_reads=None, - num_parallel_parser_calls=None, drop_final_batch=False, ): """Reads and (optionally) parses avro files into a dataset. @@ -79,14 +78,26 @@ def make_avro_record_dataset( prefetch_buffer_size: (Optional.) An int specifying the number of feature batches to prefetch for performance improvement. Defaults to auto-tune. Set to 0 to disable prefetching. +<<<<<<< HEAD +<<<<<<< HEAD num_parallel_reads: (Optional.) Number of threads used to read records from files. By default or if set to a value >1, the results will be interleaved. num_parallel_parser_calls: (Optional.) Number of parallel +======= + num_parallel_calls: (Optional.) Number of threads used to read + records from files. By default or if set to a value >1, the + results will be interleaved. + num_parallel_reads: (Optional.) Number of parallel +>>>>>>> f7032e3... Exposed num_parallel_reads as well as num_parallel_calls records to parse in parallel. Defaults to an automatic selection. +======= + num_parallel_reads: (Optional.) Number of parallel + records to parse in parallel. Defaults to None(no parallelization). +>>>>>>> d41d946... 
Added parameter constraints drop_final_batch: (Optional.) Whether the last batch should be dropped in case its size is smaller than `batch_size`; the default behavior is not to drop the smaller batch. @@ -99,20 +110,15 @@ def make_avro_record_dataset( """ files = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle, seed=shuffle_seed) - if num_parallel_reads is None: - # Note: We considered auto-tuning this value, but there is a concern - # that this affects the mixing of records from different files, which - # could affect training convergence/accuracy, so we are defaulting to - # a constant for now. - num_parallel_reads = 24 - if reader_buffer_size is None: reader_buffer_size = 1024 * 1024 - + num_parallel_calls = num_parallel_reads dataset = AvroRecordDataset( files, buffer_size=reader_buffer_size, num_parallel_reads=num_parallel_reads, + num_parallel_calls=num_parallel_calls, + block_length=num_parallel_calls, reader_schema=reader_schema, ) @@ -131,14 +137,11 @@ def make_avro_record_dataset( dataset = dataset.batch(batch_size, drop_remainder=drop_final_batch) - if num_parallel_parser_calls is None: - num_parallel_parser_calls = tf.data.experimental.AUTOTUNE - dataset = dataset.map( lambda data: parse_avro( serialized=data, reader_schema=reader_schema, features=features ), - num_parallel_calls=num_parallel_parser_calls, + num_parallel_calls=num_parallel_calls, ) if prefetch_buffer_size == 0: From fb93e5c375c4ed4169dc304f42d57094c0376ad6 Mon Sep 17 00:00:00 2001 From: Abin Shahab Date: Tue, 29 Sep 2020 21:22:43 -0700 Subject: [PATCH 2/8] Exposes num_parallel_reads and num_parallel_calls -Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset and `make_avro_record_dataset` -Adds parameter constraints -Fixes lint issues --- .../experimental/avro_record_dataset_ops.py | 147 +++++++++++++++--- .../experimental/make_avro_record_dataset.py | 29 ++-- 2 files changed, 144 insertions(+), 32 deletions(-) diff --git a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py index b84a97cf4..44c12aed3 100644 --- a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py +++ b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py @@ -16,11 +16,31 @@ import tensorflow as tf from tensorflow_io.core.python.ops import core_ops +from typing import Optional _DEFAULT_READER_BUFFER_SIZE_BYTES = 256 * 1024 # 256 KB _DEFAULT_READER_SCHEMA = "" # From https://github.com/tensorflow/tensorflow/blob/v2.0.0/tensorflow/python/data/ops/readers.py + +def _require(condition: bool, err_msg: Optional[str] = None) -> None: + """Checks if the specified condition is true else raises exception + + Args: + condition: The condition to test + err_msg: If specified, it's the error message to use if condition is not true. 
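+
+        Example: an illustrative guard; `num` below is a stand-in for whatever
+        value is being validated, not a parameter of this helper.
+
+            _require(num > 0, f"num: {num} must be greater than 0")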
+ + Raises: + ValueError: Raised when the condition is false + + Returns: + None + """ + + if not condition: + raise ValueError(err_msg) + + # copied from https://github.com/tensorflow/tensorflow/blob/ # 3095681b8649d9a828afb0a14538ace7a998504d/tensorflow/python/data/ops/readers.py#L36 def _create_or_validate_filenames_dataset(filenames): @@ -52,21 +72,62 @@ def _create_or_validate_filenames_dataset(filenames): # copied from https://github.com/tensorflow/tensorflow/blob/ # 3095681b8649d9a828afb0a14538ace7a998504d/tensorflow/python/data/ops/readers.py#L67 -def _create_dataset_reader(dataset_creator, filenames, num_parallel_reads=None): - """create_dataset_reader""" - - def read_one_file(filename): - filename = tf.convert_to_tensor(filename, tf.string, name="filename") - return dataset_creator(filename) - - if num_parallel_reads is None: - return filenames.flat_map(read_one_file) - if num_parallel_reads == tf.data.experimental.AUTOTUNE: - return filenames.interleave( - read_one_file, num_parallel_calls=num_parallel_reads - ) +def _create_dataset_reader( + dataset_creator, + filenames, + cycle_length=None, + num_parallel_calls=None, + deterministic=None, + block_length=1, +): + """ + This creates a dataset reader which reads records from multiple files and interleaves them together +``` +dataset = Dataset.range(1, 6) # ==> [ 1, 2, 3, 4, 5 ] +# NOTE: New lines indicate "block" boundaries. +dataset = dataset.interleave( + lambda x: Dataset.from_tensors(x).repeat(6), + cycle_length=2, block_length=4) +list(dataset.as_numpy_iterator()) +``` +Results in the following output: +[1,1,1,1, + 2,2,2,2, + 1,1, + 2,2, + 3,3,3,3, + 4,4,4,4, + 3,4, + 5,5,5,5, + 5,5, +] + Args: + dataset_creator: Initializer for AvroDatasetRecord + filenames: A `tf.data.Dataset` iterator of filenames to read + cycle_length: The number of files to be processed in parallel. This is used by `Dataset.Interleave`. + We set this equal to `block_length`, so that each time n number of records are returned for each of the n + files. + num_parallel_calls: Number of threads spawned by the interleave call. + deterministic: Sets whether the interleaved records are written in deterministic order. in tf.interleave thi sis default true + block_length: Sets the number of output on the output tensor. Defaults to 1 + Returns: + A dataset iterator with an interleaved list of parsed avro records. + + """ + + def read_many_files(filenames): + filenames = tf.convert_to_tensor(filenames, tf.string, name="filename") + return dataset_creator(filenames) + + if cycle_length is None: + return filenames.flat_map(read_many_files) + return filenames.interleave( - read_one_file, cycle_length=num_parallel_reads, block_length=1 + read_many_files, + cycle_length=cycle_length, + num_parallel_calls=num_parallel_calls, + block_length=block_length, + deterministic=deterministic, ) @@ -128,7 +189,14 @@ class AvroRecordDataset(tf.data.Dataset): """A `Dataset` comprising records from one or more AvroRecord files.""" def __init__( - self, filenames, buffer_size=None, num_parallel_reads=None, reader_schema=None + self, + filenames, + buffer_size=None, + num_parallel_reads=None, + num_parallel_calls=None, + reader_schema=None, + deterministic=True, + block_length=1, ): """Creates a `AvroRecordDataset` to read one or more AvroRecord files. @@ -144,25 +212,62 @@ def __init__( files read in parallel are outputted in an interleaved order. If your input pipeline is I/O bottlenecked, consider setting this parameter to a value greater than one to parallelize the I/O. 
If `None`, files will be
-            read sequentially.
+            read sequentially. This must be equal to or greater than `num_parallel_calls`.
+            This constraint exists because `num_parallel_reads` becomes `cycle_length` in the
+            underlying call to `tf.data.Dataset.interleave`, and `cycle_length` must be
+            equal to or greater than the number of threads (`num_parallel_calls`);
+            `cycle_length` dictates how many files the interleave picks up to process in parallel.
+          num_parallel_calls: (Optional.) Number of threads to spawn. This must be set to `None`,
+            `tf.data.experimental.AUTOTUNE`, or a value greater than 0, and it must be less than
+            or equal to `num_parallel_reads`. This defines the degree of parallelism of the
+            underlying `Dataset.interleave` call.
           reader_schema: (Optional.) A `tf.string` scalar representing the reader
             schema or None.
+          deterministic: (Optional.) A boolean controlling whether determinism should be traded
+            for performance by allowing elements to be produced out of order. Defaults to `True`.
+          block_length: The number of consecutive records to fetch from each file before
+            switching to another file. Defaults to 1.

        Raises:
          TypeError: If any argument does not have the expected type.
          ValueError: If any argument does not have the expected shape.
        """
+        _require(
+            num_parallel_calls is None
+            or num_parallel_calls == tf.data.experimental.AUTOTUNE
+            or num_parallel_calls > 0,
+            f"num_parallel_calls: {num_parallel_calls} must be set to None, "
+            f"tf.data.experimental.AUTOTUNE, or greater than 0",
+        )
+        if num_parallel_calls is not None:
+            _require(
+                num_parallel_reads is not None
+                and (
+                    num_parallel_reads >= num_parallel_calls
+                    or num_parallel_reads == tf.data.experimental.AUTOTUNE
+                ),
+                f"num_parallel_reads: {num_parallel_reads} must be greater than or equal to "
+                f"num_parallel_calls: {num_parallel_calls} or set to tf.data.experimental.AUTOTUNE",
+            )
+
         filenames = _create_or_validate_filenames_dataset(filenames)
         self._filenames = filenames
         self._buffer_size = buffer_size
         self._num_parallel_reads = num_parallel_reads
+        self._num_parallel_calls = num_parallel_calls
         self._reader_schema = reader_schema
+        self._block_length = block_length

-        def creator_fn(filename):
-            return _AvroRecordDataset(filename, buffer_size, reader_schema)
+        def read_multiple_files(filenames):
+            return _AvroRecordDataset(filenames, buffer_size, reader_schema)

-        self._impl = _create_dataset_reader(creator_fn, filenames, num_parallel_reads)
+        self._impl = _create_dataset_reader(
+            read_multiple_files,
+            filenames,
+            cycle_length=num_parallel_reads,
+            num_parallel_calls=num_parallel_calls,
+            deterministic=deterministic,
+            block_length=block_length,
+        )
         variant_tensor = self._impl._variant_tensor  # pylint: disable=protected-access
         super().__init__(variant_tensor)

@@ -171,13 +276,17 @@ def _clone(
         filenames=None,
         buffer_size=None,
         num_parallel_reads=None,
+        num_parallel_calls=None,
         reader_schema=None,
+        block_length=None,
     ):
         return AvroRecordDataset(
             filenames or self._filenames,
             buffer_size or self._buffer_size,
             num_parallel_reads or self._num_parallel_reads,
+            num_parallel_calls or self._num_parallel_calls,
             reader_schema or self._reader_schema,
+            block_length=block_length or self._block_length,  # keyword: the next positional slot is `deterministic`
         )

     def _inputs(self):
diff --git a/tensorflow_io/core/python/experimental/make_avro_record_dataset.py b/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
index 11934175b..0a9624f37 100644
--- a/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
+++ b/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
@@ -37,7 +37,6 @@ def make_avro_record_dataset(
shuffle_seed=None, prefetch_buffer_size=tf.data.experimental.AUTOTUNE, num_parallel_reads=None, - num_parallel_parser_calls=None, drop_final_batch=False, ): """Reads and (optionally) parses avro files into a dataset. @@ -79,14 +78,26 @@ def make_avro_record_dataset( prefetch_buffer_size: (Optional.) An int specifying the number of feature batches to prefetch for performance improvement. Defaults to auto-tune. Set to 0 to disable prefetching. +<<<<<<< HEAD +<<<<<<< HEAD num_parallel_reads: (Optional.) Number of threads used to read records from files. By default or if set to a value >1, the results will be interleaved. num_parallel_parser_calls: (Optional.) Number of parallel +======= + num_parallel_calls: (Optional.) Number of threads used to read + records from files. By default or if set to a value >1, the + results will be interleaved. + num_parallel_reads: (Optional.) Number of parallel +>>>>>>> f7032e3... Exposed num_parallel_reads as well as num_parallel_calls records to parse in parallel. Defaults to an automatic selection. +======= + num_parallel_reads: (Optional.) Number of parallel + records to parse in parallel. Defaults to None(no parallelization). +>>>>>>> d41d946... Added parameter constraints drop_final_batch: (Optional.) Whether the last batch should be dropped in case its size is smaller than `batch_size`; the default behavior is not to drop the smaller batch. @@ -99,20 +110,15 @@ def make_avro_record_dataset( """ files = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle, seed=shuffle_seed) - if num_parallel_reads is None: - # Note: We considered auto-tuning this value, but there is a concern - # that this affects the mixing of records from different files, which - # could affect training convergence/accuracy, so we are defaulting to - # a constant for now. 
-        num_parallel_reads = 24
-
     if reader_buffer_size is None:
         reader_buffer_size = 1024 * 1024
-
+    num_parallel_calls = num_parallel_reads
     dataset = AvroRecordDataset(
         files,
         buffer_size=reader_buffer_size,
         num_parallel_reads=num_parallel_reads,
+        num_parallel_calls=num_parallel_calls,
+        block_length=num_parallel_calls,
         reader_schema=reader_schema,
     )

@@ -131,14 +137,11 @@

     dataset = dataset.batch(batch_size, drop_remainder=drop_final_batch)

-    if num_parallel_parser_calls is None:
-        num_parallel_parser_calls = tf.data.experimental.AUTOTUNE
-
     dataset = dataset.map(
         lambda data: parse_avro(
             serialized=data, reader_schema=reader_schema, features=features
         ),
-        num_parallel_calls=num_parallel_parser_calls,
+        num_parallel_calls=num_parallel_calls,
     )

     if prefetch_buffer_size == 0:

From 5203f5d928e484f8fa967bdfb28ff0f066d5b5aa Mon Sep 17 00:00:00 2001
From: Abin Shahab
Date: Tue, 29 Sep 2020 21:22:43 -0700
Subject: [PATCH 3/8] Exposes num_parallel_reads and num_parallel_calls

-Exposes `num_parallel_reads` and `num_parallel_calls` in AvroRecordDataset
 and `make_avro_record_dataset`
-Adds parameter constraints
-Fixes lint issues
---
 .../experimental/avro_record_dataset_ops.py   |  8 ++---
 .../experimental/make_avro_record_dataset.py  | 32 +------------------
 2 files changed, 4 insertions(+), 36 deletions(-)

diff --git a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py
index c4dea1e1f..9ee5d80d3 100644
--- a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py
+++ b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py
@@ -25,18 +25,17 @@
 def _require(condition: bool, err_msg: Optional[str] = None) -> None:
     """Checks if the specified condition is true else raises exception
-
+
     Args:
         condition: The condition to test
         err_msg: If specified, it's the error message to use if condition is not true.

     Raises:
-        ValueError: Raised when the condition is false
+        ValueError: Raised when the condition is false

     Returns:
         None
     """
-
     if not condition:
         raise ValueError(err_msg)

@@ -199,8 +198,7 @@ def __init__(
         block_length=1,
     ):
         """Creates a `AvroRecordDataset` to read one or more AvroRecord files.
-
-        Args:
+        Args:
          filenames: A `tf.string` tensor or `tf.data.Dataset` containing one or
            more filenames.
          buffer_size: (Optional.) A `tf.int64` scalar representing the number of
diff --git a/tensorflow_io/core/python/experimental/make_avro_record_dataset.py b/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
index 0a9624f37..6973b438e 100644
--- a/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
+++ b/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
@@ -39,69 +39,39 @@ def make_avro_record_dataset(
     num_parallel_reads=None,
     drop_final_batch=False,
 ):
-    """Reads and (optionally) parses avro files into a dataset.
-
+    """Reads and (optionally) parses avro files into a dataset.
     Provides common functionality such as batching, optional parsing, shuffling,
     and performing defaults.
-
     Args:
     file_pattern: List of files or patterns of avro file paths.
       See `tf.io.gfile.glob` for pattern rules.
-
     features: A map of feature names mapped to feature information.
-
     batch_size: An int representing the number of records to combine in
       a single batch.
-
     reader_schema: The reader schema.
-
     reader_buffer_size: (Optional.) An int specifying the reader's buffer size
       in bytes. If `None` (the default), uses the default value from
      AvroRecordDataset.
-
     num_epochs: (Optional.)
An int specifying the number of times
      this dataset is repeated. If `None` (the default), cycles through
      the dataset forever and drops the final partial batch.
-
     shuffle: (Optional.) A bool that indicates whether the input
       should be shuffled. Defaults to `True`.
-
     shuffle_buffer_size: (Optional.) Buffer size to use for
       shuffling. A large buffer size ensures better shuffling, but
       increases memory usage and startup time. If not provided
       assumes default value of 10,000 records. Note that the shuffle
       size is measured in records.
-
     shuffle_seed: (Optional.) Randomization seed to use for shuffling.
       By default uses a pseudo-random seed.
-
     prefetch_buffer_size: (Optional.) An int specifying the number of
       feature batches to prefetch for performance improvement. Defaults to
       auto-tune. Set to 0 to disable prefetching.
-<<<<<<< HEAD
-<<<<<<< HEAD
-
-    num_parallel_reads: (Optional.) Number of threads used to read
-      records from files. By default or if set to a value >1, the
-      results will be interleaved.
-
-    num_parallel_parser_calls: (Optional.) Number of parallel
-=======
-    num_parallel_calls: (Optional.) Number of threads used to read
-      records from files. By default or if set to a value >1, the
-      results will be interleaved.
-    num_parallel_reads: (Optional.) Number of parallel
->>>>>>> f7032e3... Exposed num_parallel_reads as well as num_parallel_calls
-    records to parse in parallel. Defaults to an automatic selection.
-
-=======
     num_parallel_reads: (Optional.) Number of files to read and records
       to parse in parallel. Defaults to `None` (no parallelization).
->>>>>>> d41d946... Added parameter constraints
     drop_final_batch: (Optional.) Whether the last batch should be
       dropped in case its size is smaller than `batch_size`; the
       default behavior is not to drop the smaller batch.
-
     Returns:
       A dataset, where each element matches the output of `parser_fn`
      except it will have an additional leading `batch-size` dimension,

From fe683eb3ac646debcc9f8a2f3ac397af55ab1f4c Mon Sep 17 00:00:00 2001
From: ionyeneho
Date: Sat, 19 Dec 2020 09:45:41 -0800
Subject: [PATCH 4/8] Fixes Lint Issues

---
 .../core/python/experimental/make_avro_record_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tensorflow_io/core/python/experimental/make_avro_record_dataset.py b/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
index 6973b438e..af4eefa61 100644
--- a/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
+++ b/tensorflow_io/core/python/experimental/make_avro_record_dataset.py
@@ -39,7 +39,7 @@ def make_avro_record_dataset(
     num_parallel_reads=None,
     drop_final_batch=False,
 ):
-    """Reads and (optionally) parses avro files into a dataset.
+    """Reads and (optionally) parses avro files into a dataset.
     Provides common functionality such as batching, optional parsing,
     shuffling, and performing defaults.
Args: From 8c4973c25e67b9d950cc5b782255c3aea98a138e Mon Sep 17 00:00:00 2001 From: ionyeneho Date: Mon, 21 Dec 2020 10:00:36 -0800 Subject: [PATCH 5/8] Removes Optional typing for method parameter - --- .../core/python/experimental/avro_record_dataset_ops.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py index 9ee5d80d3..bbbdb56fa 100644 --- a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py +++ b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py @@ -16,14 +16,13 @@ import tensorflow as tf from tensorflow_io.core.python.ops import core_ops -from typing import Optional _DEFAULT_READER_BUFFER_SIZE_BYTES = 256 * 1024 # 256 KB _DEFAULT_READER_SCHEMA = "" # From https://github.com/tensorflow/tensorflow/blob/v2.0.0/tensorflow/python/data/ops/readers.py -def _require(condition: bool, err_msg: Optional[str] = None) -> None: +def _require(condition: bool, err_msg: str = None) -> None: """Checks if the specified condition is true else raises exception Args: From 8de71386419c3eb328fe76f10bfad03f5b9e7a43 Mon Sep 17 00:00:00 2001 From: ionyeneho Date: Tue, 5 Jan 2021 12:55:12 -0800 Subject: [PATCH 6/8] Adds test method for _require() function -This update adds a test to check if ValueErrors are raised when given an invalid input for num_parallel_calls --- .../experimental/avro_record_dataset_ops.py | 2 +- tests/test_parse_avro_eager.py | 62 ++++++++++++++++++- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py index bbbdb56fa..6429aac10 100644 --- a/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py +++ b/tensorflow_io/core/python/experimental/avro_record_dataset_ops.py @@ -106,7 +106,7 @@ def _create_dataset_reader( We set this equal to `block_length`, so that each time n number of records are returned for each of the n files. num_parallel_calls: Number of threads spawned by the interleave call. - deterministic: Sets whether the interleaved records are written in deterministic order. in tf.interleave thi sis default true + deterministic: Sets whether the interleaved records are written in deterministic order. in tf.interleave this is default true block_length: Sets the number of output on the output tensor. Defaults to 1 Returns: A dataset iterator with an interleaved list of parsed avro records. 
diff --git a/tests/test_parse_avro_eager.py b/tests/test_parse_avro_eager.py index fc4220ad1..427d29e09 100644 --- a/tests/test_parse_avro_eager.py +++ b/tests/test_parse_avro_eager.py @@ -30,8 +30,8 @@ from avro.schema import Parse as parse import tensorflow_io as tfio -if sys.platform == "darwin": - pytest.skip("TODO: skip macOS", allow_module_level=True) +# if sys.platform == "darwin": +# pytest.skip("TODO: skip macOS", allow_module_level=True) class AvroRecordsToFile: @@ -246,6 +246,64 @@ def _load_records_as_tensors(filenames, schema): ), ) + def test_inval_num_parallel_calls(self): + """test_inval_num_parallel_calls + + This function tests that value errors are raised upon + the passing of invalid values for num_parallel_calls which + includes zero values and values greater than num_parallel_reads + """ + + NUM_PARALLEL_READS = 1 + NUM_PARALLEL_CALLS_ZERO = 0 + NUM_PARALLEL_CALLS_GREATER = 2 + + writer_schema = """{ + "type": "record", + "name": "dataTypes", + "fields": [ + { + "name":"index", + "type":"int" + }, + { + "name":"string_value", + "type":"string" + } + ]}""" + + record_data = [ + {"index": 0, "string_value": ""}, + {"index": 1, "string_value": "SpecialChars@!#$%^&*()-_=+{}[]|/`~\\'?"}, + { + "index": 2, + "string_value": "ABCDEFGHIJKLMNOPQRSTUVW" + + "Zabcdefghijklmnopqrstuvwz0123456789", + }, + ] + + filenames = AvroRecordDatasetTest._setup_files( + writer_schema=writer_schema, records=record_data + ) + + with pytest.raises(ValueError): + + dataset_a = tfio.experimental.columnar.AvroRecordDataset( + filenames=filenames, + num_parallel_reads=NUM_PARALLEL_READS, + num_parallel_calls=NUM_PARALLEL_CALLS_ZERO, + reader_schema="reader_schema", + ) + + with pytest.raises(ValueError): + + dataset_b = tfio.experimental.columnar.AvroRecordDataset( + filenames=filenames, + num_parallel_reads=NUM_PARALLEL_READS, + num_parallel_calls=NUM_PARALLEL_CALLS_GREATER, + reader_schema="reader_schema", + ) + def _test_pass_dataset(self, writer_schema, record_data, **kwargs): """test_pass_dataset""" filenames = AvroRecordDatasetTest._setup_files( From bbd426ac2470ecd201e8c574c266ac301f36098d Mon Sep 17 00:00:00 2001 From: ionyeneho Date: Wed, 6 Jan 2021 13:03:13 -0800 Subject: [PATCH 7/8] Uncomments skip for macOS pytests --- tests/test_parse_avro_eager.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_parse_avro_eager.py b/tests/test_parse_avro_eager.py index 427d29e09..f23d6aadf 100644 --- a/tests/test_parse_avro_eager.py +++ b/tests/test_parse_avro_eager.py @@ -30,8 +30,8 @@ from avro.schema import Parse as parse import tensorflow_io as tfio -# if sys.platform == "darwin": -# pytest.skip("TODO: skip macOS", allow_module_level=True) +if sys.platform == "darwin": + pytest.skip("TODO: skip macOS", allow_module_level=True) class AvroRecordsToFile: @@ -248,7 +248,6 @@ def _load_records_as_tensors(filenames, schema): def test_inval_num_parallel_calls(self): """test_inval_num_parallel_calls - This function tests that value errors are raised upon the passing of invalid values for num_parallel_calls which includes zero values and values greater than num_parallel_reads From 787ddc72f99e84bfc3299471e816fb3f1787c73b Mon Sep 17 00:00:00 2001 From: ionyeneho Date: Wed, 6 Jan 2021 14:39:38 -0800 Subject: [PATCH 8/8] Fixes Lint issues --- tests/test_parse_avro_eager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_parse_avro_eager.py b/tests/test_parse_avro_eager.py index f23d6aadf..83d39e7da 100644 --- a/tests/test_parse_avro_eager.py +++ 
b/tests/test_parse_avro_eager.py @@ -31,7 +31,7 @@ import tensorflow_io as tfio if sys.platform == "darwin": - pytest.skip("TODO: skip macOS", allow_module_level=True) + pytest.skip("TODO: skip macOS", allow_module_level=True) class AvroRecordsToFile:
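Taken together, the series exposes `num_parallel_reads`, `num_parallel_calls`, `deterministic`, and `block_length` end to end. The sketch below shows how the patched API is meant to be called. It assumes `make_avro_record_dataset` is exported under `tfio.experimental.columnar` alongside `AvroRecordDataset`, as the tests above use; the file paths, schema, and feature spec are placeholders, not part of the patches.

```python
import tensorflow as tf
import tensorflow_io as tfio

# Placeholder reader schema and feature spec -- not shipped with the patches.
reader_schema = """{
    "type": "record",
    "name": "example",
    "fields": [{"name": "index", "type": "int"}]
}"""
features = {"index": tf.io.FixedLenFeature([], tf.int32)}

# make_avro_record_dataset forwards num_parallel_reads to the interleave
# cycle_length, to num_parallel_calls, and to the parsing map's parallelism.
dataset = tfio.experimental.columnar.make_avro_record_dataset(
    file_pattern="/tmp/data/*.avro",  # placeholder pattern
    features=features,
    batch_size=32,
    reader_schema=reader_schema,
    num_parallel_reads=4,  # None (the default) means sequential reads
)

# AvroRecordDataset can also be constructed directly. num_parallel_calls must
# be None, tf.data.experimental.AUTOTUNE, or a positive value no greater than
# num_parallel_reads; otherwise the _require checks raise ValueError, as
# exercised by test_inval_num_parallel_calls above.
raw = tfio.experimental.columnar.AvroRecordDataset(
    filenames=["/tmp/data/part-0.avro", "/tmp/data/part-1.avro"],
    num_parallel_reads=2,
    num_parallel_calls=2,
    reader_schema=reader_schema,
)
```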