Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .sdkmanrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Enable auto-env through the sdkman_auto_env config
# Add key=value pairs of SDKs to use below
java=11.0.27-tem
90 changes: 60 additions & 30 deletions client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import java.util.logging.Logger;

/**
* Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
* Task hub worker that connects to a sidecar process over gRPC to execute
* orchestrator and activity events.
*/
public final class DurableTaskGrpcWorker implements AutoCloseable {

Expand All @@ -39,6 +40,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final TaskHubSidecarServiceBlockingStub sidecarClient;
private final boolean isExecutorServiceManaged;
private volatile boolean isNormalShutdown = false;
private Thread workerThread;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real change


DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
this.orchestrationFactories.putAll(builder.orchestrationFactories);
Expand Down Expand Up @@ -66,43 +68,58 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {

this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
this.isExecutorServiceManaged = builder.executorService == null;
}

/**
* Establishes a gRPC connection to the sidecar and starts processing work-items in the background.
* Establishes a gRPC connection to the sidecar and starts processing work-items
* in the background.
* <p>
* This method retries continuously to establish a connection to the sidecar. If a connection fails,
* a warning log message will be written and a new connection attempt will be made. This process
* continues until either a connection succeeds or the process receives an interrupt signal.
* This method retries continuously to establish a connection to the sidecar. If
* a connection fails,
* a warning log message will be written and a new connection attempt will be
* made. This process
* continues until either a connection succeeds or the process receives an
* interrupt signal.
*/
public void start() {
new Thread(this::startAndBlock).start();
this.workerThread = new Thread(this::startAndBlock);
this.workerThread.start();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real change

}

/**
* Closes the internally managed gRPC channel and executor service, if one exists.
* Closes the internally managed gRPC channel and executor service, if one
* exists.
* <p>
* Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied,
* Only the internally managed GRPC Channel and Executor services are closed. If
* any of them are supplied,
* it is the responsibility of the supplier to take care of them.
*/
public void close() {
this.workerThread.interrupt();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the real change

this.isNormalShutdown = true;
this.shutDownWorkerPool();
this.closeSideCarChannel();
}

/**
* Establishes a gRPC connection to the sidecar and starts processing work-items on the current thread.
* This method call blocks indefinitely, or until the current thread is interrupted.
* Establishes a gRPC connection to the sidecar and starts processing work-items
* on the current thread.
* This method call blocks indefinitely, or until the current thread is
* interrupted.
* <p>
* Use can alternatively use the {@link #start} method to run orchestration processing in a background thread.
* Use can alternatively use the {@link #start} method to run orchestration
* processing in a background thread.
* <p>
* This method retries continuously to establish a connection to the sidecar. If a connection fails,
* a warning log message will be written and a new connection attempt will be made. This process
* continues until either a connection succeeds or the process receives an interrupt signal.
* This method retries continuously to establish a connection to the sidecar. If
* a connection fails,
* a warning log message will be written and a new connection attempt will be
* made. This process
* continues until either a connection succeeds or the process receives an
* interrupt signal.
*/
public void startAndBlock() {
logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress());
Expand All @@ -117,7 +134,6 @@ public void startAndBlock() {
this.dataConverter,
logger);

// TODO: How do we interrupt manually?
while (true) {
try {
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
Expand Down Expand Up @@ -145,11 +161,17 @@ public void startAndBlock() {
this.sidecarClient.completeOrchestratorTask(response);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the orchestrator task.", this.getSidecarAddress());
logger.log(Level.WARNING,
"The sidecar at address {0} is unavailable while completing the orchestrator task.",
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the orchestrator task.", this.getSidecarAddress());
logger.log(Level.WARNING,
"Durable Task worker has disconnected from {0} while completing the orchestrator task.",
this.getSidecarAddress());
} else {
logger.log(Level.WARNING, "Unexpected failure completing the orchestrator task at {0}.", this.getSidecarAddress());
logger.log(Level.WARNING,
"Unexpected failure completing the orchestrator task at {0}.",
this.getSidecarAddress());
}
}
});
Expand Down Expand Up @@ -189,29 +211,35 @@ public void startAndBlock() {
this.sidecarClient.completeActivityTask(responseBuilder.build());
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the activity task.", this.getSidecarAddress());
logger.log(Level.WARNING,
"The sidecar at address {0} is unavailable while completing the activity task.",
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the activity task.", this.getSidecarAddress());
logger.log(Level.WARNING,
"Durable Task worker has disconnected from {0} while completing the activity task.",
this.getSidecarAddress());
} else {
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", this.getSidecarAddress());
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
this.getSidecarAddress());
}
}
});
}
else if (requestType == RequestCase.HEALTHPING)
{
} else if (requestType == RequestCase.HEALTHPING) {
// No-op
} else {
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.",
requestType);
}
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress());
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.",
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
} else {
logger.log(Level.WARNING, String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
logger.log(Level.WARNING,
String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
}

// Retry after 5 seconds
Expand All @@ -225,7 +253,8 @@ else if (requestType == RequestCase.HEALTHPING)
}

/**
* Stops the current worker's listen loop, preventing any new orchestrator or activity events from being processed.
* Stops the current worker's listen loop, preventing any new orchestrator or
* activity events from being processed.
*/
public void stop() {
this.close();
Expand All @@ -246,7 +275,8 @@ private void closeSideCarChannel() {
private void shutDownWorkerPool() {
if (this.isExecutorServiceManaged) {
if (!this.isNormalShutdown) {
logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted");
logger.log(Level.WARNING,
"ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted");
}

this.workerPool.shutdown();
Expand Down
Loading