Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
052688a
feat: support partitioned queries + data boost in Connection API
olavloite Jul 24, 2023
9817586
fix: match the correct group in regex
olavloite Jul 24, 2023
dc91c9a
Merge branch 'main' into batch-read-connection-api
olavloite Jul 28, 2023
bca9b12
feat: add more SQL statements for partitioned queries
olavloite Jul 28, 2023
bc5b691
chore: refactor client side statements to accept the entire parsed st…
olavloite Jul 28, 2023
56dfb75
Merge branch 'parsed-statement-to-client-executors' into batch-read-c…
olavloite Jul 28, 2023
f0e9f5b
chore: simplify test
olavloite Jul 28, 2023
af211d3
Merge branch 'parsed-statement-to-client-executors' into batch-read-c…
olavloite Jul 28, 2023
090868c
chore: cleanup differences
olavloite Jul 28, 2023
31ba134
chore: cleanup unrelated changes
olavloite Jul 28, 2023
6960e11
fix: update converter name
olavloite Jul 28, 2023
549e870
test: add more tests
olavloite Jul 30, 2023
0b5269e
chore: add missing license header
olavloite Jul 30, 2023
d5248cc
fix: handle empty partitioned queries correctly
olavloite Jul 31, 2023
a461c39
fix: do not use any random staleness for partitioned queries
olavloite Jul 31, 2023
13541b1
fix: only return false for next() if all have finished
olavloite Aug 1, 2023
3ae95e3
chore: rename to autoPartitionMode
olavloite Aug 1, 2023
ed6a34e
chore: rename sql statements + add tests for empty results
olavloite Aug 1, 2023
1b39e8c
Merge branch 'main' into batch-read-connection-api
olavloite Aug 2, 2023
9e6dc6e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 2, 2023
89889ee
chore: address review comments
olavloite Aug 4, 2023
1658a51
Batch read connection api native adjustments (#2569)
burkedavison Aug 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.20.0')
implementation platform('com.google.cloud:libraries-bom:26.21.0')

implementation 'com.google.cloud:google-cloud-spanner'
```
Expand Down
56 changes: 56 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,62 @@
<method>boolean isDelayTransactionStartUntilFirstWrite()</method>
</difference>

<!-- Partitioned queries in Connection API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>int getMaxPartitionedParallelism()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>int getMaxPartitions()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isAutoPartitionMode()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isDataBoostEnabled()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet partitionQuery(com.google.cloud.spanner.Statement, com.google.cloud.spanner.PartitionOptions, com.google.cloud.spanner.Options$QueryOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet runPartition(java.lang.String)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.connection.PartitionedQueryResultSet runPartitionedQuery(com.google.cloud.spanner.Statement, com.google.cloud.spanner.PartitionOptions, com.google.cloud.spanner.Options$QueryOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setAutoPartitionMode(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setDataBoostEnabled(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setMaxPartitionedParallelism(int)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setMaxPartitions(int)</method>
</difference>
<!-- (Internal change, use stream timeout) -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ private Builder(Statement statement) {
statement.queryOptions == null ? null : statement.queryOptions.toBuilder().build();
}

/** Replaces the current SQL of this builder with the given string. */
public Builder replace(String sql) {
sqlBuffer.replace(0, sqlBuffer.length(), sql);
return this;
}

/** Appends {@code sqlFragment} to the statement. */
public Builder append(String sqlFragment) {
sqlBuffer.append(checkNotNull(sqlFragment));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,22 @@
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.BatchTransactionId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.common.base.Preconditions;
Expand All @@ -39,13 +48,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -157,6 +168,39 @@ public void rollbackToSavepoint(
"Rollback to savepoint is not supported for " + getUnitOfWorkName());
}

@Override
public ApiFuture<ResultSet> partitionQueryAsync(
CallType callType,
ParsedStatement query,
PartitionOptions partitionOptions,
QueryOption... options) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Partition query is not supported for " + getUnitOfWorkName());
}

ResultSet partitionQuery(
BatchReadOnlyTransaction transaction,
PartitionOptions partitionOptions,
ParsedStatement query,
QueryOption... options) {
final String partitionColumnName = "PARTITION";
BatchTransactionId transactionId = transaction.getBatchTransactionId();
List<Partition> partitions =
transaction.partitionQuery(partitionOptions, query.getStatement(), options);
return ResultSets.forRows(
com.google.cloud.spanner.Type.struct(
StructField.of(partitionColumnName, com.google.cloud.spanner.Type.string())),
partitions.stream()
.map(
partition ->
Struct.newBuilder()
.set(partitionColumnName)
.to(PartitionId.encodeToString(transactionId, partition))
.build())
.collect(Collectors.toList()));
}

StatementExecutor getStatementExecutor() {
return statementExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import java.lang.reflect.Method;
import java.util.regex.Matcher;

/** Executor for <code>PARTITION &lt;sql&gt;</code> statements. */
class ClientSideStatementPartitionExecutor implements ClientSideStatementExecutor {
private final ClientSideStatementImpl statement;
private final Method method;

ClientSideStatementPartitionExecutor(ClientSideStatementImpl statement) throws CompileException {
try {
this.statement = statement;
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), Statement.class);
} catch (Exception e) {
throw new CompileException(e, statement);
}
}

@Override
public StatementResult execute(
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception {
String sql = getParameterValue(parsedStatement);
return (StatementResult)
method.invoke(connection, parsedStatement.getStatement().toBuilder().replace(sql).build());
}

String getParameterValue(ParsedStatement parsedStatement) {
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments());
if (matcher.find() && matcher.groupCount() >= 2) {
String space = matcher.group(1);
String value = matcher.group(2);
return (space + value).trim();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion - StringBuilder would be a bit more optimal as compared to + operation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Java compiler automatically optimizes this internally, as it is not dynamic concatenation (e.g. in a loop or other hard-to-understand construct), so for these simple cases you should keep it as is. You will see that if you change this to using a StringBuilder, IntelliJ will even give you a warning.

}
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
String.format(
"Invalid argument for PARTITION: %s", parsedStatement.getStatement().getSql()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import com.google.common.base.Strings;
import java.lang.reflect.Method;
import java.util.regex.Matcher;

/** Executor for <code>RUN PARTITION &lt;partition_id&gt;</code> statements. */
class ClientSideStatementRunPartitionExecutor implements ClientSideStatementExecutor {
private final ClientSideStatementImpl statement;
private final Method method;

ClientSideStatementRunPartitionExecutor(ClientSideStatementImpl statement)
throws CompileException {
try {
this.statement = statement;
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), String.class);
} catch (Exception e) {
throw new CompileException(e, statement);
}
}

@Override
public StatementResult execute(
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception {
String partitionId = getParameterValue(parsedStatement);
if (partitionId == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
"No valid partition id found in statement: " + parsedStatement.getStatement().getSql());
}
return (StatementResult) method.invoke(connection, partitionId);
}

String getParameterValue(ParsedStatement parsedStatement) {
// The statement has the form `RUN PARTITION ['partition-id']`.
// The regex that is defined for this statement is (simplified) `run\s+partition(?:\s*'(.*)')?`
// This regex has one capturing group, which captures the partition-id inside the single quotes.
// That capturing group is however inside a non-capturing optional group.
// That means that:
// 1. If the matcher matches and returns one or more groups, we know that we have a partition-id
// in the SQL statement itself, as that is the only thing that can be in a capturing group.
// 2. If the matcher matches and returns zero groups, we know that the statement is valid, but
// that it does not contain a partition-id in the SQL statement. The partition-id must then
// be included in the statement as a query parameter.
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments());
if (matcher.find() && matcher.groupCount() >= 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • It could be just me but a first time reader will have little difficulty in understanding what we are doing here. My understanding is we are parsing and obtaining the partition ID from the statement. In what cases will a statement have a groupCount >=1 ?
  • Should we beef up the documentation a bit by adding examples for future readers?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some commentary to explain what is going on with this regex.

String value = matcher.group(1);
if (!Strings.isNullOrEmpty(value)) {
return value;
}
}
if (parsedStatement.getStatement().getParameters().size() == 1) {
Value value = parsedStatement.getStatement().getParameters().values().iterator().next();
return value.getAsString();
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import java.lang.reflect.Method;
import java.util.regex.Matcher;

/** Executor for <code>RUN PARTITIONED QUERY &lt;sql&gt;</code> statements. */
class ClientSideStatementRunPartitionedQueryExecutor implements ClientSideStatementExecutor {
private final ClientSideStatementImpl statement;
private final Method method;

ClientSideStatementRunPartitionedQueryExecutor(ClientSideStatementImpl statement)
throws CompileException {
try {
this.statement = statement;
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), Statement.class);
} catch (Exception e) {
throw new CompileException(e, statement);
}
}

@Override
public StatementResult execute(
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception {
String sql = getParameterValue(parsedStatement);
return (StatementResult)
method.invoke(connection, parsedStatement.getStatement().toBuilder().replace(sql).build());
}

String getParameterValue(ParsedStatement parsedStatement) {
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments());
if (matcher.find() && matcher.groupCount() >= 2) {
// Include the spacing group in case the query is enclosed in parentheses like this:
// `run partitioned query(select * from foo)`
String space = matcher.group(1);
String value = matcher.group(2);
return (space + value).trim();
}
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
String.format(
"Invalid argument for RUN PARTITIONED QUERY: %s",
parsedStatement.getStatement().getSql()));
}
}
Loading