Skip to content

Commit 27d7edc

Browse files
committed
chore: store temp TransactionOptions in connection state
Store temporary TransactionOptions in the connection state as local options. Local options only apply to the current transaction. This simplifies the internal state handling of the driver, as all transaction state should only be read from the connection state, and not also from a temporary variable. This also enables the use of a combination of temporary transaction options and using SQL statements to set further options. The shared library always includes temporary transaction options, as the BeginTransaction function accepts TransactionOptions as an input argument. This meant that using SQL statements to set further transaction options was not supported through the shared library.
1 parent 13bda8d commit 27d7edc

File tree

6 files changed

+311
-56
lines changed

6 files changed

+311
-56
lines changed

conn.go

Lines changed: 94 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -275,14 +275,9 @@ type conn struct {
275275
// tempExecOptions can be set by passing it in as an argument to ExecContext or QueryContext
276276
// and are applied only to that statement.
277277
tempExecOptions *ExecOptions
278-
// tempTransactionOptions are temporarily set right before a read/write transaction is started.
279-
tempTransactionOptions *ReadWriteTransactionOptions
280-
// tempReadOnlyTransactionOptions are temporarily set right before a read-only
281-
// transaction is started on a Spanner connection.
282-
tempReadOnlyTransactionOptions *ReadOnlyTransactionOptions
283-
// tempBatchReadOnlyTransactionOptions are temporarily set right before a
284-
// batch read-only transaction is started on a Spanner connection.
285-
tempBatchReadOnlyTransactionOptions *BatchReadOnlyTransactionOptions
278+
// tempTransactionCloseFunc is set right before a transaction is started, and is set as the
279+
// close function for that transaction.
280+
tempTransactionCloseFunc func()
286281
}
287282

288283
func (c *conn) UnderlyingClient() (*spanner.Client, error) {
@@ -1011,8 +1006,10 @@ func (c *conn) options(reset bool) *ExecOptions {
10111006
TransactionTag: c.TransactionTag(),
10121007
IsolationLevel: toProtoIsolationLevelOrDefault(c.IsolationLevel()),
10131008
ReadLockMode: c.ReadLockMode(),
1009+
CommitPriority: propertyCommitPriority.GetValueOrDefault(c.state),
10141010
CommitOptions: spanner.CommitOptions{
1015-
MaxCommitDelay: c.maxCommitDelayPointer(),
1011+
MaxCommitDelay: c.maxCommitDelayPointer(),
1012+
ReturnCommitStats: propertyReturnCommitStats.GetValueOrDefault(c.state),
10161013
},
10171014
},
10181015
PartitionedQueryOptions: PartitionedQueryOptions{},
@@ -1045,16 +1042,43 @@ func (c *conn) resetTransactionForRetry(ctx context.Context, errDuringCommit boo
10451042
}
10461043

10471044
func (c *conn) withTempTransactionOptions(options *ReadWriteTransactionOptions) {
1048-
c.tempTransactionOptions = options
1045+
if options == nil {
1046+
return
1047+
}
1048+
c.tempTransactionCloseFunc = options.close
1049+
// Start a transaction for the connection state, so we can set the transaction options
1050+
// as local options in the current transaction.
1051+
_ = c.state.Begin()
1052+
if options.DisableInternalRetries {
1053+
_ = propertyRetryAbortsInternally.SetLocalValue(c.state, !options.DisableInternalRetries)
1054+
}
1055+
if options.TransactionOptions.BeginTransactionOption != spanner.DefaultBeginTransaction {
1056+
_ = propertyBeginTransactionOption.SetLocalValue(c.state, options.TransactionOptions.BeginTransactionOption)
1057+
}
1058+
if options.TransactionOptions.CommitOptions.MaxCommitDelay != nil {
1059+
_ = propertyMaxCommitDelay.SetLocalValue(c.state, *options.TransactionOptions.CommitOptions.MaxCommitDelay)
1060+
}
1061+
if options.TransactionOptions.CommitOptions.ReturnCommitStats {
1062+
_ = propertyReturnCommitStats.SetLocalValue(c.state, options.TransactionOptions.CommitOptions.ReturnCommitStats)
1063+
}
1064+
if options.TransactionOptions.TransactionTag != "" {
1065+
_ = propertyTransactionTag.SetLocalValue(c.state, options.TransactionOptions.TransactionTag)
1066+
}
1067+
if options.TransactionOptions.ReadLockMode != spannerpb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED {
1068+
_ = propertyReadLockMode.SetLocalValue(c.state, options.TransactionOptions.ReadLockMode)
1069+
}
1070+
if options.TransactionOptions.IsolationLevel != spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED {
1071+
_ = propertyIsolationLevel.SetLocalValue(c.state, toSqlIsolationLevelOrDefault(options.TransactionOptions.IsolationLevel))
1072+
}
1073+
if options.TransactionOptions.ExcludeTxnFromChangeStreams {
1074+
_ = propertyExcludeTxnFromChangeStreams.SetLocalValue(c.state, options.TransactionOptions.ExcludeTxnFromChangeStreams)
1075+
}
1076+
if options.TransactionOptions.CommitPriority != spannerpb.RequestOptions_PRIORITY_UNSPECIFIED {
1077+
_ = propertyCommitPriority.SetLocalValue(c.state, options.TransactionOptions.CommitPriority)
1078+
}
10491079
}
10501080

10511081
func (c *conn) getTransactionOptions(execOptions *ExecOptions) ReadWriteTransactionOptions {
1052-
if c.tempTransactionOptions != nil {
1053-
defer func() { c.tempTransactionOptions = nil }()
1054-
opts := *c.tempTransactionOptions
1055-
opts.TransactionOptions.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.TransactionOptions.BeginTransactionOption)
1056-
return opts
1057-
}
10581082
txOpts := ReadWriteTransactionOptions{
10591083
TransactionOptions: execOptions.TransactionOptions,
10601084
DisableInternalRetries: !c.RetryAbortsInternally(),
@@ -1075,28 +1099,39 @@ func (c *conn) getTransactionOptions(execOptions *ExecOptions) ReadWriteTransact
10751099
}
10761100

10771101
func (c *conn) withTempReadOnlyTransactionOptions(options *ReadOnlyTransactionOptions) {
1078-
c.tempReadOnlyTransactionOptions = options
1102+
if options == nil {
1103+
return
1104+
}
1105+
c.tempTransactionCloseFunc = options.close
1106+
// Start a transaction for the connection state, so we can set the transaction options
1107+
// as local options in the current transaction.
1108+
_ = c.state.Begin()
1109+
if options.BeginTransactionOption != spanner.DefaultBeginTransaction {
1110+
_ = propertyBeginTransactionOption.SetLocalValue(c.state, options.BeginTransactionOption)
1111+
}
1112+
if options.TimestampBound.String() != "(strong)" {
1113+
_ = propertyReadOnlyStaleness.SetLocalValue(c.state, options.TimestampBound)
1114+
}
10791115
}
10801116

10811117
func (c *conn) getReadOnlyTransactionOptions() ReadOnlyTransactionOptions {
1082-
if c.tempReadOnlyTransactionOptions != nil {
1083-
defer func() { c.tempReadOnlyTransactionOptions = nil }()
1084-
opts := *c.tempReadOnlyTransactionOptions
1085-
opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.BeginTransactionOption)
1086-
return opts
1087-
}
10881118
return ReadOnlyTransactionOptions{TimestampBound: c.ReadOnlyStaleness(), BeginTransactionOption: c.convertDefaultBeginTransactionOption(propertyBeginTransactionOption.GetValueOrDefault(c.state))}
10891119
}
10901120

10911121
func (c *conn) withTempBatchReadOnlyTransactionOptions(options *BatchReadOnlyTransactionOptions) {
1092-
c.tempBatchReadOnlyTransactionOptions = options
1122+
if options == nil {
1123+
return
1124+
}
1125+
c.tempTransactionCloseFunc = options.close
1126+
// Start a transaction for the connection state, so we can set the transaction options
1127+
// as local options in the current transaction.
1128+
_ = c.state.Begin()
1129+
if options.TimestampBound.String() != "(strong)" {
1130+
_ = propertyReadOnlyStaleness.SetLocalValue(c.state, options.TimestampBound)
1131+
}
10931132
}
10941133

10951134
func (c *conn) getBatchReadOnlyTransactionOptions() BatchReadOnlyTransactionOptions {
1096-
if c.tempBatchReadOnlyTransactionOptions != nil {
1097-
defer func() { c.tempBatchReadOnlyTransactionOptions = nil }()
1098-
return *c.tempBatchReadOnlyTransactionOptions
1099-
}
11001135
return BatchReadOnlyTransactionOptions{TimestampBound: c.ReadOnlyStaleness()}
11011136
}
11021137

@@ -1108,7 +1143,6 @@ func (c *conn) BeginReadOnlyTransaction(ctx context.Context, options *ReadOnlyTr
11081143
c.withTempReadOnlyTransactionOptions(options)
11091144
tx, err := c.BeginTx(ctx, driver.TxOptions{ReadOnly: true})
11101145
if err != nil {
1111-
c.withTempReadOnlyTransactionOptions(nil)
11121146
return nil, err
11131147
}
11141148
return tx, nil
@@ -1122,7 +1156,6 @@ func (c *conn) BeginReadWriteTransaction(ctx context.Context, options *ReadWrite
11221156
c.withTempTransactionOptions(options)
11231157
tx, err := c.BeginTx(ctx, driver.TxOptions{})
11241158
if err != nil {
1125-
c.withTempTransactionOptions(nil)
11261159
return nil, err
11271160
}
11281161
return tx, nil
@@ -1133,6 +1166,13 @@ func (c *conn) Begin() (driver.Tx, error) {
11331166
}
11341167

11351168
func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver.Tx, error) {
1169+
defer func() {
1170+
c.tempTransactionCloseFunc = nil
1171+
}()
1172+
return c.beginTx(ctx, driverOpts, c.tempTransactionCloseFunc)
1173+
}
1174+
1175+
func (c *conn) beginTx(ctx context.Context, driverOpts driver.TxOptions, closeFunc func()) (driver.Tx, error) {
11361176
if c.resetForRetry {
11371177
c.resetForRetry = false
11381178
return c.tx, nil
@@ -1141,6 +1181,10 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver
11411181
defer func() {
11421182
if c.tx != nil {
11431183
_ = c.state.Begin()
1184+
} else {
1185+
// Rollback in case the connection state transaction was started before this function
1186+
// was called, for example if the caller set temporary transaction options.
1187+
_ = c.state.Rollback()
11441188
}
11451189
}()
11461190

@@ -1180,6 +1224,9 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver
11801224
if batchReadOnly && !driverOpts.ReadOnly {
11811225
return nil, status.Error(codes.InvalidArgument, "levelBatchReadOnly can only be used for read-only transactions")
11821226
}
1227+
if closeFunc == nil {
1228+
closeFunc = func() {}
1229+
}
11831230

11841231
if driverOpts.ReadOnly {
11851232
var logger *slog.Logger
@@ -1188,49 +1235,47 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver
11881235
if batchReadOnly {
11891236
logger = c.logger.With("tx", "batchro")
11901237
var err error
1238+
// BatchReadOnly transactions (currently) do not support inline-begin.
1239+
// This means that the transaction options must be supplied here, and not through a callback.
11911240
bo, err = c.client.BatchReadOnlyTransaction(ctx, batchReadOnlyTxOpts.TimestampBound)
11921241
if err != nil {
11931242
return nil, err
11941243
}
11951244
ro = &bo.ReadOnlyTransaction
11961245
} else {
11971246
logger = c.logger.With("tx", "ro")
1198-
ro = c.client.ReadOnlyTransaction().WithBeginTransactionOption(readOnlyTxOpts.BeginTransactionOption).WithTimestampBound(readOnlyTxOpts.TimestampBound)
1247+
ro = c.client.ReadOnlyTransaction().WithBeginTransactionOption(readOnlyTxOpts.BeginTransactionOption)
11991248
}
12001249
c.tx = &readOnlyTransaction{
12011250
roTx: ro,
12021251
boTx: bo,
12031252
logger: logger,
12041253
close: func(result txResult) {
1205-
if batchReadOnlyTxOpts.close != nil {
1206-
batchReadOnlyTxOpts.close()
1207-
}
1208-
if readOnlyTxOpts.close != nil {
1209-
readOnlyTxOpts.close()
1210-
}
1254+
closeFunc()
12111255
if result == txResultCommit {
12121256
_ = c.state.Commit()
12131257
} else {
12141258
_ = c.state.Rollback()
12151259
}
12161260
c.tx = nil
12171261
},
1262+
timestampBoundCallback: func() spanner.TimestampBound {
1263+
return propertyReadOnlyStaleness.GetValueOrDefault(c.state)
1264+
},
12181265
}
12191266
return c.tx, nil
12201267
}
12211268

1269+
// These options are only used to determine how to start the transaction.
1270+
// All other options are fetched in a callback that is called when the transaction is actually started.
1271+
// That callback reads all transaction options from the connection state at that moment. This allows
1272+
// applications to execute a series of statement like this:
1273+
// BEGIN TRANSACTION;
1274+
// SET LOCAL transaction_tag='my_tag';
1275+
// SET LOCAL commit_priority=LOW;
1276+
// INSERT INTO my_table ... -- This starts the transaction with the options above included.
12221277
opts := spanner.TransactionOptions{}
1223-
if c.tempTransactionOptions != nil {
1224-
opts = c.tempTransactionOptions.TransactionOptions
1225-
}
1226-
opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(opts.BeginTransactionOption)
1227-
tempCloseFunc := func() {}
1228-
if c.tempTransactionOptions != nil && c.tempTransactionOptions.close != nil {
1229-
tempCloseFunc = c.tempTransactionOptions.close
1230-
}
1231-
if !disableRetryAborts && c.tempTransactionOptions != nil {
1232-
disableRetryAborts = c.tempTransactionOptions.DisableInternalRetries
1233-
}
1278+
opts.BeginTransactionOption = c.convertDefaultBeginTransactionOption(propertyBeginTransactionOption.GetValueOrDefault(c.state))
12341279

12351280
tx, err := spanner.NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx, c.client, opts, func() spanner.TransactionOptions {
12361281
defer func() {
@@ -1249,7 +1294,7 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver
12491294
logger: logger,
12501295
rwTx: tx,
12511296
close: func(result txResult, commitResponse *spanner.CommitResponse, commitErr error) {
1252-
tempCloseFunc()
1297+
closeFunc()
12531298
c.prevTx = c.tx
12541299
c.tx = nil
12551300
if commitErr == nil {

connection_properties.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,27 @@ var propertyMaxCommitDelay = createConnectionProperty(
257257
connectionstate.ContextUser,
258258
connectionstate.ConvertDuration,
259259
)
260+
var propertyCommitPriority = createConnectionProperty(
261+
"commit_priority",
262+
"Sets the priority for commit RPC invocations from this connection (HIGH/MEDIUM/LOW/UNSPECIFIED). "+
263+
"The default is UNSPECIFIED.",
264+
spannerpb.RequestOptions_PRIORITY_UNSPECIFIED,
265+
false,
266+
nil,
267+
connectionstate.ContextUser,
268+
func(value string) (spannerpb.RequestOptions_Priority, error) {
269+
return parseRpcPriority(value)
270+
},
271+
)
272+
var propertyReturnCommitStats = createConnectionProperty(
273+
"return_commit_stats",
274+
"return_commit_stats determines whether transactions should request Spanner to return commit statistics.",
275+
false,
276+
false,
277+
nil,
278+
connectionstate.ContextUser,
279+
connectionstate.ConvertBool,
280+
)
260281

261282
// ------------------------------------------------------------------------------------------------
262283
// Statement connection properties.

driver.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,7 +1148,6 @@ func BeginReadWriteTransaction(ctx context.Context, db *sql.DB, options ReadWrit
11481148
}
11491149
tx, err := conn.BeginTx(ctx, &sql.TxOptions{})
11501150
if err != nil {
1151-
clearTempReadWriteTransactionOptions(conn)
11521151
return nil, err
11531152
}
11541153
return tx, nil
@@ -1166,11 +1165,6 @@ func withTempReadWriteTransactionOptions(conn *sql.Conn, options *ReadWriteTrans
11661165
})
11671166
}
11681167

1169-
func clearTempReadWriteTransactionOptions(conn *sql.Conn) {
1170-
_ = withTempReadWriteTransactionOptions(conn, nil)
1171-
_ = conn.Close()
1172-
}
1173-
11741168
// ReadOnlyTransactionOptions can be used to create a read-only transaction
11751169
// on a Spanner connection.
11761170
type ReadOnlyTransactionOptions struct {
@@ -1529,6 +1523,24 @@ func toProtoIsolationLevelOrDefault(level sql.IsolationLevel) spannerpb.Transact
15291523
return res
15301524
}
15311525

1526+
func toSqlIsolationLevel(level spannerpb.TransactionOptions_IsolationLevel) (sql.IsolationLevel, error) {
1527+
switch level {
1528+
case spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED:
1529+
return sql.LevelDefault, nil
1530+
case spannerpb.TransactionOptions_SERIALIZABLE:
1531+
return sql.LevelSerializable, nil
1532+
case spannerpb.TransactionOptions_REPEATABLE_READ:
1533+
return sql.LevelRepeatableRead, nil
1534+
default:
1535+
}
1536+
return sql.LevelDefault, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "invalid or unsupported isolation level: %v", level))
1537+
}
1538+
1539+
func toSqlIsolationLevelOrDefault(level spannerpb.TransactionOptions_IsolationLevel) sql.IsolationLevel {
1540+
res, _ := toSqlIsolationLevel(level)
1541+
return res
1542+
}
1543+
15321544
type spannerIsolationLevel sql.IsolationLevel
15331545

15341546
const (

driver_with_mockserver_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5076,7 +5076,7 @@ func TestBeginReadWriteTransaction(t *testing.T) {
50765076
t.Fatalf("missing transaction for ExecuteSqlRequest")
50775077
}
50785078
if req.Transaction.GetId() == nil {
5079-
t.Fatalf("missing begin selector for ExecuteSqlRequest")
5079+
t.Fatalf("missing ID selector for ExecuteSqlRequest")
50805080
}
50815081
if g, w := req.RequestOptions.TransactionTag, tag; g != w {
50825082
t.Fatalf("transaction tag mismatch\n Got: %v\nWant: %v", g, w)

0 commit comments

Comments
 (0)