diff --git a/java/client/src/main/java/io/vitess/client/VTGateConnection.java b/java/client/src/main/java/io/vitess/client/VTGateConnection.java index d22f01f5dc2..480de673a57 100644 --- a/java/client/src/main/java/io/vitess/client/VTGateConnection.java +++ b/java/client/src/main/java/io/vitess/client/VTGateConnection.java @@ -36,6 +36,8 @@ import io.vitess.proto.Vtgate.SplitQueryRequest; import io.vitess.proto.Vtgate.SplitQueryResponse; import io.vitess.proto.Vtgate.StreamExecuteRequest; +import io.vitess.proto.Vtgate.VStreamRequest; +import io.vitess.proto.Vtgate.VStreamResponse; import java.io.Closeable; import java.io.IOException; @@ -252,6 +254,26 @@ public ListenableFuture> apply( }, directExecutor())); } + /** + * Starts streaming the vstream binlog events. + * + * @param ctx Context on user and execution deadline if any. + * @param vstreamRequest VStreamRequest containing starting VGtid positions + * in binlog and optional Filters + * @return Streaming iterator over VStream events + * @throws SQLException If anything fails on query execution. + */ + StreamIterator getVStream(Context ctx, VStreamRequest vstreamRequest) + throws SQLException { + VStreamRequest request = vstreamRequest; + + if (ctx.getCallerId() != null) { + request = request.toBuilder().setCallerId(ctx.getCallerId()).build(); + } + + return client.getVStream(ctx, request); + } + /** * @inheritDoc */