-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[New Scheduler] Add DataManagementService #5063
Conversation
case object GrantLease | ||
|
||
// TBD | ||
class LeaseKeepAliveService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized this pr is also dependent on these modules.
It would be better to merge this PR after this kind of module is introduced.
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
Outdated
Show resolved
Hide resolved
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
Outdated
Show resolved
Hide resolved
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
Outdated
Show resolved
Hide resolved
// normally these messages will be sent when queues are created. | ||
case request: ElectLeader => | ||
if (inProgressKeys.contains(request.key)) { | ||
logging.info(this, s"save request $request into a buffer") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do these really need to be info level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, it stores the request into a buffer because there is already precedent request processing. If any issue happens it would let us know if the request has processed or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I'm still learning what this is doing, but what does each request involve? Is it every activation or some sort of metadata setup? If it's every activation it would seem spammy to me otherwise I think it's fine
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
Outdated
Show resolved
Hide resolved
.map { res => | ||
parent ! FinishWork(request.key) | ||
if (res.getSucceeded) { | ||
logging.debug(this, s"data is stored.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what data is stored?
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
Outdated
Show resolved
Hide resolved
common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
Outdated
Show resolved
Hide resolved
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
Outdated
Show resolved
Hide resolved
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
Outdated
Show resolved
Hide resolved
I think we need to merge a PR for CI tests for scheduler components before merging this PR. |
* In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD. | ||
* So it guarantees the data is eventually stored. | ||
*/ | ||
class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will include test cases into this PR after setting up the CI pipeline for scheduler components.
} | ||
|
||
// normally these messages will be sent when queues are created. | ||
case request: ElectLeader => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leader election happens when a queue is created.
This is to guarantee only one scheduler creates a certain queue.
So it happens relatively fewer times.
|
||
// normally these messages will be sent when queues are created. | ||
case request: ElectLeader => | ||
if (inProgressKeys.contains(request.key)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the retry nature of this component, if there is a precedent request(being retried), it would store the new request to a buffer.
logging.info(this, s"save a request $request into a buffer") | ||
operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request) | ||
} else { | ||
worker ! request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actual works would be delegated to ETCDWorker.
inProgressKeys = inProgressKeys + request.key | ||
} | ||
|
||
case request: RegisterInitialData => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actions under the same namespace share some data such as namespace throttling data.
So it is required to store the data if there is no data yet but not overwrite an existing one.
This case is for the case.
|
||
case request: RegisterInitialData => | ||
// send WatchEndpoint first as the put operation will be retried until success if failed | ||
if (request.failoverEnabled) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the failover is enabled, it would watch the key and if the key is deleted for some reason, it would try to restore it.
inProgressKeys = inProgressKeys + request.key | ||
} | ||
|
||
case request: RegisterData => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will overwrite the existing data in ETCD.
Generally, this is used for data that is not shared among actions.
case WatchEndpointRemoved(_, key, value, true) => | ||
logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}") | ||
|
||
case msg: UpdateDataOnChange => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To reduce the loads against ETCD, it does not store data if there is no change in the value.
actorSystem: ActorSystem) | ||
extends Actor { | ||
private implicit val ec = context.dispatcher | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is used by both schedulers and invokers to store data to ETCD.
The following kinds of data are stored to ETCD.
- Throttling data(Action / Namespace)
- Queue endpoint(where a queue is running)
- Scheduler endpoint.
- Container data(running container, warmed container, data to describe how many containers are being created)
Dependent modules are Queue, ContainerProxy, CreationJobManager, etc.
Just comes up in my mind is it would be great to write down some documents for each component in Wiki. |
bab5ce6
to
fd4d4e3
Compare
a20e5d2
to
448753b
Compare
Waiting for this PR(#5067) to be merged. |
I wrote a document about this module: https://cwiki.apache.org/confluence/display/OPENWHISK/DataManagementService |
…DataManagementService.scala Apply comment Co-authored-by: Brendan Doyle <[email protected]>
Update comments Co-authored-by: Brendan Doyle <[email protected]>
448753b
to
56a4f2c
Compare
Codecov Report
@@ Coverage Diff @@
## master #5063 +/- ##
==========================================
- Coverage 81.63% 75.03% -6.60%
==========================================
Files 205 214 +9
Lines 10013 10448 +435
Branches 442 470 +28
==========================================
- Hits 8174 7840 -334
- Misses 1839 2608 +769
Continue to review full report at Codecov.
|
It's ready to merge. |
@bdoyle0182 Do you have any other comments on this PR? |
Just one comment. LGTM |
Description
This component is in charge of storing data to ETCD.
It is based on eventual consistency.
If it fails to store data for some reason, it keeps retrying until data is stored.
Related issue and scope
My changes affect the following components
Types of changes
Checklist: