Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void run() {
}

private void resetConnection() {
log.info("Reconnecting for stream:" + streamName);
log.info("Reconnecting for stream:" + streamName + " id: " + writerId);
this.streamConnection =
new StreamConnection(
this.client,
Expand All @@ -258,6 +258,7 @@ public void run(Throwable finalStatus) {
doneCallback(finalStatus);
}
});
log.info("Reconnect done for stream:" + streamName + " id: " + writerId);
}

/** Schedules the writing of rows at given offset. */
Expand Down Expand Up @@ -392,13 +393,18 @@ public void close() {
} finally {
this.lock.unlock();
}
log.fine("Waiting for append thread to finish. Stream: " + streamName);
log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
try {
appendThread.join();
} catch (InterruptedException e) {
// Unexpected. Just swallow the exception with logging.
log.warning(
"Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
"Append handler join is interrupted. Stream: "
+ streamName
+ " id: "
+ writerId
+ " Error: "
+ e.toString());
}
this.client.close();
try {
Expand All @@ -408,14 +414,20 @@ public void close() {
}

try {
log.fine("Begin shutting down user callback thread pool for stream " + streamName);
log.fine(
"Begin shutting down user callback thread pool for stream "
+ streamName
+ " id: "
+ writerId);
threadPool.shutdown();
threadPool.awaitTermination(3, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// Unexpected. Just swallow the exception with logging.
log.warning(
"Close on thread pool for "
+ streamName
+ " id: "
+ writerId
+ " is interrupted with exception: "
+ e.toString());
throw new IllegalStateException(
Expand Down Expand Up @@ -464,6 +476,8 @@ private void appendLoop() {
log.warning(
"Interrupted while waiting for message. Stream: "
+ streamName
+ " id: "
+ writerId
+ " Error: "
+ e.toString());
} finally {
Expand Down Expand Up @@ -539,17 +553,11 @@ private void appendLoop() {
// TODO: Handle NOT_ENOUGH_QUOTA.
// In the close case, the request is in the inflight queue, and will either be returned
// to the user with an error, or will be resent.
log.fine(
"Sending "
+ originalRequestBuilder.getProtoRows().getRows().getSerializedRowsCount()
+ " rows to stream '"
+ originalRequestBuilder.getWriteStream()
+ "'");
this.streamConnection.send(originalRequestBuilder.build());
}
}

log.fine("Cleanup starts. Stream: " + streamName);
log.fine("Cleanup starts. Stream: " + streamName + " id: " + writerId);
// At this point, the waiting queue is drained, so no more requests.
// We can close the stream connection and handle the remaining inflight requests.
if (streamConnection != null) {
Expand All @@ -559,9 +567,12 @@ private void appendLoop() {

// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
log.fine(
"Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName);
"Stream connection is fully closed. Cleaning up inflight requests. Stream: "
+ streamName
+ " id: "
+ writerId);
cleanupInflightRequests();
log.fine("Append thread is done. Stream: " + streamName);
log.fine("Append thread is done. Stream: " + streamName + " id: " + writerId);
}

/*
Expand All @@ -581,7 +592,11 @@ private boolean waitingQueueDrained() {
}

private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
log.fine("Waiting for done callback from stream connection. Stream: " + streamName);
log.fine(
"Waiting for done callback from stream connection. Stream: "
+ streamName
+ " id: "
+ writerId);
long deadline = System.nanoTime() + timeUnit.toNanos(duration);
while (System.nanoTime() <= deadline) {
this.lock.lock();
Expand Down Expand Up @@ -630,23 +645,29 @@ private void cleanupInflightRequests() {
} finally {
this.lock.unlock();
}
log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus);
log.fine(
"Cleaning "
+ localQueue.size()
+ " inflight requests with error: "
+ finalStatus
+ " for Stream "
+ streamName
+ " id: "
+ writerId);
while (!localQueue.isEmpty()) {
localQueue.pollFirst().appendResult.setException(finalStatus);
}
}

private void requestCallback(AppendRowsResponse response) {
if (!response.hasUpdatedSchema()) {
log.fine(String.format("Got response on stream %s", response.toString()));
} else {
if (response.hasUpdatedSchema()) {
AppendRowsResponse responseWithUpdatedSchemaRemoved =
response.toBuilder().clearUpdatedSchema().build();

log.fine(
String.format(
"Got response with schema updated (omitting updated schema in response here): %s",
responseWithUpdatedSchemaRemoved.toString()));
"Got response with schema updated (omitting updated schema in response here): %s writer id %s",
responseWithUpdatedSchemaRemoved.toString(), writerId));
}

AppendRequestAndResponse requestWrapper;
Expand Down Expand Up @@ -737,6 +758,8 @@ private void doneCallback(Throwable finalStatus) {
log.fine(
"Received done callback. Stream: "
+ streamName
+ " worker id: "
+ writerId
+ " Final status: "
+ finalStatus.toString());
this.lock.lock();
Expand Down