Skip to content

Commit

Permalink
Merge #33674
Browse files Browse the repository at this point in the history
33674: kv: provide option to request eager txn record creation r=nvanbenschoten a=nvanbenschoten

Informs #33656.

I'm not aware of any places where this is necessary for correctness, but it is useful to prevent test flakes until tests are updated to expect lazy transaction record creation.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jan 11, 2019
2 parents a2627b8 + bd00a26 commit 8138f8b
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ type TxnSender interface {
// like the transaction that merges ranges together.
DisablePipelining() error

// EagerRecord instructs the transaction write its transaction record as soon as
// possible, instead of waiting for the transaction's first heartbeat or for the
// end of the transaction to write it.
//
// TODO(nvanbenschoten): Fix up flaky tests to allow us to get rid of this.
EagerRecord() error

// OrigTimestamp returns the transaction's starting timestamp.
// Note a transaction can be internally pushed forward in time before
// committing so this is not guaranteed to be the commit timestamp.
Expand Down Expand Up @@ -361,6 +368,9 @@ func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr(
// DisablePipelining is part of the client.TxnSender interface.
func (m *MockTransactionalSender) DisablePipelining() error { return nil }

// EagerRecord is part of the client.TxnSender interface.
func (m *MockTransactionalSender) EagerRecord() error { return nil }

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
Expand Down
12 changes: 12 additions & 0 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,18 @@ func (txn *Txn) DisablePipelining() error {
return txn.mu.sender.DisablePipelining()
}

// EagerRecord instructs the transaction write its transaction record as soon as
// possible, instead of waiting for the transaction's first heartbeat or for the
// end of the transaction to write it.
//
// EagerRecord must be called before any operations are performed on the
// transaction.
func (txn *Txn) EagerRecord() error {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.EagerRecord()
}

// NewBatch creates and returns a new empty batch object for use with the Txn.
func (txn *Txn) NewBatch() *Batch {
return &Batch{txn: txn}
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,17 @@ func (tc *TxnCoordSender) DisablePipelining() error {
return nil
}

// EagerRecord is part of the client.TxnSender interface.
func (tc *TxnCoordSender) EagerRecord() error {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.active {
return errors.Errorf("cannot request an eager transaction record write on a running transaction")
}
tc.interceptorAlloc.txnHeartbeat.eagerRecord = true
return nil
}

// commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but
// cheaper than, sending an EndTransactionRequest. A read-only txn doesn't have
// a transaction record, so there's no need to send any request to the server.
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/txn_interceptor_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type txnHeartbeat struct {
// is to notify the TxnCoordSender to shut itself down.
asyncAbortCallbackLocked func(context.Context)

// When set to true, the transaction will always send a BeginTxn request to
// lay down a transaction record as early as possible.
eagerRecord bool

// mu contains state protected by the TxnCoordSender's mutex.
mu struct {
sync.Locker
Expand Down Expand Up @@ -178,7 +182,7 @@ func (h *txnHeartbeat) SendLocked(
ba.Txn.Key = anchor
}

if !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) {
if h.eagerRecord || !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) {
addedBeginTxn = true

// Set the key in the begin transaction request to the txn's anchor key.
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,14 @@ func (r *Replica) AdminMerge(
// TODO(benesch): expose a proper API for preventing the fast path.
_ = txn.CommitTimestamp()

// Multiple merge tests rely on a transaction record being written as
// early as possible in the lifecycle of a transaction. They aren't
// prepared for an untimely lease transfer to cause a restart.
// TODO(nvanbenschoten): Remove this once the tests are fixed.
if err := txn.EagerRecord(); err != nil {
return err
}

// Pipelining might send QueryIntent requests to the RHS after the RHS has
// noticed the merge and started blocking all traffic. This causes the merge
// transaction to deadlock. Just turn pipelining off; the structure of the
Expand Down

0 comments on commit 8138f8b

Please sign in to comment.