Skip to content

Commit 5094800

Browse files
authored
Merge pull request #30 from javier-aliaga/fix-startAndBlock
Improve GrpcWorker shutdown
2 parents 5a579db + d911e00 commit 5094800

File tree

2 files changed

+63
-30
lines changed

2 files changed

+63
-30
lines changed

.sdkmanrc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Enable auto-env through the sdkman_auto_env config
2+
# Add key=value pairs of SDKs to use below
3+
java=11.0.27-tem

client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
import java.util.logging.Logger;
2121

2222
/**
23-
* Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
23+
* Task hub worker that connects to a sidecar process over gRPC to execute
24+
* orchestrator and activity events.
2425
*/
2526
public final class DurableTaskGrpcWorker implements AutoCloseable {
2627

@@ -39,6 +40,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3940
private final TaskHubSidecarServiceBlockingStub sidecarClient;
4041
private final boolean isExecutorServiceManaged;
4142
private volatile boolean isNormalShutdown = false;
43+
private Thread workerThread;
4244

4345
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
4446
this.orchestrationFactories.putAll(builder.orchestrationFactories);
@@ -66,43 +68,58 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6668

6769
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
6870
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
69-
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
71+
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
72+
: DEFAULT_MAXIMUM_TIMER_INTERVAL;
7073
this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool();
7174
this.isExecutorServiceManaged = builder.executorService == null;
7275
}
7376

7477
/**
75-
* Establishes a gRPC connection to the sidecar and starts processing work-items in the background.
78+
* Establishes a gRPC connection to the sidecar and starts processing work-items
79+
* in the background.
7680
* <p>
77-
* This method retries continuously to establish a connection to the sidecar. If a connection fails,
78-
* a warning log message will be written and a new connection attempt will be made. This process
79-
* continues until either a connection succeeds or the process receives an interrupt signal.
81+
* This method retries continuously to establish a connection to the sidecar. If
82+
* a connection fails,
83+
* a warning log message will be written and a new connection attempt will be
84+
* made. This process
85+
* continues until either a connection succeeds or the process receives an
86+
* interrupt signal.
8087
*/
8188
public void start() {
82-
new Thread(this::startAndBlock).start();
89+
this.workerThread = new Thread(this::startAndBlock);
90+
this.workerThread.start();
8391
}
8492

8593
/**
86-
* Closes the internally managed gRPC channel and executor service, if one exists.
94+
* Closes the internally managed gRPC channel and executor service, if one
95+
* exists.
8796
* <p>
88-
* Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied,
97+
* Only the internally managed GRPC Channel and Executor services are closed. If
98+
* any of them are supplied,
8999
* it is the responsibility of the supplier to take care of them.
90100
*/
91101
public void close() {
102+
this.workerThread.interrupt();
92103
this.isNormalShutdown = true;
93104
this.shutDownWorkerPool();
94105
this.closeSideCarChannel();
95106
}
96107

97108
/**
98-
* Establishes a gRPC connection to the sidecar and starts processing work-items on the current thread.
99-
* This method call blocks indefinitely, or until the current thread is interrupted.
109+
* Establishes a gRPC connection to the sidecar and starts processing work-items
110+
* on the current thread.
111+
* This method call blocks indefinitely, or until the current thread is
112+
* interrupted.
100113
* <p>
101-
* Use can alternatively use the {@link #start} method to run orchestration processing in a background thread.
114+
* Use can alternatively use the {@link #start} method to run orchestration
115+
* processing in a background thread.
102116
* <p>
103-
* This method retries continuously to establish a connection to the sidecar. If a connection fails,
104-
* a warning log message will be written and a new connection attempt will be made. This process
105-
* continues until either a connection succeeds or the process receives an interrupt signal.
117+
* This method retries continuously to establish a connection to the sidecar. If
118+
* a connection fails,
119+
* a warning log message will be written and a new connection attempt will be
120+
* made. This process
121+
* continues until either a connection succeeds or the process receives an
122+
* interrupt signal.
106123
*/
107124
public void startAndBlock() {
108125
logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress());
@@ -117,7 +134,6 @@ public void startAndBlock() {
117134
this.dataConverter,
118135
logger);
119136

120-
// TODO: How do we interrupt manually?
121137
while (true) {
122138
try {
123139
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
@@ -145,11 +161,17 @@ public void startAndBlock() {
145161
this.sidecarClient.completeOrchestratorTask(response);
146162
} catch (StatusRuntimeException e) {
147163
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
148-
logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the orchestrator task.", this.getSidecarAddress());
164+
logger.log(Level.WARNING,
165+
"The sidecar at address {0} is unavailable while completing the orchestrator task.",
166+
this.getSidecarAddress());
149167
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
150-
logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the orchestrator task.", this.getSidecarAddress());
168+
logger.log(Level.WARNING,
169+
"Durable Task worker has disconnected from {0} while completing the orchestrator task.",
170+
this.getSidecarAddress());
151171
} else {
152-
logger.log(Level.WARNING, "Unexpected failure completing the orchestrator task at {0}.", this.getSidecarAddress());
172+
logger.log(Level.WARNING,
173+
"Unexpected failure completing the orchestrator task at {0}.",
174+
this.getSidecarAddress());
153175
}
154176
}
155177
});
@@ -189,29 +211,35 @@ public void startAndBlock() {
189211
this.sidecarClient.completeActivityTask(responseBuilder.build());
190212
} catch (StatusRuntimeException e) {
191213
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
192-
logger.log(Level.WARNING, "The sidecar at address {0} is unavailable while completing the activity task.", this.getSidecarAddress());
214+
logger.log(Level.WARNING,
215+
"The sidecar at address {0} is unavailable while completing the activity task.",
216+
this.getSidecarAddress());
193217
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
194-
logger.log(Level.WARNING, "Durable Task worker has disconnected from {0} while completing the activity task.", this.getSidecarAddress());
218+
logger.log(Level.WARNING,
219+
"Durable Task worker has disconnected from {0} while completing the activity task.",
220+
this.getSidecarAddress());
195221
} else {
196-
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", this.getSidecarAddress());
222+
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
223+
this.getSidecarAddress());
197224
}
198225
}
199226
});
200-
}
201-
else if (requestType == RequestCase.HEALTHPING)
202-
{
227+
} else if (requestType == RequestCase.HEALTHPING) {
203228
// No-op
204229
} else {
205-
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
230+
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.",
231+
requestType);
206232
}
207233
}
208234
} catch (StatusRuntimeException e) {
209235
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
210-
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress());
236+
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.",
237+
this.getSidecarAddress());
211238
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
212239
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
213240
} else {
214-
logger.log(Level.WARNING, String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
241+
logger.log(Level.WARNING,
242+
String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
215243
}
216244

217245
// Retry after 5 seconds
@@ -225,7 +253,8 @@ else if (requestType == RequestCase.HEALTHPING)
225253
}
226254

227255
/**
228-
* Stops the current worker's listen loop, preventing any new orchestrator or activity events from being processed.
256+
* Stops the current worker's listen loop, preventing any new orchestrator or
257+
* activity events from being processed.
229258
*/
230259
public void stop() {
231260
this.close();
@@ -246,7 +275,8 @@ private void closeSideCarChannel() {
246275
private void shutDownWorkerPool() {
247276
if (this.isExecutorServiceManaged) {
248277
if (!this.isNormalShutdown) {
249-
logger.log(Level.WARNING, "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted");
278+
logger.log(Level.WARNING,
279+
"ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted");
250280
}
251281

252282
this.workerPool.shutdown();

0 commit comments

Comments
 (0)