Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .build-jdk8
Empty file.
31 changes: 31 additions & 0 deletions go/cmd/vtgateclienttest/services/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
Expand Down Expand Up @@ -395,3 +396,33 @@ func (c *echoClient) UpdateStream(ctx context.Context, keyspace string, shard st
}
return c.fallbackClient.UpdateStream(ctx, keyspace, shard, keyRange, tabletType, timestamp, event, callback)
}

func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, callback func([]*binlogdatapb.VEvent) error) error {
if strings.HasPrefix(vgtid.ShardGtids[0].Shard, EchoPrefix) {
_ = callback([]*binlogdatapb.VEvent{
{
Type: 1,
Timestamp: 1234,
Gtid: "echo-gtid-1",
Ddl: "echo-ddl-1",
Vgtid: vgtid,
RowEvent: &binlogdatapb.RowEvent{
TableName:"echo-table-1",
},
},
{
Type: 2,
Timestamp: 4321,
Gtid: "echo-gtid-2",
Ddl: "echo-ddl-2",
Vgtid: vgtid,
FieldEvent: &binlogdatapb.FieldEvent{
TableName:"echo-table-2",
},
},
})
return nil
}

return c.fallbackClient.VStream(ctx, tabletType, vgtid, filter, callback)
}
14 changes: 14 additions & 0 deletions java/client/src/main/java/io/vitess/client/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import io.vitess.proto.Vtgate.StreamExecuteKeyspaceIdsRequest;
import io.vitess.proto.Vtgate.StreamExecuteRequest;
import io.vitess.proto.Vtgate.StreamExecuteShardsRequest;
import io.vitess.proto.Vtgate.VStreamRequest;
import io.vitess.proto.Vtgate.VStreamResponse;
import io.vitess.proto.Vtrpc.RPCError;

import java.io.Closeable;
Expand Down Expand Up @@ -257,4 +259,16 @@ ListenableFuture<GetSrvKeyspaceResponse> getSrvKeyspace(
* definition for canonical documentation on this VTGate API.
*/
SQLException checkError(RPCError error);

/**
* Starts streaming the vstream binlog events.
*
* Stream begins at the specified VGTID.
*
* <p>See the
* <a href="https://github.com/vitessio/vitess/blob/master/proto/vtgateservice.proto">proto</a>
* definition for canonical documentation on this VTGate API.
*/
StreamIterator<VStreamResponse> getVStream(
Context ctx, VStreamRequest vstreamRequest) throws SQLException;
}
55 changes: 55 additions & 0 deletions java/client/src/test/java/io/vitess/client/RpcClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;

import binlogdata.Binlogdata.FieldEvent;
import binlogdata.Binlogdata.RowEvent;
import binlogdata.Binlogdata.ShardGtid;
import binlogdata.Binlogdata.VEvent;
import binlogdata.Binlogdata.VEventType;
import binlogdata.Binlogdata.VGtid;
import io.vitess.client.cursor.Cursor;
import io.vitess.client.cursor.Row;
import io.vitess.proto.Query;
Expand All @@ -33,6 +39,8 @@
import io.vitess.proto.Topodata.SrvKeyspace.KeyspacePartition;
import io.vitess.proto.Topodata.TabletType;
import io.vitess.proto.Vtgate.SplitQueryResponse;
import io.vitess.proto.Vtgate.VStreamRequest;
import io.vitess.proto.Vtgate.VStreamResponse;
import io.vitess.proto.Vtrpc.CallerID;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -730,4 +738,51 @@ void execute(VTGateBlockingTx tx, String query) throws Exception {
}
});
}

