Skip to content

Conversation

@kfstorm
Copy link
Member

@kfstorm kfstorm commented Jun 27, 2019

What do these changes do?

This is the continuation of #4911. Previously node resource information is stored in client table. This PR moves it to resource table. This change makes client table more lightweight and simplifies the merge logic of client table because RES_CREATEUPDATE and RES_DELETE are removed from code. Resource changes for a living node no need to update client table any more.

Related issue number

Linter

  • I've run scripts/format.sh to lint the changes in this PR.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14936/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14952/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/14968/
Test PASSed.

@zhijunfu
Copy link
Contributor

zhijunfu commented Jul 6, 2019

@raulchen @romilbhardwaj Any comments on this one? thanks

@raulchen
Copy link
Contributor

raulchen commented Jul 6, 2019

Oops, I forgot to review this one. Will take a look soon.

}
NodeInfo nodeInfo = new NodeInfo(
clientId, data.getNodeManagerAddress(), true, resources);
clientId, data.getNodeManagerAddress(), true, new HashMap<>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, prefer using ImmutableMap.of()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

Preconditions.checkState(data.getEntryType() == EntryType.DELETION);
NodeInfo nodeInfo = new NodeInfo(clientId, clients.get(clientId).nodeAddress,
false, clients.get(clientId).resources);
false, new HashMap<>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

try (Jedis jedis = jedisPool.getResource()) {
return jedis.hgetAll(key);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove blank line.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other methods in this class have a blank line at the end. Should I remove the blank lines of all methods?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better also remove them, thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"TablePubsub",
"Task",
"TaskTableData",
"RayResource",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename RayResource to ResourceTableData?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// 1) Client cache is up to date.
// 2) Lookup resource table after subscribe.
io_service_.post([this]() {
// Fetch resource info for all clients and update cluster resource map.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm misunderstanding something.
Why do we need to fetch resource info for all clients here?
This is the callback of one node being added. Why not just fetch that node?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this lambda is called only when the callback is for the local (self) raylet. This is the case when the raylet has been newly instantiated and has no resource state of other raylets. This lambda populates the resources in the raylet by querying the resource table and calling ResourceCreateUpdated on the resources of all nodes. Is that correct @kfstorm?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@romilbhardwaj You are right. Previously the resource information is stored in the client table, so a client table notification carries resource update information. Now the two kinds of data are separated, we need some extra logic to make sure local resource information is up to date.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the code to lookup resources for only one client at the end of ClientAdded.

worker = worker_pool_.GetRegisteredDriver(client);
std::unordered_map<std::string, std::shared_ptr<gcs::RayResource>> data_map;
auto ray_resource = std::make_shared<gcs::RayResource>();
ray_resource->set_resource_name(resource_name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the resource_name is redundant. Because the resource table is a Hash. The key is already the resource name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean we remove the resource_name field from RayResource structure? BTW, We can't replace RayResource structure with double because the Hash template class requires a protobuf structure as the value type to be able to serialize it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we can have a ResourceTableData that only has a double field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Delay execution to make sure,
// 1) Client cache is up to date.
// 2) Lookup resource table after subscribe.
io_service_.post([this]() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how to guarantee that this lambda will run after the client cache is up to date?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The execution of RequestNotifications against the client table will push all entries to raylet. Raylet will handle a single notification with multiple client entries and add alive and dead clients to the client cache. So here we lookup resources asynchronously to ensure all client entries are added to the client cache before the resource lookup operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is confusing. I updated the code to lookup resources for only one client at the end of ClientAdded.

Copy link
Member

@romilbhardwaj romilbhardwaj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for submitting this PR! I've added some comments

return [node_info[client_id] for client_id in ordered_client_ids]


def _parse_resource_table(redis_client, client_id):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Considering client_id is stored/used in hex at most places in state.py and elsewhere, would it be more consistent to have the client_id argument as hex here and convert it to binary inside this method, rather than invoking it as _parse_resource_table(redis_client, ray.utils.hex_to_binary(client_id))?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Sorry that I'm not familiar with state.py.

Args:
redis_client: A client to the primary Redis shard.
client_id: The client ID of the node.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to clarify the encoding (hex/binary) of the client_id here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

// 1) Client cache is up to date.
// 2) Lookup resource table after subscribe.
io_service_.post([this]() {
// Fetch resource info for all clients and update cluster resource map.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this lambda is called only when the callback is for the local (self) raylet. This is the case when the raylet has been newly instantiated and has no resource state of other raylets. This lambda populates the resources in the raylet by querying the resource table and calling ResourceCreateUpdated on the resources of all nodes. Is that correct @kfstorm?

// Delay execution to make sure,
// 1) Client cache is up to date.
// 2) Lookup resource table after subscribe.
io_service_.post([this]() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I'm missing something, but instead of using this lambda, why can't we update the resources of the remote clients at line 412 when they're added to the local raylet? Even if client cache is not up to date, subsequent notifications should handle update/delete in the local raylet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to make sure the remote client is already registered in local raylet when receiving a resource update notification. Thus can prevent potential bugs in the resource update notification handling code path.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussed with @raulchen, we think that it's OK to put the lookup at the end of ClientAdded and only lookup resources for one client. So I updated the code as you suggested to avoid the lambda.

}
if (!marked) {
RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(client_id));
// Remove all resources of this client from GCS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe sometimes we'll want query the resources of a dead node. I think we can keep it. it won't waste too much memory.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

// 1) Client cache is up to date.
// 2) Lookup resource table after subscribe.
io_service_.post([this]() {
// Fetch resource info for all clients and update cluster resource map.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. thanks.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/15268/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1578/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/15269/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1579/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/1584/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/15274/
Test PASSed.

@raulchen raulchen merged commit 43b6513 into ray-project:master Jul 11, 2019
@raulchen raulchen deleted the dynamic_resource_hash branch July 11, 2019 05:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants