From bd00a26afd75b84160735ad848a7d48e3cc8a660 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 11 Jan 2019 14:29:20 -0500 Subject: [PATCH] kv: provide option to request eager txn record creation Informs #33656. I'm not aware of any places where this is necessary for correctness, but it is useful to prevent test flakes until tests are updated to expect lazy transaction record creation. Release note: None --- pkg/internal/client/sender.go | 10 ++++++++++ pkg/internal/client/txn.go | 12 ++++++++++++ pkg/kv/txn_coord_sender.go | 11 +++++++++++ pkg/kv/txn_interceptor_heartbeat.go | 6 +++++- pkg/storage/replica_command.go | 8 ++++++++ 5 files changed, 46 insertions(+), 1 deletion(-) diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index 1c9ba4cb2d7c..d3947ff96ba3 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -170,6 +170,13 @@ type TxnSender interface { // like the transaction that merges ranges together. DisablePipelining() error + // EagerRecord instructs the transaction write its transaction record as soon as + // possible, instead of waiting for the transaction's first heartbeat or for the + // end of the transaction to write it. + // + // TODO(nvanbenschoten): Fix up flaky tests to allow us to get rid of this. + EagerRecord() error + // OrigTimestamp returns the transaction's starting timestamp. // Note a transaction can be internally pushed forward in time before // committing so this is not guaranteed to be the commit timestamp. @@ -361,6 +368,9 @@ func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr( // DisablePipelining is part of the client.TxnSender interface. func (m *MockTransactionalSender) DisablePipelining() error { return nil } +// EagerRecord is part of the client.TxnSender interface. +func (m *MockTransactionalSender) EagerRecord() error { return nil } + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index bc0e10d778af..2adff023b97a 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -292,6 +292,18 @@ func (txn *Txn) DisablePipelining() error { return txn.mu.sender.DisablePipelining() } +// EagerRecord instructs the transaction write its transaction record as soon as +// possible, instead of waiting for the transaction's first heartbeat or for the +// end of the transaction to write it. +// +// EagerRecord must be called before any operations are performed on the +// transaction. +func (txn *Txn) EagerRecord() error { + txn.mu.Lock() + defer txn.mu.Unlock() + return txn.mu.sender.EagerRecord() +} + // NewBatch creates and returns a new empty batch object for use with the Txn. func (txn *Txn) NewBatch() *Batch { return &Batch{txn: txn} diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index cb5f8cc9e0b2..b1ad7d798ca9 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -544,6 +544,17 @@ func (tc *TxnCoordSender) DisablePipelining() error { return nil } +// EagerRecord is part of the client.TxnSender interface. +func (tc *TxnCoordSender) EagerRecord() error { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.mu.active { + return errors.Errorf("cannot request an eager transaction record write on a running transaction") + } + tc.interceptorAlloc.txnHeartbeat.eagerRecord = true + return nil +} + // commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but // cheaper than, sending an EndTransactionRequest. A read-only txn doesn't have // a transaction record, so there's no need to send any request to the server. diff --git a/pkg/kv/txn_interceptor_heartbeat.go b/pkg/kv/txn_interceptor_heartbeat.go index 2b133cc3a1cb..d676dcc77e88 100644 --- a/pkg/kv/txn_interceptor_heartbeat.go +++ b/pkg/kv/txn_interceptor_heartbeat.go @@ -65,6 +65,10 @@ type txnHeartbeat struct { // is to notify the TxnCoordSender to shut itself down. asyncAbortCallbackLocked func(context.Context) + // When set to true, the transaction will always send a BeginTxn request to + // lay down a transaction record as early as possible. + eagerRecord bool + // mu contains state protected by the TxnCoordSender's mutex. mu struct { sync.Locker @@ -178,7 +182,7 @@ func (h *txnHeartbeat) SendLocked( ba.Txn.Key = anchor } - if !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) { + if h.eagerRecord || !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) { addedBeginTxn = true // Set the key in the begin transaction request to the txn's anchor key. diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 78d7e42f5157..d7b9c8619a5d 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -502,6 +502,14 @@ func (r *Replica) AdminMerge( // TODO(benesch): expose a proper API for preventing the fast path. _ = txn.CommitTimestamp() + // Multiple merge tests rely on a transaction record being written as + // early as possible in the lifecycle of a transaction. They aren't + // prepared for an untimely lease transfer to cause a restart. + // TODO(nvanbenschoten): Remove this once the tests are fixed. + if err := txn.EagerRecord(); err != nil { + return err + } + // Pipelining might send QueryIntent requests to the RHS after the RHS has // noticed the merge and started blocking all traffic. This causes the merge // transaction to deadlock. Just turn pipelining off; the structure of the