@Test
public void testVStream() throws Exception {
VGtid vgtid = VGtid.newBuilder()
.addShardGtids(ShardGtid.newBuilder()
.setGtid("gtid")
.setShard(ECHO_PREFIX + System.currentTimeMillis())
.setKeyspace("keyspace: " + System.currentTimeMillis())
.build())
.build();

VStreamRequest vstreamRequest = VStreamRequest.newBuilder()
.setCallerId(CALLER_ID)
.setVgtid(vgtid)
.setTabletType(TABLET_TYPE)
.build();

StreamIterator<VStreamResponse> vstream = client.getVStream(ctx, vstreamRequest);
VStreamResponse actual = vstream.next();
Assert.assertFalse(vstream.hasNext());

VStreamResponse expected = VStreamResponse.newBuilder()
.addEvents(VEvent.newBuilder()
.setType(VEventType.forNumber(1))
.setTimestamp(1234)
.setGtid("echo-gtid-1")
.setDdl("echo-ddl-1")
.setVgtid(vgtid)
.setRowEvent(RowEvent.newBuilder()
.setTableName("echo-table-1")
.build())
.build())
.addEvents(VEvent.newBuilder()
.setType(VEventType.forNumber(2))
.setTimestamp(4321)
.setGtid("echo-gtid-2")
.setDdl("echo-ddl-2")
.setVgtid(vgtid)
.setFieldEvent(FieldEvent.newBuilder()
.setTableName("echo-table-2")
.build())
.build())
.build();

Assert.assertEquals(expected, actual);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import io.vitess.proto.Vtgate.StreamExecuteResponse;
import io.vitess.proto.Vtgate.StreamExecuteShardsRequest;
import io.vitess.proto.Vtgate.StreamExecuteShardsResponse;
import io.vitess.proto.Vtgate.VStreamRequest;
import io.vitess.proto.Vtgate.VStreamResponse;
import io.vitess.proto.Vtrpc.RPCError;
import io.vitess.proto.grpc.VitessGrpc;
import io.vitess.proto.grpc.VitessGrpc.VitessFutureStub;
Expand Down Expand Up @@ -289,6 +291,21 @@ public SQLException checkError(RPCError error) {
return errorHandler.checkVitessError(error);
}

@Override
public StreamIterator<Vtgate.VStreamResponse> getVStream(Context ctx,
Vtgate.VStreamRequest vstreamRequest) {
GrpcStreamAdapter<VStreamResponse, VStreamResponse> adapter =
new GrpcStreamAdapter<VStreamResponse, VStreamResponse>() {
@Override
VStreamResponse getResult(VStreamResponse response) {
return response;
}
};

getAsyncStub(ctx).vStream(vstreamRequest, adapter);
return adapter;
}

/**
* Converts an exception from the gRPC framework into the appropriate {@link SQLException}.
*/
Expand Down
32 changes: 18 additions & 14 deletions java/jdbc/src/main/java/io/vitess/jdbc/VitessVTGateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import java.lang.reflect.InvocationTargetException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -91,7 +93,8 @@ public VTGateConnections(final VitessConnection connection) {
new TimerTask() {
@Override
public void run() {
refreshUpdatedSSLConnections(hostInfo, connection);
refreshUpdatedSSLConnections(hostInfo,
connection);
}
},
TimeUnit.SECONDS.toMillis(connection.getRefreshSeconds()),
Expand Down Expand Up @@ -141,21 +144,25 @@ private static void updateVtGateConnHashMap(String identifier, VitessJDBCUrl.Hos

private static void refreshUpdatedSSLConnections(VitessJDBCUrl.HostInfo hostInfo,
VitessConnection connection) {
Set<VTGateConnection> closedConnections = new HashSet<>();
synchronized (VitessVTGateManager.class) {
int updatedCount = 0;
for (Map.Entry<String, VTGateConnection> entry : vtGateConnHashMap.entrySet()) {
if (entry.getValue() instanceof RefreshableVTGateConnection) {
RefreshableVTGateConnection existing = (RefreshableVTGateConnection) entry.getValue();
if (existing.checkKeystoreUpdates()) {
updatedCount++;
VTGateConnection old = vtGateConnHashMap
.replace(entry.getKey(), getVtGateConn(hostInfo, connection));
closeRefreshedConnection(old);
closedConnections.add(old);
}
}
}
if (updatedCount > 0) {
logger.info("refreshed " + updatedCount + " vtgate connections due to keystore update");
}

if (closedConnections.size() > 0) {
logger.info(
"refreshed " + closedConnections.size() + " vtgate connections due to keystore update");
for (VTGateConnection closedConnection : closedConnections) {
closeRefreshedConnection(closedConnection);
}
}
}
Expand Down Expand Up @@ -216,13 +223,11 @@ private static VTGateConnection getVtGateConn(VitessJDBCUrl.HostInfo hostInfo,
.trustStorePath(trustStorePath).trustStorePassword(trustStorePassword)
.trustAlias(trustAlias);

return new RefreshableVTGateConnection(
new GrpcClientFactory(channelProvider, errorHandler)
.createTls(context, hostInfo.toString(), tlsOptions), keyStorePath, trustStorePath);
return new RefreshableVTGateConnection(new GrpcClientFactory(channelProvider, errorHandler)
.createTls(context, hostInfo.toString(), tlsOptions), keyStorePath, trustStorePath);
} else {
return new VTGateConnection(
new GrpcClientFactory(channelProvider, errorHandler)
.create(context, hostInfo.toString()));
return new VTGateConnection(new GrpcClientFactory(channelProvider, errorHandler)
.create(context, hostInfo.toString()));
}
}

Expand All @@ -235,8 +240,7 @@ private static RetryingInterceptorConfig getRetryingInterceptorConfig(VitessConn
conn.getGrpcRetryMaxBackoffMillis(), conn.getGrpcRetryBackoffMultiplier());
}

private static ErrorHandler getErrorHandlerFromProperties(
VitessConnection connection) {
private static ErrorHandler getErrorHandlerFromProperties(VitessConnection connection) {
// Skip reflection in default case
if (Strings.isNullOrEmpty(connection.getErrorHandlerClass())) {
return new DefaultErrorHandler();
Expand Down