Skip to content

Commit

Permalink
DBZ-8053 Do not log all tablets
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Jul 12, 2024
1 parent ccf1a75 commit 8d46bfd
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions src/main/java/io/debezium/connector/vitess/VitessMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ public static List<String> getShards(VitessConnectorConfig config) {
}

public static List<String> getTables(VitessConnectorConfig config) {
List<String> tables;
Vtgate.ExecuteResponse response;
String query;
if (config.excludeEmptyShards()) {
String query = String.format("SHOW TABLES", config.getKeyspace());
query = "SHOW TABLES";
List<String> shardsToQuery;
List<String> nonEmptyShards = getVitessShardsFromTablets(config);
// If there is a shard list specified, then query one of its non-empty shards
Expand All @@ -61,19 +62,27 @@ public static List<String> getTables(VitessConnectorConfig config) {
}
String randomNonEmptyShard = shardsToQuery.get(new Random().nextInt(shardsToQuery.size()));
LOGGER.info("Get table list from non-empty shard: {}", randomNonEmptyShard);
tables = flattenAndConcat(getRowsFromQuery(config, query, randomNonEmptyShard));
response = executeQuery(config, query, randomNonEmptyShard);
}
else {
String query = String.format("SHOW TABLES FROM %s", config.getKeyspace());
tables = flattenAndConcat(getRowsFromQuery(config, query));
query = String.format("SHOW TABLES FROM %s", config.getKeyspace());
response = executeQuery(config, query);
}
logResponse(response, query);
List<String> tables = getFlattenedRowsFromResponse(response);
LOGGER.info("All tables from keyspace {} are: {}", config.getKeyspace(), tables);
return tables;
}

private static void logResponse(Vtgate.ExecuteResponse response, String query) {
LOGGER.info("Got response: {} for query: {}", response, query);
}

private static List<String> getVitessShards(VitessConnectorConfig config) {
String query = String.format("SHOW VITESS_SHARDS LIKE '%s/%%'", config.getKeyspace());
List<String> rows = flattenAndConcat(getRowsFromQuery(config, query));
Vtgate.ExecuteResponse response = executeQuery(config, query);
logResponse(response, query);
List<String> rows = getFlattenedRowsFromResponse(response);
List<String> shards = rows.stream().map(fieldValue -> {
String[] parts = fieldValue.split("/");
assert parts != null && parts.length == 2 : String.format("Wrong field format: %s", fieldValue);
Expand All @@ -84,7 +93,9 @@ private static List<String> getVitessShards(VitessConnectorConfig config) {

private static List<String> getVitessShardsFromTablets(VitessConnectorConfig config) {
String query = "SHOW VITESS_TABLETS";
List<List<String>> rowValues = getRowsFromQuery(config, query);
Vtgate.ExecuteResponse response = executeQuery(config, query);
// Do not log the response since there is no way to filter tablets: it includes all tablets of all shards of all keyspaces
List<List<String>> rowValues = getRowsFromResponse(response);
List<String> shards = VitessMetadata.getNonEmptyShards(rowValues, config.getKeyspace());
return shards;
}
Expand All @@ -104,29 +115,26 @@ protected static Vtgate.ExecuteResponse executeQuery(VitessConnectorConfig confi
else {
response = connection.execute(query);
}
LOGGER.info("Got response: {} for query: {}", response, query);
return response;
}
catch (Exception e) {
throw new RuntimeException(String.format("Unexpected error while running query: %s", query), e);
}
}

private static List<List<String>> getRowsFromResponse(Vtgate.ExecuteResponse response) {
private static List<String> getFlattenedRowsFromResponse(Vtgate.ExecuteResponse response) {
validateResponse(response);
Query.QueryResult result = response.getResult();
validateResult(result);
return parseRows(result.getRowsList());
}

private static List<List<String>> getRowsFromQuery(VitessConnectorConfig config, String query) {
Vtgate.ExecuteResponse response = executeQuery(config, query);
return getRowsFromResponse(response);
List<List<String>> rows = parseRows(result.getRowsList());
return flattenAndConcat(rows);
}

private static List<List<String>> getRowsFromQuery(VitessConnectorConfig config, String query, String shard) {
Vtgate.ExecuteResponse response = executeQuery(config, query, shard);
return getRowsFromResponse(response);
private static List<List<String>> getRowsFromResponse(Vtgate.ExecuteResponse response) {
validateResponse(response);
Query.QueryResult result = response.getResult();
validateResult(result);
return parseRows(result.getRowsList());
}

private static void validateResponse(Vtgate.ExecuteResponse response) {
Expand Down

0 comments on commit 8d46bfd

Please sign in to comment.