RFC: tf.data Service #195

Merged: 11 commits into tensorflow:master on Feb 3, 2020

Conversation

**@aaudiber** (Contributor) commented Jan 13, 2020:

The comment period is open until 1/27/2020.

| Status        | Accepted                                                                   |
| :------------ | :------------------------------------------------------------------------- |
| **RFC #**     | 195                                                                         |
| **Author(s)** | Andrew Audibert ([email protected]), Rohan Jain ([email protected])       |
| **Sponsor**   | Jiri Simsa ([email protected])                                              |
| **Updated**   | 2020-01-30                                                                  |

Provide an API and implementation of a tf.data service which can process tf.data
datasets in a distributed manner. The service can be run outside the TensorFlow
cluster or be exported as a gRPC service by TensorFlow servers.

Goals:

  • Enable horizontal scaling of dataset computation to improve performance of
    input-bound dataset pipelines.
  • Improve tf.data integration with the tf.distribute API. In particular,
    support dynamic sharding of data across multiple processes.
  • Provide visitation guarantees for distributed training jobs.
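
As a rough sketch of the user-facing flow described above (illustrative only: the service address and pipeline are made up, and `tf.data.experimental.service.distribute` is the transformation proposed in this RFC, whose final signature may differ):

```python
import tensorflow as tf

# Hypothetical address of a tf.data service master; in practice this points
# at wherever the service is deployed.
SERVICE_ADDRESS = "grpc://dataservice-master:5000"

# An ordinary tf.data pipeline, written with a single-host mental model.
dataset = tf.data.Dataset.from_tensor_slices(list(range(1000)))
dataset = dataset.map(lambda x: x * 2)

# Proposed in this RFC: hand the pipeline off to the tf.data service for
# distributed processing. Everything before this point may run on remote
# input workers; transformations chained afterwards run on the consumer.
dataset = dataset.apply(
    tf.data.experimental.service.distribute(SERVICE_ADDRESS))
```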

@aaudiber force-pushed the rfc-data-service branch 2 times, most recently from 2ef7231 to 3cf2c06, on January 13, 2020
> It is expected that the dataset contains at least one `.distribute(address)`
> transformation, otherwise this method will print a warning and do nothing.
>
> `create_iteration` will first register the dataset with the tf.data service

**Contributor:** How does `create_iteration` find the service?

**@aaudiber (Author):** The service address is configured within the dataset by calling `dataset.apply(tf.data.experimental.service.distribute(address))`.
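
In other words (an illustrative sketch: the address is made up, and the module path for `create_iteration` is an assumption based on the names used in this thread):

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10)

# The master address travels inside the dataset definition itself...
dataset = dataset.apply(
    tf.data.experimental.service.distribute("grpc://dataservice-master:5000"))

# ...so create_iteration can locate the service without any extra argument.
iteration = tf.data.experimental.service.create_iteration(dataset)
```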

**@aaudiber (Author):** We should consider allowing users to specify a `ClusterResolver` here.

**Contributor:** Agreed. A `ClusterResolver` would be helpful.

> This section calls out caveats that users will need to be aware of when using
> the tf.data service.
>
> - Due to the nature of dataset splitting, elements will not be processed in

**Contributor:** I think it's better to document an actual guarantee, such that the order within each split must be consistent with the original global order of the source dataset, but no promises are made around ordering across splits.

**@aaudiber (Author):** We don't have such a guarantee, because the dataset pipeline for each task processes multiple splits, so even the order within a split could differ from the order it would have if the split were processed in isolation.

@byronyi mentioned this pull request on Jan 14, 2020

Commit: *Update the proposal to support exactly-once visitation even when the service is executing non-deterministically. Also, add discussion of the visitation guarantees provided when the dataset produces outputs non-deterministically.*

**@feihugis** (Member) left a comment:

Thanks @aaudiber, @rohan100jain, and @jsimsa for this RFC! The distributed dataset is really cool: it can speed up both training and inference and simplify the implementation of distributed data pipelines. A few comments are added above.

> N input workers to feed M accelerators. The number of input workers can be
> scaled up or down as needed to keep up with the accelerators.
>
> ### Distributed training requires a distribution-aware input pipeline.

**Member:** The distributed dataset will be very useful for inference as well!

**@aaudiber** (Contributor, Author) left a comment:

Thank you @feihugis for the thoughtful comments.

> provides dataset elements to consumers over RPC.
>
> **Consumer**: A machine which consumes data from the tf.data service. The
> consumer may be attached to a GPU or TPU, or use data for on-CPU training.

**Reviewer:** Would it be possible to include an example configuration?

**@aaudiber (Author):** What sort of configuration are you interested in?

**@pavanky** commented Jan 18, 2020:

This looks great! I know this is not the goal of this proposal, but would it be possible (in the future) to build on this and have training jobs consume datasets from any gRPC service (not necessarily one generated using TensorFlow), as long as it conforms to a certain API?

**@byronyi** (Contributor) commented Jan 20, 2020:

> This looks great! I know this is not the goal of this proposal, but would it be possible (in the future) to build on this and have training jobs consume datasets from any gRPC service (not necessarily one generated using TensorFlow), as long as it conforms to a certain API?

@pavanky Check out tensorflow/io#206.

**@pavanky** commented Jan 21, 2020:

> @pavanky Check out tensorflow/io#206.

Thanks!

**@aaudiber** (Contributor, Author) commented Jan 30, 2020:

Meeting notes from the design review on 1/30/20 (thank you @rohan100jain for taking these!):

Changes to the doc since it was mailed out:

- Visitation guarantees: even if the pipeline is non-deterministic, we can provide an exactly-once visitation guarantee.
- Other changes are part of the comments to be discussed now.

Distribution may change the order of elements produced by the dataset. How do we communicate this? The current plan is documentation.

- apassos: Hopefully the elements don't change.
- andrew: The batches created could be different because we combine different examples.

skip, take, and scan might not be splittable. E.g. take: if it is applied per task, we end up with 10 * num_worker elements instead of 10.

- andrew: One proposal is that we do it at the per-task level, since we already do that for distribution strategies.
- derek: Can we make it clear in the API that some prefix is being distributed and that afterward it is not distributed?
- andrew: Users can chain transformations after distribute for that distinction.
- derek: Should we prohibit these transformations in the distributed part?
- apassos: Didn't realize we can chain after distribute as well.

Decision: prohibit them and ask users to chain them after distribute.
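
A sketch of what that decision means in user code (illustrative only; the pipeline and service address are made up, and the `distribute` transformation is the one proposed in this RFC):

```python
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices(list(range(1_000_000)))
dataset = dataset.map(lambda x: x + 1)

# The distributed portion of the pipeline ends here.
dataset = dataset.apply(
    tf.data.experimental.service.distribute("grpc://dataservice-master:5000"))

# `take` is not splittable, so it is chained after `distribute`; this yields
# exactly 10 elements overall rather than 10 per worker task.
dataset = dataset.take(10)
```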

Share iteration ids between tasks

- apassos: This is only needed for multi-client training; with a single graph we can just give it. Are these regular tensors?
- derek: Collective broadcast?
- apassos: We're working on creating these collective ops and they will be available open source soon.
- anjali: How often do we need to do this? A: once per epoch.

Decision: use collective ops to broadcast the iteration ids. They will be publicly available soon.

service.distribute takes in a ClusterResolver instead of a master address

- derek: ClusterResolver is a Python object?
- andrew: Yes, and it's not serializable right now.
- derek: We can use a pyfunc to see the ClusterResolver.
- apassos: We might need a C++ cluster resolver for Kubernetes etc.

Decision: allow a ClusterResolver as input and change the API arg to master_address_or_resolver.
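
A sketch of what the decided API shape might look like from the user's side (the `master_address_or_resolver` argument name comes from the decision above; the resolver class shown is just one existing option, and the rest is an assumption about the proposed API):

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10)

# Resolves cluster addresses from the TF_CONFIG environment variable; any
# tf.distribute.cluster_resolver implementation could be used instead.
resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

# Per the design-review decision, distribute() would accept either a master
# address string or a ClusterResolver.
dataset = dataset.apply(
    tf.data.experimental.service.distribute(
        master_address_or_resolver=resolver))
```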

azaks: Is the order deterministic?

- apassos: Can we guarantee a partial order that is consistent with the global order? I think this holds for the current distributed dataset.
- jiri: If we shard the filenames, then we can have batches with different elements.
- apassos: If you had no shuffle and no batching, then if B comes after A, can we guarantee that?
- jiri: We might have to add more transformations to the fine print. Within a split, we can guarantee ordering. We can run the pipeline deterministically, but it will come at a performance disadvantage.
- derek: We might be conflating non-determinism with ordering (flat_map vs. interleave). We can get most of the performance by having different ordering but deterministic execution.
- andrew: We can have some advanced documentation that describes what the guarantees are.

**@byronyi** (Contributor) commented Jan 31, 2020:

Congratulations on the acceptance of this RFC! Can't wait to see it implemented in TF core.

@ematejska removed the request for review from brijk7 on Feb 3, 2020
@ematejska self-requested a review on Feb 3, 2020
@ematejska added the "RFC: Accepted" (RFC Design Document: Accepted by Review) label on Feb 3, 2020
@ematejska merged commit 57e8fcb into tensorflow:master on Feb 3, 2020

**@katsiapis** (Member) left a comment:

Thanks, and apologies for the delayed review!

> Args:
>   dataset: The dataset to begin iteration over.
>   num_consumers: The number of consumers to divide the dataset between. Set
>     this if you require determinism.

**@katsiapis (Member):** The wording of this suggests it is optional. Should its default value be None to imply auto, or is this actually not optional, in which case we shouldn't mention that this needs to be set for determinism?

**@aaudiber (Author):** If the user wants determinism, they need to tell the tf.data service ahead of time how many consumers they will use for reading. Otherwise, the user doesn't need to worry about num_consumers. They can leave it as None and read with as many consumers as they want.

**@katsiapis (Member):** Should the default then be None (instead of 1)?

**@aaudiber (Author):** Sorry, I thought it was None already, but you are right! It should be None.
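
A docstring-level sketch of the signature this thread implies (the argument names are taken from the excerpts in this review; the None default for `num_consumers` reflects the resolution above, while the remaining defaults are assumptions):

```python
def create_iteration(dataset, num_consumers=None, num_tasks=None,
                     deterministic=False):
  """Begins distributed iteration over a dataset.

  Args:
    dataset: The dataset to begin iteration over.
    num_consumers: The number of consumers to divide the dataset between.
      Set this if you require determinism; otherwise leave it as None and
      read with as many consumers as you like.
    num_tasks: How many tasks to use. Normally left as None so that the
      master can choose a reasonable number.
    deterministic: Whether the iteration should be performed
      deterministically.
  """
```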

> producing deterministic results.
>   deterministic: Whether the iteration should be performed
>     deterministically. Fully deterministic output also requires setting
>     `num_tasks` to a fixed number, and that the input dataset is itself

**@katsiapis (Member):** and num_tasks?

**@aaudiber (Author):** and num_consumers.

> task. Normally it is best to leave this as None so that the master can
>   choose a reasonable number of tasks. Setting `num_tasks` is useful for
>   producing deterministic results.
> deterministic: Whether the iteration should be performed

**@katsiapis (Member):** So setting both num_tasks and num_consumers and having a deterministic dataset does not suffice, and we also need to set this?

**@aaudiber (Author):** I think I see what you're getting at; `deterministic` is superfluous, since if users are setting num_tasks and num_consumers, they clearly care about determinism. I think we can improve this API by splitting create_iteration into create_iteration(dataset) and create_deterministic_iteration(dataset, num_tasks, num_consumers).
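
A sketch of that suggested split (the two signatures are taken from the comment above; the docstrings paraphrase the surrounding discussion, and the bodies are placeholders, not a proposed implementation):

```python
def create_iteration(dataset):
  """Starts an iteration with no determinism requirement; the master chooses
  the number of tasks and any number of consumers may read."""
  ...


def create_deterministic_iteration(dataset, num_tasks, num_consumers):
  """Starts an iteration intended to produce deterministic results, given a
  deterministic input dataset and fixed task and consumer counts."""
  ...
```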

> producing deterministic results.
>   deterministic: Whether the iteration should be performed
>     deterministically. Fully deterministic output also requires setting
>     `num_tasks` to a fixed number, and that the input dataset is itself

**@katsiapis (Member):** If this is set to true (and num_tasks and num_consumers are also set), can we somehow automatically detect non-determinism at the dataset level at construction time or runtime (and raise appropriate errors)? I could see this saving someone a lot of debugging (e.g. they expect determinism but don't in fact get it).

**@aaudiber (Author):** Yes, I think that would be a good sanity check. In general we don't have a way to detect non-determinism in a dataset, but one thing we can do today is validate that the experimental_deterministic dataset option isn't set to False.
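
For instance, a minimal version of that validation might look like the following (a sketch; `experimental_deterministic` is the `tf.data.Options` field named in the comment, while the wrapper function itself is hypothetical):

```python
def _validate_determinism_request(dataset):
  # The option defaults to None (unset); only an explicit False means the
  # user has asked for non-deterministic execution.
  if dataset.options().experimental_deterministic is False:
    raise ValueError(
        "Deterministic service iteration was requested, but the dataset sets "
        "experimental_deterministic=False.")
```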

> * Minimize Surprises: Users write their datasets as though they will not be
>   split, so introducing splitting can easily lead to unexpected outcomes. To
>   mitigate this, we will be conservative about which dataset transformations
>   support splitting.

**@katsiapis (Member):** Is the proposal to have a whitelist of "splittable" transformations, and only if a Dataset consists of only splittable transformations can it be split? Or are you thinking of a blacklist of non-splittable transformations instead?

In other distributed systems I've seen, splittability is usually a function of the source (not the transformations), and most user-defined transformations don't in fact break splittability (so a blacklist feels a bit more natural).

**@aaudiber (Author):** We're taking a conservative whitelisting approach to avoid unexpected behavior. We whitelist a dataset as splittable if processing and re-combining its splits gives almost identical results to processing the original dataset.

In most distributed systems, the pipeline writers are aware that what they are writing will be executed in a distributed fashion. With the tf.data service, most users are expected to write their datasets with a single-host mental model.
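
To illustrate the whitelist idea (purely illustrative; the op names and helper below are hypothetical and not part of the RFC):

```python
# Hypothetical whitelist of dataset ops whose splits recombine to (almost)
# the same result as processing the original dataset.
SPLITTABLE_OPS = {
    "TensorSliceDataset", "RangeDataset", "MapDataset",
    "ParallelMapDataset", "BatchDataset", "PrefetchDataset",
}


def dataset_is_splittable(op_names):
  """Conservative check: every op in the dataset graph must be whitelisted."""
  return all(name in SPLITTABLE_OPS for name in op_names)
```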

```cpp
class SplitGenerator {
 public:
  virtual Status GetNext(std::unique_ptr<Split>* split,
```

**@katsiapis (Member):** Nit: StatusOr? Similarly elsewhere where this applies.

**@aaudiber (Author):** Worth considering, but note that Status is consistent with the vast majority of TensorFlow core.

> #### Supported Datasets
>
> Not all dataset sources and transformations are easily splittable. For example,
> `take`, `skip`, and `scan` require a global view of the dataset to produce
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still not clear to me why take and skip are fundamentally not splittable. I've seen them be splittable in other distributed systems.

> correct results. Datasets which require multiple input datasets, such as `zip`,
> are also difficult to support, since we don't have a good way of aligning the
> splits of multiple input datasets. Users who rely on these unsupported datasets
> will need to move those datasets to come after the distributed part of their

**@katsiapis (Member):** Maybe good to make this clear in the high-level API above, where we can mention that anything that comes after ds.apply(distribute) is not in fact distributed?

**@aaudiber (Author):** Agreed.

> being strongly consistent. Users may opt to use a filesystem that doesn't
> support strong consistency, but they do so at the risk of two concurrently
> running masters thinking they are leader. Common filesystems such as POSIX,
> HDFS, and GCS support such strong consistency, but S3 does not.

**@katsiapis (Member):** Frequently they have read-after-write consistency. So if we could remove the "list_directory" part of the protocol above (and instead communicate that information some other way), we might be able to support more filesystems.

> [TFX](https://www.tensorflow.org/tfx). A framework can make leveraging the
> tf.data service as simple as toggling a configuration boolean, triggering the
> framework to bring up tf.data service servers and add a
> `tf.data.experimental.service.distribute` transformation at the end of the

**@katsiapis (Member):** Hmm, up to now my understanding was that the tf.data service itself would do the horizontal auto-scaling. Here it's suggested that it needs to happen externally? Or am I misunderstanding?

**@aaudiber (Author):** Tooling could be built around the tf.data service so that it scales automatically, but I wouldn't consider that part of the core tf.data service. To enable autoscaling, the tf.data service would report whether it could use more resources, and it would be up to external tooling to start more tf.data servers.

**@thisisandreeeee** commented:

Would this also help address memory issues associated with caching large datasets? In a distributed tf.data architecture, is the dataset cache distributed without overlap across the workers?

**@byronyi** (Contributor) commented Apr 21, 2020:

@thisisandreeeee Take a look at #193.
