From 98c7b778f8742028a2447144e0efe05039499a4b Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Mon, 13 Jan 2020 07:04:25 -0800 Subject: [PATCH 01/11] | Status | (Proposed / Accepted / Implemented / Obsolete) | | :------------ | :------------------------------------------------------ | | **RFC #** | [NNN](https://github.com/tensorflow/community/pull/NNN) | : : (update when you have community PR #) : | **Author(s)** | Andrew Audibert (aaudibert@google.com) Rohan Jain | : : (rohanj@google.com) : | **Sponsor** | Jiri Simsa (jsimsa@google.com) | | **Updated** | 2019-01-13 Provide an API and implementation of a tf.data service which can process tf.data datasets in a distributed manner. The service can be run outside the TensorFlow cluster or be exported as a gRPC service by TensorFlow servers. Goals: - Enable horizontal scaling of dataset computation to improve performance of input-bound dataset pipelines. - Improve tf.data integration with the tf.distribute API. In particular, support dynamic sharding of data across multiple processes. - Provide visitation guarantees for distributed training jobs. --- rfcs/20200113-tf-data-service.md | 591 +++++++++++++++++++++++++++++++ 1 file changed, 591 insertions(+) create mode 100644 rfcs/20200113-tf-data-service.md diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md new file mode 100644 index 000000000..b18908e19 --- /dev/null +++ b/rfcs/20200113-tf-data-service.md @@ -0,0 +1,591 @@ +# Distributed tf.data service + +| Status | Proposed | +| :------------ | :------------------------------------------------------ | +| **RFC #** | [195](https://github.com/tensorflow/community/pull/195) | +| **Author(s)** | Andrew Audibert (aaudibert@google.com) Rohan Jain (rohanj@google.com) | +| **Sponsor** | Jiri Simsa (jsimsa@google.com) | +| **Updated** | 2019-01-09 | + +## Objective + +Provide an API and implementation of a tf.data service which can process tf.data +datasets in a distributed manner. The service can be run outside the TensorFlow +cluster or be exported as a gRPC service by TensorFlow servers. + +Goals: + +- Enable horizontal scaling of dataset computation to improve performance of + input-bound dataset pipelines. +- Improve tf.data integration with the tf.distribute API. In particular, + support dynamic sharding of data across multiple processes. +- Provide visitation guarantees for distributed training jobs. + +Non-goals: + +- Process non-dataset data. +- Distribute datasets that rely on external / non-serializable state. +- Support non-graph computation (e.g. py_function). + +## Motivation + +### Host machine input pipelines can't always keep up with accelerators. + +Some input pipelines require significant resources to produce their data, e.g. +due to image transformations. When the host machine isn't powerful enough to +generate input data at the rate the attached accelerator(s) consume the data, +the accelerator(s) will idle. This slows down training time, and also wastes +valuable accelerator resources. The tf.data service solves this problem by using +N input workers to feed M accelerators. The number of input workers can be +scaled up or down as needed to keep up with the accelerators. + +### Distributed training requires a distribution-aware input pipeline. + +Today tf.data supports the tf.distribute API by providing mechanisms for +sharding, cloning, and re-batching. The tf.distribute API uses these primitives +to implement their own version of a distributed dataset. 
If distributed datasets +become a core feature of tf.data, tf.data can provide a public API for +tf.distribute (and users who wish to implement their own distribution) to use +instead. This will also allow us to support feature requests that require +cross-worker coordination, such as dynamic sharding. + +## User Benefit + +### Input-bound models + +Users with input-bound models can leverage the tf.data service to distribute +input processing across horizontally-scaling compute resources. This can improve +utilization for valuable accelerator resources, reducing total cost. + +### Dynamic load balancing + +Today, the tf.distribute API statically shards data across accelerators. This +can lead to suboptimal utilization because some shards may contain more data +than others. The tf.data service provides a mechanism for dynamically sharding, +reducing the data imbalance across accelerators. + +### Visitation guarantees + +Model accuracy can often be improved when each training sample is trained on +exactly once per epoch. The tf.data service can coordinate across workers to +provide this guarantee. + +## Design Proposal + +The tf.data service is a master-worker system which iterates through datasets, +producing outputs to be consumed by accelerators. The service is comprised of a +few components: + +* User-facing Python API for interacting with the tf.data service. +* Dataset splitting API for determining how to split up datasets for parallel + processing. +* Master and worker gRPC services. + +### Architecture + +The tf.data service is comprised of master and worker gRPC services which could +be run in a couple of different configurations: + +#### Glossary + +**Master**: The single master coordinating the tf.data service. + +**Worker**: A tf.data service worker which performs dataset processing and +provides dataset elements to consumers over RPC. + +**Consumer**: A machine which consumes data from the tf.data service. The +consumer may be attached to a GPU or TPU, or use data for on-CPU training. + +#### Separate Cluster Architecture + +Each server is run on a separate host from the TensorFlow cluster. This +configuration gives users a way to provide horizontally scaling CPU for +processing their input pipelines and quickly feeding data to accelerators. + +#### Embedded Cluster Architecture + +without needing to provision additional compute resources. and gives all the +benefits of the tf.data service except for horizontal scaling. + +#### Hybrid Architecture + +Users could run tf.data workers embedded in their TensorFlow cluster, and also +run additional tf.data workers (and potentially the tf.data master) outside the +cluster. This allows for horizontal worker scaling, while still leveraging the +compute resources of the TensorFlow cluster for input processing. + +### User-facing Python API + +This API is how users will interact with the tf.data service from their Python +code. + +```python +def tf.data.experimental.service.distribute(address): + """Marks that a dataset should be processed by the tf.data service. + + ds = ... # dataset to distribute + ds = ds.apply(tf.data.experimental.service.distribute(address)) + + Args: + address: The address of the tf.data service master. + + Returns: + A function that can be passed to `dataset.apply()`. + """ + +def tf.data.experimental.service.create_iteration( + dataset, num_consumers=1, num_tasks=None, deterministic=False): + """Begins distributed iteration over a dataset. 

  It is expected that the dataset contains at least one `.distribute(address)`
  transformation; otherwise this method will print a warning and do nothing.

  `create_iteration` will first register the dataset with the tf.data service
  if it isn't already registered. It will then request the creation of
  `num_consumers` dataset iterators which divide the dataset `num_consumers`
  ways. The returned object can be used to read from one of the iterators using
  `tf.data.experimental.service.make_iterator(ds, obj, consumer_index)`.

  ds = ... # dataset to distribute
  ds = ds.apply(tf.data.experimental.service.distribute(address))
  if consumer_index == 0:
    # The iteration object is a byte array which needs to be shared among all
    # consumers. Here we suppose there are broadcast_send and broadcast_recv
    # method available.
    iteration_id = tf.data.experimental.service.create_iteration(ds, address, 3)
    broadcast_send(iteration_id)
  else:
    iteration_id = broadcast_recv()
  it = tf.data.experimental.service.make_iterator(
      ds, iteration_id, consumer_index)
  for element in it:
    # process element

  Args:
    dataset: The dataset to begin iteration over.
    num_consumers: The number of consumers to divide the dataset between. Set
      this if you require determinism. If None, a single iterator id is returned,
      and any number of consumers can read from that iterator id. The data
      produced by the dataset will be fed to consumers on a first-come
      first-served basis.
    num_tasks: The number of tasks to use for processing. Tasks run for
      the duration of an epoch, and each worker should typically process a single
      task. Normally it is best to leave this as None so that the master can
      choose a reasonable number of tasks. Setting `num_tasks` is useful for
      producing deterministic results.
    deterministic: Whether the iteration should be performed
      deterministically. Fully deterministic output also requires setting
      `num_tasks` to a fixed number, and that the input dataset is itself
      deterministic.

  Returns:
    An iteration_id which can be used to create iterators via
    `tf.data.experimental.service.make_iterator`.
  """

def tf.data.experimental.service.make_iterator(
    dataset, iteration, consumer_index):
  """Creates an iterator for reading from the specified dataset.

  Args:
    dataset: The dataset to read from.
    iteration: An iteration_id object generated by
      `tf.data.experimental.service.create_iteration`.
    consumer_index: The consumer index within the iteration to read from. If
      the iteration was created with `n` consumers, `consumer_index` must be
      less than `n`.

  Returns:
    A Python iterator which iterates over the dataset elements.
  """
```

### Dataset splitting API

To parallelize dataset processing, the tf.data service needs a way to split up
datasets. We will achieve this by adding a splitting API that allows source
datasets to express how they can be split.

Our goals for the API are

* Performance: The splitting API can be used to performantly split and process
  datasets.
* Extensibility: User-defined datasets can be split as long as they implement
  the splitting API.
* Minimize Surprises: Users write their datasets as though they will not be
  split, so introducing splitting can easily lead to unexpected outcomes. To
  mitigate this, we will be conservative about which dataset transformations
  support splitting.

The API will be used internally by the tf.data service to distribute datasets.
It will be entirely in C++, and we don't currently have any plans to expose
splitting through Python.

The API focuses on producing and consuming `Split`s. A `Split` is a variant
Tensor that can be subclassed to represent arbitrary types of splitting.

```cpp
class Split {
 public:
  virtual std::string DebugString() const = 0;
  // Methods to support being used as a Variant tensor.
  virtual std::string TypeName() const = 0;
  virtual void Encode(VariantTensorData* data) const = 0;
  virtual bool Decode(const VariantTensorData& data) = 0;
};
```

To iterate over splits for a dataset, we will use a new
`DatasetBase::MakeSplitGenerator()` method. This method creates a
`SplitGenerator`, which is responsible for generating all of the splits for the
dataset. We use an intermediate `SplitGenerator` object instead of generating
splits directly because there could be a large number of splits, and the
`SplitGenerator` gives us a way to tune split size in response to pipeline
performance.

```cpp
class SplitGenerator {
 public:
  virtual Status GetNext(std::unique_ptr<Split>* split,
                         bool* end_of_splits) = 0;
  // Instructs the SplitGenerator to adjust the size of future splits by the
  // specified percent. 100% means no change, 50% means half-sized splits, and
  // 200% means double-sized splits. The SplitGenerator will make a best effort
  // to incorporate the feedback when creating splits.
  virtual void AdjustSplitSize(int percent) = 0;
};
```

It is tempting to process each split independently, but this would cause issues
when splits are small. tf.data pipelines need to populate internal buffers for
shuffling, prefetching, and batching. If we use a separate pipeline to process
each split, our shuffling will be lower quality, we will have performance
jitter as we keep needing to refill prefetch buffers from scratch, and we will
produce many more partial batches (each split might not even have enough data
to fill a full batch). To avoid these issues, we use a small number of tasks,
where each task processes many splits as a single pipeline.

To enable processing of multiple splits in a dataset, we will add an optional
`SplitProvider` field to the `IteratorContext` passed to
`IteratorBase::Initialize`. The `SplitProvider` produces splits which tell the
iterator what source data to iterate over. For example, if splits are
represented by filenames, and a SplitProvider produces `["file1", "file6",
"file11"]`, an iterator initialized by that `SplitProvider` should process
those three files only.

```cpp
class SplitProvider {
 public:
  virtual Status GetNext(std::unique_ptr<Split>* split,
                         bool* end_of_splits) = 0;
};
```

When processing datasets, tf.data service workers will use `SplitProvider`s
which provide splits by querying the tf.data service master for which splits to
process. A few splits will be prefetched to hide the latency of needing to
request a new split from the master.

#### Supported Datasets

Not all dataset sources and transformations are easily splittable. For example,
`take`, `skip`, and `scan` require a global view of the dataset to produce
correct results. Datasets which require multiple input datasets such as `zip`
are also difficult to support, since we don't have a good way of aligning the
splits of multiple input datasets. Users who rely on these unsupported datasets
will need to move those datasets to come after the distributed part of their
pipeline.
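
For example, a pipeline that relies on `zip` can distribute only the
splittable portion and apply `zip` on the consumer side. The sketch below uses
the `distribute` transformation proposed above; the master address and file
names are placeholders:

```python
import tensorflow as tf

MASTER_ADDRESS = "grpc://dataservice-master:5000"  # placeholder address

# The splittable portion (source + map) is processed by tf.data service
# workers.
images = tf.data.TFRecordDataset(["images.tfrecord"])  # placeholder file
images = images.map(lambda record: tf.io.parse_tensor(record, tf.float32))
images = images.apply(
    tf.data.experimental.service.distribute(MASTER_ADDRESS))

# Transformations that don't support splitting (`zip`, `take`) are applied
# after the distributed portion, so they run on the consumer.
labels = tf.data.Dataset.range(1000)
ds = tf.data.Dataset.zip((images, labels))
ds = ds.take(100)
```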
+ +Initially, we will support splitting for the following dataset sources and +transformations: + +* `batch`, `CsvDataset`, `dense_to_sparse_batch`, `filter`, + `FixedLengthRecordDataset`, `flat_map`, `from_tensor_slices`, + `group_by_window`, `ignore_errors`, `interleave`, `list_files`, `map`, + `range`, `repeat`, `padded_batch`, `prefetch`, `shuffle`, `SSTableDataset`, + `TextLineDataset`, `TFRecordDataset`, `unbatch`, `window`. + +### Master and worker services + +This section discusses the design for the master and worker services. These +services are used by the Python API to provide distributed dataset processing, +and these services use the splitting API as a part of their implementation. + +#### Master API + +The master is responsible for registering datasets, generating and tracking +iteration and worker ids, and generating dataset splits for processing on +workers. + +Below is a sketch of the Master API. This API is not public and is subject to +change. + +```cpp +/// ---- Methods called by consumers ---- + +// Registers a dataset and returns an id for the dataset. If the dataset is +// already registered, its dataset id is returned. +int GetOrRegisterDataset(GraphDef dataset); + +// Creates and returns `num_consumers` iterator ids which partition the +// specified dataset. This also creates an internal `iteration_id` used to +// track the overall dataset iteration. `num_tasks` defines how many tasks to +// create. If `num_tasks` is -1, it is up to the master to determine how many +// tasks to create. +list CreateIterators(int dataset_id, int num_consumers, + int num_tasks); + +// Returns the list of tasks processing data for `iterator_id`. Consumers query +// this to find which worker addresses to read data from. +list GetWorkersForiterator(int iterator_id); + +///---- Methods called by input workers ---- + +// Registers a worker and returns its worker id. +int RegisterWorker(WorkerInfo worker_info); + +// Requests the next splits to process on the given worker for the given +// iteration_id. +List GetSplits(int worker_id, int iteration_id); +``` + +#### Worker API + +The worker is responsible for processing datasets and providing dataset elements +to consumers. + +Below is a sketch of the Worker API. This API is not public and is subject to +change. + +```cpp +/// ---- Methods called by consumers ---- + +// Gets the next element for the specified iterator_id. +list GetElement(iterator_id); + +/// ---- Methods called by master ---- + +// Requests that the worker process the specified dataset. This will trigger the +// worker to start requesting splits from the master using the `iteration_id`. +void ProcessDataset(int dataset_id, int iteration_id, list iterator_ids); +``` + +#### Visitation Guarantees + +When iterating over a dataset, the tf.data service will process all input data +at least once, even in the presence of master or worker failures. If there are +no failures, all input data will be processed exactly once. + +With determinstic execution enabled, the tf.data service provides an +exactly-once visitation guarantee even in the face of master or worker failures. + +#### Determinism + +Deterministic processing is a cornerstone of tf.data. Determinism is valuable +for debugging and experimentation. This section discusses how the tf.data +service will provide determinism. + +To get deterministic behavior, the tf.data service will require three things: + +1. The dataset being distributed has deterministic output. +1. 
The user sets `deterministic=True` when calling + `tf.data.experimental.service.create_iteration`. +1. The user specifies how many input tasks to use when calling + `tf.data.experimental.service.create_iteration`. +1. The consumers do not fail. + +In the absence of failures, determinism is achieved by distributing splits +round-robin among `N` input workers and having input workers earmark every `ith` +element for consumer `i`. + +To provide determinism even when servers fail, consumers can keep track of which +element index they have processed up to for each task. Input workers would +attach per-task element indices when they produce elements, so consumers can +ignore duplicate elements caused by worker restarts. We will use an analogous +mechanism to avoid re-processing the same split in case of master falure. Input +workers will track the split index of splits as they receive them, and ignore +duplicate splits. + +#### Failure Recovery + +The tf.data service can recover from master and worker failures while preserving +determinism and its at-least-once visitation guarantee. The master achieves this +by writing its unrecoverable state to a persistent journal, and taking +checkpoints of its recoverable state to improve recovery time. When workers +reconnect to a restarted master, they update the master with their state so that +the master can recover its knowledge of its workers. + +The unrecoverable state includes + +* **Registered datasets** +* **ID generators** for iteration ids, iterator ids, dataset ids, and worker + ids. +* **In-progress iteration state**: + * **dataset id** for the iterated dataset so that we can recover the + iteration's split generator + * **iteration id** + * **participating worker ids**, so that we can send splits to the correct + workers. + +Recoverable state includes + +* **Split generators**: Recoverable from our information about in-progress + iterations. +* **Worker addresses**: Recoverable when workers reconnect. +* **Worker loads**: Recoverable when workers reconnect. +* **Assignment from splits to workers**: Recoverable when workers reconnect. +* **Outstanding splits**: Recoverable by re-running split generators from + their checkpoint state. + +To improve recovery time, the master will periodically write checkpoints of its +split generators and outstanding splits, so that split generators don't need to +be run from the beginning during master recovery. + +A concern with the above recovery strategy is that a master could transmit a +split before crashing, then restart and transmit the same split again. To avoid +this duplication, the master attaches a split index to every split it sends to a +worker. When workers reconnect, they inform the master of their latest split +index. + +Workers have no unrecoverable state. If a worker crashes, a new worker can take +its place. It is up to the master to reassign splits from the crashed worker to +the new worker. + +To improve worker recovery time, workers will periodically write checkpoints of +their iterators to directories named using their worker ids. When the restarted +worker connects, the master will tell it which iterator checkpoints to recover +from. + +We will read and write this state through a MasterState interface which can be +implemented using various storage backends. For use cases that require fault +tolerance, the user must configure a fault-tolerant MasterState, e.g. Spanner +internally, Cloud Spanner in GCP, or etcd externally. 
If fault tolerance isn't +required, the user could configure state to be held in memory only. + +#### Leadership Transfer + +The master writes state to journal files so that the state can be recovered on +restart. It is possible that a new master could be brought up while the old +master is still running. If we aren't careful, this could result in corruption +of the journal as both masters try to write to it. + +Ideally we could rely on a distributed coordination service such as ZooKeeper. +However, this would add a significant burden to users who don't have access to a +ZooKeeper cluster, and it would also require adding a new dependency on a +ZooKeeper client. + +What TensorFlow does have is a FileSystem API. We will leverage this API to +perform leadership transfer as follows: + +1. The first master will create a file named "master_seqno_0". If it + successfully creates the file, it will consider itself the leader. +1. The leader master will check every N milliseconds that the "master_seqno" + file it created still exists. If the file no longer exists, the master will + cease operation immediately. +1. When a master thinks it should be leader, it attempts to atomically rename + the master_seqno_n file to master_seqno_n+1. If this succeeds, the master + will wait (N + M) milliseconds, verify that its renamed file still exists, + and begin acting as leader. This gives the previous leader time to notice + the rename. + +The above scheme relies on rename being atomic so that two masters don't both +succeed at renaming the same file. Users may opt to use a filesystem that +doesn't support atomic rename, but they do so at the (unlikely) risk of two +concurrently running masters thinking they are leader. Common filesystems such +as Posix and HDFS support atomic rename. + +#### Caveats + +This section calls out caveats that users will need to be aware of when using +the tf.data service. + +- Due to the nature of dataset splitting, elements will not be processed in + the same order as they were in the pre-distributed dataset. If a dataset + relies on the order of the input files, the user's assumptions will be + violated when splitting causes each input worker to process only a subset of + the input files. +- If a dataset doesn't support splitting, it must be moved after the part of + the dataset which is distributed. Alternately, the user could set + num_tasks=1 to avoid the need for splitting, but this will have a heavy + performance cost since it only allows a single worker to generate dataset + elements. The most commonly used but unsupported datasets are + `from_generator` and `zip`. + +### Alternatives Considered + +#### Use Beam for distributed dataset processing. + +Beam is an open-source data processing framework capable of large-scale parallel +computation. Instead of implementing distributed computation ourselves, we could +execute Beam jobs to perform dataset processing. + +We chose not to follow this direction to avoid creating a dependency on Beam. +Many users don't depend on Beam, and it would be a limitation to require that +dependency. If we depend on Beam, it will not be possible to use the tf.data +service with out-of-the-box TensorFlow. This is especially important as tf.data +service is expected to be used by the tf.distribute API. + +### Performance Implications + +With tf.data workers running in a separate cluster, we expect to be able to +horizontally scale until the input pipeline is no longer the bottleneck, +improving performance for input-bound pipelines. 

If a pipeline is input-bound or close to input-bound, tf.distribute could see
performance regressions when it uses the tf.data service to serve elements
across replicas. The issue is that the tf.data service will incur the cost of
transferring elements over the network to feed replicas, instead of having each
replica perform its input processing locally. On the other hand, if the input
pipeline is not the bottleneck, tf.distribute could see training speedups as
dynamic sharding mitigates the time spent waiting for stragglers.

### Dependencies

This proposal does not add any new dependencies to TensorFlow.

### Engineering Impact

The tf.data service will be maintained by the tf.data team.

### Platforms and Environments

The tf.data service is compatible with all platforms supported by TensorFlow.

### Best Practices, Tutorials and Examples

The tf.data performance guide will be updated to explain when to use the
tf.data service. We will also provide a tutorial for using the tf.data service.

### Compatibility

* Does the design conform to the backwards & forwards compatibility
  requirements?
  - Yes, this design only adds new functionality, so it doesn't break any
    backwards or forwards compatibility guarantees.
* How will this proposal interact with other parts of the TensorFlow
  Ecosystem?
  - How will it work with TFLite?
    * We aren't planning any integration with TFLite, as we haven't seen a
      need for distributed input processing there. Traditionally TFLite is
      used for inference, while tf.data is used for training.
  - How will it work with distribution strategies?
    * Distribution strategies will be able to leverage the tf.data service
      to replace their static sharding with dynamic sharding, and to support
      efficient splitting for a wider range of datasets.
  - How will it interact with tf.function?
    * The tf.data service APIs will work both inside and outside of
      tf.functions.
  - Will this work on GPU/TPU?
    * This proposal does not change the status quo of support for
      executing tf.data pipelines on GPU/TPU.

## Questions and Discussion Topics

* How should we communicate that distributing a dataset will change the order
  in which elements are processed? If users' datasets rely on elements being
  processed in a certain order, they could face unpleasant surprises.
* Is there a more user-friendly way to share iteration data across consumers?
  Distribution strategy is well-equipped with collective ops to share the
  iteration data, but sharing the iteration data could be a heavy burden for
  some users.

From 9452e02eb9617d4f285c44fc9f3b25e4bebd7e6e Mon Sep 17 00:00:00 2001
From: Andrew Audibert
Date: Mon, 13 Jan 2020 17:22:38 -0800
Subject: [PATCH 02/11] Fix truncated sentence

---
 rfcs/20200113-tf-data-service.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md
index b18908e19..b2d6af959 100644
--- a/rfcs/20200113-tf-data-service.md
+++ b/rfcs/20200113-tf-data-service.md
@@ -104,6 +104,8 @@ processing their input pipelines and quickly feeding data to accelerators.

#### Embedded Cluster Architecture

+Each TensorFlow server runs the tf.data worker gRPC service, and one server also
+runs the master gRPC service. This lets users leverage the tf.data service
without needing to provision additional compute resources. and gives all the
benefits of the tf.data service except for horizontal scaling.
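
The embedded-cluster configuration added above could be wired up roughly as
follows. This is only a sketch: the RFC does not define a server-startup API,
so `MasterServer`, `WorkerServer`, and their arguments are hypothetical names
used for illustration.

```python
import tensorflow as tf

# Hypothetical server-startup API; not part of the proposal.
# On one TensorFlow server in the cluster, also export the master service:
master = tf.data.experimental.service.MasterServer(port=5000)

# On every TensorFlow server in the cluster, export a worker service which
# registers itself with the master:
worker = tf.data.experimental.service.WorkerServer(
    port=5001, master_address="grpc://server0:5000")

# Consumers then distribute datasets against the embedded master as usual:
ds = tf.data.Dataset.range(10)
ds = ds.apply(
    tf.data.experimental.service.distribute("grpc://server0:5000"))
```
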
From a575f858af0b154f56e8be51d7da36f00012afd5 Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Mon, 13 Jan 2020 18:55:53 -0800 Subject: [PATCH 03/11] Improve wording --- rfcs/20200113-tf-data-service.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index b2d6af959..0137b4596 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -507,9 +507,9 @@ the tf.data service. relies on the order of the input files, the user's assumptions will be violated when splitting causes each input worker to process only a subset of the input files. -- If a dataset doesn't support splitting, it must be moved after the part of - the dataset which is distributed. Alternately, the user could set - num_tasks=1 to avoid the need for splitting, but this will have a heavy +- If a particular dataset operation doesn't support splitting, it must be moved + after the part of the dataset which is distributed. Alternately, the user could + set num_tasks=1 to avoid the need for splitting, but this will have a heavy performance cost since it only allows a single worker to generate dataset elements. The most commonly used but unsupported datasets are `from_generator` and `zip`. From a6e7227d7f380fd9bef5e44087fbbb0d2e58a311 Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Wed, 15 Jan 2020 15:43:29 -0800 Subject: [PATCH 04/11] Expand vistation guarantees. Update the proposal to support exactly-once visitation even when the service is executing non-deterministically. Also, add discussion of the visitation guarantees provided when the dataset produces outputs non-deterministically. --- rfcs/20200113-tf-data-service.md | 111 ++++++++++++++++--------------- 1 file changed, 59 insertions(+), 52 deletions(-) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index 0137b4596..e361cc191 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -1,9 +1,10 @@ # Distributed tf.data service -| Status | Proposed | +| Status | Proposed | | :------------ | :------------------------------------------------------ | | **RFC #** | [195](https://github.com/tensorflow/community/pull/195) | -| **Author(s)** | Andrew Audibert (aaudibert@google.com) Rohan Jain (rohanj@google.com) | +| **Author(s)** | Andrew Audibert (aaudibert@google.com) Rohan Jain | +: : (rohanj@google.com) : | **Sponsor** | Jiri Simsa (jsimsa@google.com) | | **Updated** | 2019-01-09 | @@ -62,7 +63,9 @@ utilization for valuable accelerator resources, reducing total cost. Today, the tf.distribute API statically shards data across accelerators. This can lead to suboptimal utilization because some shards may contain more data than others. The tf.data service provides a mechanism for dynamically sharding, -reducing the data imbalance across accelerators. +reducing the data imbalance across accelerators. Note that dynamic load +balancing and deterministic output are mutually exclusive; if users require +deterministic output, they must trade off dynamic load balancing. ### Visitation guarantees @@ -154,7 +157,7 @@ def tf.data.experimental.service.create_iteration( if consumer_index == 0: # The iteration object is a byte array which needs to be shared among all # consumers. Here we suppose there are broadcast_send and broadcast_recv - # method available. + # methods available. 
iteration_id = tf.data.experimental.service.create_iteration(ds, address, 3) broadcast_send(iteration_id) else: @@ -373,14 +376,26 @@ list GetElement(iterator_id); void ProcessDataset(int dataset_id, int iteration_id, list iterator_ids); ``` -#### Visitation Guarantees +#### Visitation Guarantee -When iterating over a dataset, the tf.data service will process all input data -at least once, even in the presence of master or worker failures. If there are -no failures, all input data will be processed exactly once. +When iterating over a deterministic dataset, the tf.data service will process +all input data exactly once, even in the presence of master or worker failures. +We achieve exactly-once by having consumers keep track of their index within +each task, and having restored tasks skip elements to reach the requested index. +For the skipping to give exactly-once semantics, the dataset must produce +outputs deterministically. -With determinstic execution enabled, the tf.data service provides an -exactly-once visitation guarantee even in the face of master or worker failures. +If the dataset is not deterministic, the user can choose either at-least-once or +a close-to-exactly-once visitation guarantee. We can achieve +close-to-exactly-once by using the same skipping technique that we use to +achieve exactly-once for deterministic datasets. If users prefer an +at-least-once guarantee, we can instead start restored tasks from their latest +checkpoint. + +In some cases, we can provide an exactly-once visitation guarantee to +non-deterministic pipelines. If input workers are brought down gracefully, they +can first write checkpoints of their tasks. This way, tasks can begin exactly +where they left off. #### Determinism @@ -404,10 +419,7 @@ element for consumer `i`. To provide determinism even when servers fail, consumers can keep track of which element index they have processed up to for each task. Input workers would attach per-task element indices when they produce elements, so consumers can -ignore duplicate elements caused by worker restarts. We will use an analogous -mechanism to avoid re-processing the same split in case of master falure. Input -workers will track the split index of splits as they receive them, and ignore -duplicate splits. +ignore duplicate elements caused by worker restarts. #### Failure Recovery @@ -427,8 +439,8 @@ The unrecoverable state includes * **dataset id** for the iterated dataset so that we can recover the iteration's split generator * **iteration id** - * **participating worker ids**, so that we can send splits to the correct - workers. + * **assignments from splits to tasks**, so that we can restart failed + tasks on new workers. Recoverable state includes @@ -436,20 +448,12 @@ Recoverable state includes iterations. * **Worker addresses**: Recoverable when workers reconnect. * **Worker loads**: Recoverable when workers reconnect. -* **Assignment from splits to workers**: Recoverable when workers reconnect. -* **Outstanding splits**: Recoverable by re-running split generators from - their checkpoint state. +* **Assignment from tasks to workers**: Recoverable when workers reconnect. To improve recovery time, the master will periodically write checkpoints of its split generators and outstanding splits, so that split generators don't need to be run from the beginning during master recovery. -A concern with the above recovery strategy is that a master could transmit a -split before crashing, then restart and transmit the same split again. 
To avoid -this duplication, the master attaches a split index to every split it sends to a -worker. When workers reconnect, they inform the master of their latest split -index. - Workers have no unrecoverable state. If a worker crashes, a new worker can take its place. It is up to the master to reassign splits from the crashed worker to the new worker. @@ -461,9 +465,9 @@ from. We will read and write this state through a MasterState interface which can be implemented using various storage backends. For use cases that require fault -tolerance, the user must configure a fault-tolerant MasterState, e.g. Spanner -internally, Cloud Spanner in GCP, or etcd externally. If fault tolerance isn't -required, the user could configure state to be held in memory only. +tolerance, the user must configure a fault-tolerant MasterState, e.g. Cloud +Spanner or etcd. If fault tolerance isn't required, the user could configure +state to be held in memory only. #### Leadership Transfer @@ -478,24 +482,27 @@ ZooKeeper cluster, and it would also require adding a new dependency on a ZooKeeper client. What TensorFlow does have is a FileSystem API. We will leverage this API to -perform leadership transfer as follows: - -1. The first master will create a file named "master_seqno_0". If it - successfully creates the file, it will consider itself the leader. -1. The leader master will check every N milliseconds that the "master_seqno" - file it created still exists. If the file no longer exists, the master will - cease operation immediately. -1. When a master thinks it should be leader, it attempts to atomically rename - the master_seqno_n file to master_seqno_n+1. If this succeeds, the master - will wait (N + M) milliseconds, verify that its renamed file still exists, - and begin acting as leader. This gives the previous leader time to notice - the rename. - -The above scheme relies on rename being atomic so that two masters don't both -succeed at renaming the same file. Users may opt to use a filesystem that -doesn't support atomic rename, but they do so at the (unlikely) risk of two -concurrently running masters thinking they are leader. Common filesystems such -as Posix and HDFS support atomic rename. +perform leadership transfer by creating empty files and inspecting file +modification times. + +``` +files = list_directory(leadership_directory) +if all_files_older_than(files, leadership_transfer_interval): + file = create_unique_file(leadership_directory); + if file_is_strictly_newest(file, leadership_directory): + become_leader() +# Another master may be leader. Wait for some time before trying again. +wait_random_interval() +``` + +The leader master will periodically write files to the leadership directory to +indicate that it is still leading. + +The above scheme relies on the filesystem's create_file() and list() operations +being strongly consistent . Users may opt to use a filesystem that doesn't +support strong consistency, but they do so at the risk of two concurrently +running masters thinking they are leader. Common filesystems such as Posix, +HDFS, and GCS support such strong consistency. #### Caveats @@ -507,12 +514,12 @@ the tf.data service. relies on the order of the input files, the user's assumptions will be violated when splitting causes each input worker to process only a subset of the input files. -- If a particular dataset operation doesn't support splitting, it must be moved - after the part of the dataset which is distributed. 
Alternately, the user could - set num_tasks=1 to avoid the need for splitting, but this will have a heavy - performance cost since it only allows a single worker to generate dataset - elements. The most commonly used but unsupported datasets are - `from_generator` and `zip`. +- If a particular dataset operation doesn't support splitting, it must be + moved after the part of the dataset which is distributed. Alternately, the + user could set num_tasks=1 to avoid the need for splitting, but this will + have a heavy performance cost since it only allows a single worker to + generate dataset elements. The most commonly used but unsupported datasets + are `from_generator` and `zip`. ### Alternatives Considered From 4b55d4e714738a8519a0aef534e8afa989e74e1b Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Thu, 16 Jan 2020 09:10:39 -0800 Subject: [PATCH 05/11] Fix errors and add more description. --- rfcs/20200113-tf-data-service.md | 35 ++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index e361cc191..37515a927 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -122,7 +122,20 @@ compute resources of the TensorFlow cluster for input processing. ### User-facing Python API This API is how users will interact with the tf.data service from their Python -code. +code. The steps for distributed iteration over a dataset are + +1. Create a dataset like usual. +2. Apply the `distribute` transformation to indicate that the dataset should be + processed by the tf.data service. +3. Begin an *iteration* by calling `create_iteration`. An *iteration* is a + single pass through the dataset. Multiple consumers can read from the same + iteration, resulting in each consumer receiving a partition of the original + dataset. We represent an iteration with an iteration id, which is generated + by the tf.data service when you call `create_iteration`. +4. Share the iteration id with all consumer processes which are participating + in the iteration. +5. Create per-consumer iterators using `make_iterator`, and use these iterators + to read data from the tf.data service. ```python def tf.data.experimental.service.distribute(address): @@ -158,7 +171,7 @@ def tf.data.experimental.service.create_iteration( # The iteration object is a byte array which needs to be shared among all # consumers. Here we suppose there are broadcast_send and broadcast_recv # methods available. - iteration_id = tf.data.experimental.service.create_iteration(ds, address, 3) + iteration_id = tf.data.experimental.service.create_iteration(ds, 3) broadcast_send(iteration_id) else: iteration_id = broadcast_recv() @@ -170,10 +183,7 @@ def tf.data.experimental.service.create_iteration( Args: dataset: The dataset to begin iteration over. num_consumers: The number of consumers to divide the dataset between. Set - this if you require determinism. If None, a single iterator id is returned, - and any number of consumers can read from that iterator id. The data - produced by the dataset will be fed to consumers on a first-come - first-served basis. + this if you require determinism. num_tasks: The number of tasks to use for processing. Tasks run for the duration of an epoch, and each worker should typically process a single task. 
Normally it is best to leave this as None so that the master can @@ -190,7 +200,7 @@ def tf.data.experimental.service.create_iteration( """ def tf.data.experimental.service.make_iterator( - dataset, iteration, consumer_index): + dataset, iteration, consumer_index=0): """Creates an iterator for reading from the specified dataset. Args: @@ -343,7 +353,7 @@ list CreateIterators(int dataset_id, int num_consumers, // Returns the list of tasks processing data for `iterator_id`. Consumers query // this to find which worker addresses to read data from. -list GetWorkersForiterator(int iterator_id); +list GetWorkersForIterator(int iterator_id); ///---- Methods called by input workers ---- @@ -376,7 +386,7 @@ list GetElement(iterator_id); void ProcessDataset(int dataset_id, int iteration_id, list iterator_ids); ``` -#### Visitation Guarantee +#### Visitation Guarantees When iterating over a deterministic dataset, the tf.data service will process all input data exactly once, even in the presence of master or worker failures. @@ -406,10 +416,9 @@ service will provide determinism. To get deterministic behavior, the tf.data service will require three things: 1. The dataset being distributed has deterministic output. -1. The user sets `deterministic=True` when calling - `tf.data.experimental.service.create_iteration`. -1. The user specifies how many input tasks to use when calling - `tf.data.experimental.service.create_iteration`. +1. The user sets `num_consumers`, `num_tasks`, and `deterministic=True` when + calling `tf.data.experimental.service.create_iteration`. +1. Each consumer uses a unique `consumer_index` when calling `make_iterator`. 1. The consumers do not fail. In the absence of failures, determinism is achieved by distributing splits From eb6fed23627b7e646122f1a94caf495d538dd319 Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Thu, 16 Jan 2020 09:13:11 -0800 Subject: [PATCH 06/11] Fix markdown formatting --- rfcs/20200113-tf-data-service.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index 37515a927..6d6ccfb5c 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -3,8 +3,7 @@ | Status | Proposed | | :------------ | :------------------------------------------------------ | | **RFC #** | [195](https://github.com/tensorflow/community/pull/195) | -| **Author(s)** | Andrew Audibert (aaudibert@google.com) Rohan Jain | -: : (rohanj@google.com) : +| **Author(s)** | Andrew Audibert (aaudibert@google.com) Rohan Jain (rohanj@google.com) | | **Sponsor** | Jiri Simsa (jsimsa@google.com) | | **Updated** | 2019-01-09 | From c66699b2231ec457bbf0d0538d284f2790bdd7e2 Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Thu, 16 Jan 2020 09:17:03 -0800 Subject: [PATCH 07/11] Note that S3 is not a good FS for safe leadership transfer --- rfcs/20200113-tf-data-service.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index 6d6ccfb5c..0f8baa37e 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -509,8 +509,8 @@ indicate that it is still leading. The above scheme relies on the filesystem's create_file() and list() operations being strongly consistent . Users may opt to use a filesystem that doesn't support strong consistency, but they do so at the risk of two concurrently -running masters thinking they are leader. 
Common filesystems such as Posix, -HDFS, and GCS support such strong consistency. +running masters thinking they are leader. Common filesystems such as POSIX, +HDFS, and GCS support such strong consistency, but S3 does not. #### Caveats From 910474a51c7b482d70c5ce32f790cd9c805b0c7c Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Thu, 23 Jan 2020 18:03:21 -0800 Subject: [PATCH 08/11] Add section on framework integration --- rfcs/20200113-tf-data-service.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index 0f8baa37e..9450c409c 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -529,6 +529,18 @@ the tf.data service. generate dataset elements. The most commonly used but unsupported datasets are `from_generator` and `zip`. +#### Framework Integration + +Many users interact with TensorFlow through a framework such as +[TFX](https://www.tensorflow.org/tfx). A framework can make leveraging the +tf.data service as simple as toggling a configuration boolean, triggering the +framework to bring up tf.data service servers and add a +`tf.data.experimental.service.distribute` transformation at the end of the +users' data pipeline. By inspecting the amount of time blocked on the input +pipeline, the framework could dynamically scale the number of input workers up +and down to find the minimum number of workers needed so that the input pipeline +can keep up with the model. + ### Alternatives Considered #### Use Beam for distributed dataset processing. From afec5f9aebd3a4544e5b5a1289eddc992a2dcd5e Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Fri, 24 Jan 2020 12:41:39 -0800 Subject: [PATCH 09/11] Improve doc clarity --- rfcs/20200113-tf-data-service.md | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index 9450c409c..b696f0861 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -5,7 +5,7 @@ | **RFC #** | [195](https://github.com/tensorflow/community/pull/195) | | **Author(s)** | Andrew Audibert (aaudibert@google.com) Rohan Jain (rohanj@google.com) | | **Sponsor** | Jiri Simsa (jsimsa@google.com) | -| **Updated** | 2019-01-09 | +| **Updated** | 2019-01-24 | ## Objective @@ -98,20 +98,20 @@ provides dataset elements to consumers over RPC. **Consumer**: A machine which consumes data from the tf.data service. The consumer may be attached to a GPU or TPU, or use data for on-CPU training. -#### Separate Cluster Architecture +#### Option 1: Separate Cluster Architecture Each server is run on a separate host from the TensorFlow cluster. This configuration gives users a way to provide horizontally scaling CPU for processing their input pipelines and quickly feeding data to accelerators. -#### Embedded Cluster Architecture +#### Option 2: Embedded Cluster Architecture Each TensorFlow server runs the tf.data worker gRPC service, and one server also runs the master gRPC service. This lets users leverage the tf.data service without needing to provision additional compute resources. and gives all the benefits of the tf.data service except for horizontal scaling. -#### Hybrid Architecture +#### Option 3: Hybrid Architecture Users could run tf.data workers embedded in their TensorFlow cluster, and also run additional tf.data workers (and potentially the tf.data master) outside the @@ -136,6 +136,12 @@ code. 
The steps for distributed iteration over a dataset are 5. Create per-consumer iterators using `make_iterator`, and use these iterators to read data from the tf.data service. +We move away from the idiomatic `for element in dataset` control flow because +there is now an extra step when going from dataset to iterator: creating an +iteration. A higher layer API such as tf.distribute may use the API presented +here to implement datasets which produce per-replica elements, enabling +idiomatic control flow. + ```python def tf.data.experimental.service.distribute(address): """Marks that a dataset should be processed by the tf.data service. @@ -237,7 +243,9 @@ It will be entirely in C++, and we don't currently have any plans to expose splitting through Python. The API focuses on producing and consuming `Split`s. A `Split` is a variant -Tensor that can be subclassed to represent arbitrary types of splitting. +Tensor that can be subclassed to represent arbitrary types of splitting. The +`Split` base class is intentionally general so that subclasses have the +flexibility to define splits however they like. ```cpp class Split { @@ -614,6 +622,8 @@ service. We will also provide a tutorial for using the tf.data service. * How should we communicate that distributing a dataset will change the order in which elements are processed? If users' datasets rely on elements being processed in a certain order, they could face unpleasant surprises. +* Should we support splitting `skip` and `take` by having them operate at a + per-task level (skip or take the first `N` elements within each task)? * Is there a more user-friendly way to share iteration data across consumers? Distribution strategy is well-equipped with collective ops to share the iteration data, but sharing the iteration data could be a heavy burden for From 4c005e7c203ff3b44226eeac634170f2eec12950 Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Thu, 30 Jan 2020 10:36:01 -0800 Subject: [PATCH 10/11] Update discussion topics --- rfcs/20200113-tf-data-service.md | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index b696f0861..004063f6f 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -622,9 +622,22 @@ service. We will also provide a tutorial for using the tf.data service. * How should we communicate that distributing a dataset will change the order in which elements are processed? If users' datasets rely on elements being processed in a certain order, they could face unpleasant surprises. -* Should we support splitting `skip` and `take` by having them operate at a - per-task level (skip or take the first `N` elements within each task)? -* Is there a more user-friendly way to share iteration data across consumers? + - Current plan is to address this through documentation. +* Should we support splitting `skip`, `take`, and `scan` by having them + operate at a per-task level (e.g. skip or take the first `N` elements within + each task)? + - Leaning towards supporting these operations at a per-task level. This is + consistent with how skip/take/scan behave today when using distribution + strategies to distribute a dataset. +* Is there a more user-friendly way to share iteration ids across consumers? 
Distribution strategy is well-equipped with collective ops to share the - iteration data, but sharing the iteration data could be a heavy burden for + iteration ids, but sharing the iteration id could be a heavy burden for some users. + - Distributing iteration ids is simple in the common case where a single + process builds the graph. If users are advanced enough to do distributed + training without distribution strategies, they will likely have a + different mechanism available for distributing iteration ids. +* Can `service.distribute` take a `ClusterResolver` so that the master + hostname isn't baked into the dataset definition? + - We can achieve this by having the `distribute` transformation take a + master_address_or_resolver. From 2f698f9b41461cd272840fa0e9190ff11b06bca0 Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Thu, 30 Jan 2020 16:20:26 -0800 Subject: [PATCH 11/11] Update doc based on design review --- rfcs/20200113-tf-data-service.md | 35 ++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/rfcs/20200113-tf-data-service.md b/rfcs/20200113-tf-data-service.md index 004063f6f..8e80c6cdc 100644 --- a/rfcs/20200113-tf-data-service.md +++ b/rfcs/20200113-tf-data-service.md @@ -1,11 +1,11 @@ # Distributed tf.data service -| Status | Proposed | +| Status | Accepted | | :------------ | :------------------------------------------------------ | | **RFC #** | [195](https://github.com/tensorflow/community/pull/195) | | **Author(s)** | Andrew Audibert (aaudibert@google.com) Rohan Jain (rohanj@google.com) | | **Sponsor** | Jiri Simsa (jsimsa@google.com) | -| **Updated** | 2019-01-24 | +| **Updated** | 2019-01-30 | ## Objective @@ -143,14 +143,16 @@ here to implement datasets which produce per-replica elements, enabling idiomatic control flow. ```python -def tf.data.experimental.service.distribute(address): +def tf.data.experimental.service.distribute(address_or_resolver): """Marks that a dataset should be processed by the tf.data service. ds = ... # dataset to distribute - ds = ds.apply(tf.data.experimental.service.distribute(address)) + ds = ds.apply( + tf.data.experimental.service.distribute(address_or_resolver)) Args: - address: The address of the tf.data service master. + address_or_resolver: The address of the tf.data service master, or a + cluster resolver that can be used to determine the master address. Returns: A function that can be passed to `dataset.apply()`. @@ -622,22 +624,25 @@ service. We will also provide a tutorial for using the tf.data service. * How should we communicate that distributing a dataset will change the order in which elements are processed? If users' datasets rely on elements being processed in a certain order, they could face unpleasant surprises. - - Current plan is to address this through documentation. + - Final decision: Address this through documentation. * Should we support splitting `skip`, `take`, and `scan` by having them operate at a per-task level (e.g. skip or take the first `N` elements within each task)? - - Leaning towards supporting these operations at a per-task level. This is - consistent with how skip/take/scan behave today when using distribution - strategies to distribute a dataset. + - Final decision: Prohibit distributing these transformations, and tell + users to instead use these transformations *after* applying the + `distribute` transformation. * Is there a more user-friendly way to share iteration ids across consumers? 
Distribution strategy is well-equipped with collective ops to share the iteration ids, but sharing the iteration id could be a heavy burden for some users. - - Distributing iteration ids is simple in the common case where a single - process builds the graph. If users are advanced enough to do distributed - training without distribution strategies, they will likely have a - different mechanism available for distributing iteration ids. + - Final decision: It is a reasonable expectation for users to either use + distribution strategies, or distribute their own iteration ids. + TensorFlow will soon have public APIs for collective operations that + would make it easy to broadcast iteration ids. * Can `service.distribute` take a `ClusterResolver` so that the master hostname isn't baked into the dataset definition? - - We can achieve this by having the `distribute` transformation take a - master_address_or_resolver. + - Final decision: Accept `master_address_or_resolver`, and wait to resolve + the master address until iteration begins. The `ClusterResolver` will be + stored in the Python `Dataset` object. In the future, we may want C++ + implementations of `ClusterResolver` so that we can represent the + resolver within the dataset graph.
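
Taken together, the API as it stands after this series would be used roughly
as follows. As in the RFC's own example, the `broadcast_send`/`broadcast_recv`
helpers, the per-process `consumer_index`, and the `parse_example` and
`train_step` functions are assumed to be supplied by the surrounding training
program:

```python
import tensorflow as tf

NUM_CONSUMERS = 3
MASTER_ADDRESS = "grpc://dataservice-master:5000"  # placeholder address

ds = tf.data.TFRecordDataset(["examples.tfrecord"])  # splittable source
ds = ds.map(parse_example)  # assumed user-defined parsing function
ds = ds.apply(tf.data.experimental.service.distribute(MASTER_ADDRESS))

if consumer_index == 0:
  # One consumer creates the iteration and shares the iteration id. Setting
  # `num_tasks` and `deterministic=True` requests deterministic output.
  iteration_id = tf.data.experimental.service.create_iteration(
      ds, num_consumers=NUM_CONSUMERS, num_tasks=10, deterministic=True)
  broadcast_send(iteration_id)
else:
  iteration_id = broadcast_recv()

# Each consumer reads its own partition of the iteration.
it = tf.data.experimental.service.make_iterator(ds, iteration_id,
                                                consumer_index)
for element in it:
  train_step(element)  # assumed user-defined training step
```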