
BigQueryStorageWrite: StreamWriter (via ConnectionWorkerPool) leaks memory on connection failures #2479

Open
yzhaoa opened this issue Apr 29, 2024 · 0 comments
Labels
api: bigquerystorage Issues related to the googleapis/java-bigquerystorage API.

yzhaoa commented Apr 29, 2024

Environment details

  1. OS type and version: GKE 1.26.14-gke.1044000 (x86-64) running amazoncorretto:11-al2023-headless
  2. Java version: 11
  3. Library version(s): 26.17.0 (com.google.cloud:libraries-bom:26.17.0)

Steps to reproduce

  1. Create a StreamWriter with .setEnableConnectionPool(true).
  2. Write to a BigQuery table in a way that fails, e.g. because the table doesn't exist or the schema is wrong. (Why? Sometimes the schema is out of sync. Regardless, such a condition should not cause a memory leak.) Keep sending this traffic persistently.
  3. Observe that com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool continually creates new ConnectionWorkers (expected), but old ConnectionWorkers for which isConnectionInUnrecoverableState() is true are retained in the field connectionToWriteStream (bug) and are never closed (bug).
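The retention pattern in step 3 can be illustrated with a small, self-contained model. This is not the library's code: `FakeWorker` and `LeakyPoolModel` are hypothetical, and only the field names `connectionWorkerPool` and `connectionToWriteStream` mirror `ConnectionWorkerPool`. Keying `connectionToWriteStream` by worker with a set of writer-name strings is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for ConnectionWorker (not the library class). */
class FakeWorker implements AutoCloseable {
  private final boolean unrecoverable;
  private boolean closed = false;

  FakeWorker(boolean unrecoverable) {
    this.unrecoverable = unrecoverable;
  }

  boolean isConnectionInUnrecoverableState() {
    return unrecoverable;
  }

  boolean isClosed() {
    return closed;
  }

  @Override
  public void close() {
    closed = true;
  }
}

/**
 * Toy model of the behavior described in the issue. Field names mirror
 * ConnectionWorkerPool; the types and logic are assumptions.
 */
class LeakyPoolModel {
  final Set<FakeWorker> connectionWorkerPool = new HashSet<>();
  // Hypothetical shape: worker -> names of the writers routed to it.
  final Map<FakeWorker, Set<String>> connectionToWriteStream = new HashMap<>();

  FakeWorker createWorkerFor(String writerName, boolean failing) {
    // Cleanup drops unrecoverable workers from the pool set only...
    connectionWorkerPool.removeIf(FakeWorker::isConnectionInUnrecoverableState);
    // ...so they stay referenced from connectionToWriteStream and are never closed.
    FakeWorker worker = new FakeWorker(failing);
    connectionWorkerPool.add(worker);
    connectionToWriteStream.computeIfAbsent(worker, w -> new HashSet<>()).add(writerName);
    return worker;
  }
}
```

Simulating five consecutive failing writes from one writer leaves a single worker in `connectionWorkerPool` but five entries in `connectionToWriteStream`, none of them closed, so the map grows for as long as the failures continue.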

Code example

The problem is caused by the cleanup operation in ConnectionWorkerPool: it neither removes the unrecoverable ConnectionWorker from the other field that tracks it, connectionToWriteStream, nor closes it.
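A cleanup that avoids the leak would do both missing steps for each unrecoverable worker: remove it from every structure that references it, then close it. The sketch below shows this on a toy pool. It is an assumption-laden illustration, not a proposed patch: `FakeWorker` and `FixedPoolModel` are hypothetical, the field types are guesses, and the model ignores the locking and threading that the real pool has to handle.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for ConnectionWorker (not the library class). */
class FakeWorker implements AutoCloseable {
  private final boolean unrecoverable;
  private boolean closed = false;

  FakeWorker(boolean unrecoverable) {
    this.unrecoverable = unrecoverable;
  }

  boolean isConnectionInUnrecoverableState() {
    return unrecoverable;
  }

  boolean isClosed() {
    return closed;
  }

  @Override
  public void close() {
    closed = true;
  }
}

/** Toy pool whose cleanup releases every reference to a dead worker. */
class FixedPoolModel {
  final Set<FakeWorker> connectionWorkerPool = new HashSet<>();
  // Hypothetical shape: worker -> names of the writers routed to it.
  final Map<FakeWorker, Set<String>> connectionToWriteStream = new HashMap<>();

  FakeWorker createWorkerFor(String writerName, boolean failing) {
    Iterator<FakeWorker> it = connectionWorkerPool.iterator();
    while (it.hasNext()) {
      FakeWorker dead = it.next();
      if (dead.isConnectionInUnrecoverableState()) {
        it.remove(); // drop from the pool set
        connectionToWriteStream.remove(dead); // drop the second reference
        dead.close(); // release the worker's resources
      }
    }
    FakeWorker worker = new FakeWorker(failing);
    connectionWorkerPool.add(worker);
    connectionToWriteStream.computeIfAbsent(worker, w -> new HashSet<>()).add(writerName);
    return worker;
  }
}
```

With this cleanup, five consecutive failing writes leave exactly one entry in each structure, and every earlier worker has been closed.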

Stack trace

None captured

Any additional information below

The following is the diff of ConnectionWorkerPool.java between 26.17.0 and the current main branch; it only adds compressor-name and retry-settings plumbing:

@@ -18,6 +18,7 @@
 import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutures;
 import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.retrying.RetrySettings;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
 import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
@@ -41,6 +42,7 @@
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;

 /** Pool of connections to accept appends and distirbute to different connections. */
@@ -65,6 +67,11 @@
   private final java.time.Duration maxRetryDuration;

   /*
+   * Retry settings for in-stream retries.
+   */
+  private RetrySettings retrySettings;
+
+  /*
    * Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
    */
   private final FlowController.LimitExceededBehavior limitExceededBehavior;
@@ -91,6 +98,10 @@
    * TraceId for debugging purpose.
    */
   private final String traceId;
+  /*
+   * Sets the compression to use for the calls
+   */
+  private String compressorName;

   /** Used for test on the number of times createWorker is called. */
   private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);
@@ -199,14 +210,18 @@
       java.time.Duration maxRetryDuration,
       FlowController.LimitExceededBehavior limitExceededBehavior,
       String traceId,
-      BigQueryWriteSettings clientSettings) {
+      @Nullable String comperssorName,
+      BigQueryWriteSettings clientSettings,
+      RetrySettings retrySettings) {
     this.maxInflightRequests = maxInflightRequests;
     this.maxInflightBytes = maxInflightBytes;
     this.maxRetryDuration = maxRetryDuration;
     this.limitExceededBehavior = limitExceededBehavior;
     this.traceId = traceId;
+    this.compressorName = comperssorName;
     this.clientSettings = clientSettings;
     this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
+    this.retrySettings = retrySettings;
   }

   /**
@@ -379,7 +394,9 @@
             maxRetryDuration,
             limitExceededBehavior,
             traceId,
-            clientSettings);
+            compressorName,
+            clientSettings,
+            retrySettings);
     connectionWorkerPool.add(connectionWorker);
     log.info(
         String.format(

None of these changes touch the cleanup of unrecoverable workers, so I don't think upgrading fixes this issue.
