Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: allow HeartbeatTxn and EndTxn requests to create txn records #33396

Merged
merged 5 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions pkg/internal/client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func newTestTxnFactory(
if pErr != nil {
return nil, pErr
}
var writing bool
status := roachpb.PENDING
for i, req := range ba.Requests {
args := req.GetInner()
Expand All @@ -111,13 +110,9 @@ func newTestTxnFactory(
union := &br.Responses[i] // avoid operating on copy
union.MustSetInner(&testPutRespCopy)
}
if roachpb.IsTransactionWrite(args) {
writing = true
}
}
if args, ok := ba.GetArg(roachpb.EndTransaction); ok {
et := args.(*roachpb.EndTransactionRequest)
writing = true
if et.Commit {
status = roachpb.COMMITTED
} else {
Expand All @@ -128,7 +123,6 @@ func newTestTxnFactory(
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
if pErr == nil {
br.Txn.Writing = writing
br.Txn.Status = status
}
// Update the MockTxnSender's proto.
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,9 @@ func TestReverseScanWithSplitAndMerge(t *testing.T) {

func TestBadRequest(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip("TODO(andreimatei): This last assertion in this test was broken by #33150. " +
"I suspect the reason is that there is no longer a single Range " +
"that spans [KeyMin, z), so we're not hitting the error.")
s, db := startNoSplitMergeServer(t)
defer s.Stopper().Stop(context.TODO())
ctx := context.TODO()
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
// 3) Another txn runs into the intents on r2, tries to push, and succeeds
// because the txn record doesn't exist yet. It writes the txn record as
// Aborted.
// TODO(nvanbenschoten): This test will change when #25437 is fully addressed.
// 4) If the Begin were to execute now, it'd discover the Aborted txn and return
// a retryable TxnAbortedError. But it doesn't execute now; it's still delayed
// somehow.
Expand All @@ -67,6 +68,7 @@ import (
// result of this convoluted scenario.
func TestDelayedBeginRetryable(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip(`Flaky, will be removed when #25437 is addressed`)

// Here's how this test is gonna go:
// - We're going to send a BeginTxn+Put(a)+Put(c). The first two will be split
Expand Down Expand Up @@ -183,7 +185,7 @@ func TestDelayedBeginRetryable(t *testing.T) {
if _, ok := pErr.GetDetail().(*roachpb.HandledRetryableTxnError); !ok {
t.Fatalf("expected HandledRetryableTxnError, got: %v", pErr)
}
exp := "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)"
exp := "TransactionAbortedError(ABORT_REASON_ABORT_SPAN)"
if !testutils.IsPError(pErr, regexp.QuoteMeta(exp)) {
t.Fatalf("expected %s, got: %s", exp, pErr)
}
Expand Down
70 changes: 5 additions & 65 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -677,7 +676,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
ctx := context.Background()
origTS := makeTS(123, 0)
plus10 := origTS.Add(10, 10)
plus20 := plus10.Add(10, 0)
plus20 := origTS.Add(20, 0)
testCases := []struct {
// The test's name.
name string
Expand Down Expand Up @@ -779,6 +778,10 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
if pErr == nil {
reply = ba.CreateReply()
reply.Txn = ba.Txn
} else if txn := pErr.GetTxn(); txn != nil {
// Update the manual clock to simulate an
// error updating a local hlc clock.
manual.Set(txn.Timestamp.WallTime)
}
return reply, pErr
}
Expand Down Expand Up @@ -890,67 +893,6 @@ func TestTxnMultipleCoord(t *testing.T) {
}
}

// TestTxnCoordSenderErrorWithIntent validates that if a transactional request
// returns an error but also indicates a Writing transaction, the coordinator
// tracks it just like a successful request.
//
// Note(andrei): This test was written at a time when the Writing status
// returned by the server mattered for the client. As of June 2018, that's no
// longer the case. The test doesn't hurt, though.
func TestTxnCoordSenderErrorWithIntent(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, 20*time.Nanosecond)

testCases := []struct {
roachpb.Error
errMsg string
}{
{*roachpb.NewError(roachpb.NewTransactionRetryError(roachpb.RETRY_REASON_UNKNOWN)), "retry txn"},
{
*roachpb.NewError(roachpb.NewTransactionPushError(roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{ID: uuid.MakeV4()}}),
), "failed to push",
},
{*roachpb.NewErrorf("testError"), "testError"},
}
for i, test := range testCases {
t.Run("", func(t *testing.T) {
var senderFn client.SenderFunc = func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
txn := ba.Txn.Clone()
txn.Writing = true
pErr := &roachpb.Error{}
*pErr = test.Error
pErr.SetTxn(&txn)
return nil, pErr
}
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
Stopper: stopper,
},
senderFn,
)

var ba roachpb.BatchRequest
key := roachpb.Key("test")
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: key}})
ba.Add(&roachpb.EndTransactionRequest{})
txn := roachpb.MakeTransaction("test", key, 0, clock.Now(), 0)
meta := roachpb.MakeTxnCoordMeta(txn)
tc := factory.TransactionalSender(client.RootTxn, meta)
ba.Txn = &txn
_, pErr := tc.Send(context.Background(), ba)
if !testutils.IsPError(pErr, test.errMsg) {
t.Errorf("%d: error did not match %s: %v", i, test.errMsg, pErr)
}
})
}
}

// TestTxnCoordSenderNoDuplicateIntents verifies that TxnCoordSender does not
// generate duplicate intents and that it merges intents for overlapping ranges.
func TestTxnCoordSenderNoDuplicateIntents(t *testing.T) {
Expand All @@ -973,7 +915,6 @@ func TestTxnCoordSenderNoDuplicateIntents(t *testing.T) {
br := ba.CreateReply()
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
br.Txn.Writing = true
return br, nil
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
Expand Down Expand Up @@ -1333,7 +1274,6 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
if ba.Txn != nil && br.Txn == nil {
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
br.Txn.Writing = true
br.Txn.Status = roachpb.PENDING
}
} else if et, hasET := ba.GetArg(roachpb.EndTransaction); hasET {
Expand Down
27 changes: 12 additions & 15 deletions pkg/kv/txn_interceptor_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,27 +459,30 @@ func (h *txnHeartbeat) heartbeat(ctx context.Context) bool {
// then we ignore the error. This is possible if the heartbeat loop was
// started before a BeginTxn request succeeds because of ambiguity in the
// first write request's response.
//
// TODO(nvanbenschoten): Remove this in 2.3.
if tse, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); ok &&
tse.Reason == roachpb.TransactionStatusError_REASON_TXN_NOT_FOUND {
return true
}

if pErr.GetTxn() != nil {
// It is not expected for a 2.1 node to return an error with a transaction
// in it. For one, heartbeats are not supposed to return
// TransactionAbortedErrors.
// TODO(andrei): Remove this in 2.2.
respTxn = pErr.GetTxn()
} else {
return true
}
respTxn = pErr.GetTxn()
} else {
respTxn = br.Responses[0].GetInner().(*roachpb.HeartbeatTxnResponse).Txn
}

// Update our txn. In particular, we need to make sure that the client will
// notice when the txn has been aborted (in which case we'll give them an
// error on their next request).
//
// TODO(nvanbenschoten): It's possible for a HeartbeatTxn request to observe
// the result of an EndTransaction request and beat it back to the client.
// This is an issue when a COMMITTED txn record is GCed and later re-written
// as ABORTED. The coordinator's local status could flip from PENDING to
// ABORTED (after heartbeat response) to COMMITTED (after commit response).
// This appears to be benign, but it's still somewhat disconcerting. If this
// ever causes any issues, we'll need to be smarter about detecting this race
// on the client and conditionally ignoring the result of heartbeat responses.
h.mu.txn.Update(respTxn)
if h.mu.txn.Status != roachpb.PENDING {
if h.mu.txn.Status == roachpb.ABORTED {
Expand All @@ -494,12 +497,6 @@ func (h *txnHeartbeat) heartbeat(ctx context.Context) bool {
// abortTxnAsyncLocked send an EndTransaction(commmit=false) asynchronously.
// The asyncAbortCallbackLocked callback is also called.
func (h *txnHeartbeat) abortTxnAsyncLocked(ctx context.Context) {
// Stop the heartbeat loop if it is still running.
if h.mu.txnEnd != nil {
close(h.mu.txnEnd)
h.mu.txnEnd = nil
}

if h.mu.txn.Status != roachpb.ABORTED {
log.Fatalf(ctx, "abortTxnAsyncLocked called for non-aborted txn: %s", h.mu.txn)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ func (*ScanRequest) flags() int { return isRead | isRange | isTxn | update
func (*ReverseScanRequest) flags() int {
return isRead | isRange | isReverse | isTxn | updatesReadTSCache | needsRefresh
}
func (*BeginTransactionRequest) flags() int { return isWrite | isTxn | consultsTSCache }
func (*BeginTransactionRequest) flags() int { return isWrite | isTxn }

// EndTransaction updates the write timestamp cache to prevent
// replays. Replays for the same transaction key and timestamp will
Expand Down
13 changes: 5 additions & 8 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ func (t *Transaction) BumpEpoch() {
func (t *Transaction) InclusiveTimeBounds() (hlc.Timestamp, hlc.Timestamp) {
min := t.OrigTimestamp
max := t.Timestamp
if t.Epoch != 0 {
if t.Epoch != 0 && t.EpochZeroTimestamp != (hlc.Timestamp{}) {
if min.Less(t.EpochZeroTimestamp) {
panic(fmt.Sprintf("orig timestamp %s less than epoch zero %s", min, t.EpochZeroTimestamp))
}
Expand Down Expand Up @@ -1153,20 +1153,17 @@ func PrepareTransactionForRetry(
// TODO(andrei): Should we preserve the ObservedTimestamps across the
// restart?
errTxnPri := txn.Priority
// The OrigTimestamp of the new transaction is going to be the greater of
// two the current clock and the timestamp received in the error.
// TODO(andrei): Can we just use the clock since it has already been
// advanced to at least the error's timestamp?
// Start the new transaction at the current time from the local clock.
// The local hlc should have been advanced to at least the error's
// timestamp already.
now := clock.Now()
newTxnTimestamp := now
newTxnTimestamp.Forward(txn.Timestamp)
txn = MakeTransaction(
txn.Name,
nil, // baseKey
// We have errTxnPri, but this wants a UserPriority. So we're going to
// overwrite the priority below.
NormalUserPriority,
newTxnTimestamp,
now,
clock.MaxOffset().Nanoseconds(),
)
// Use the priority communicated back by the server.
Expand Down
9 changes: 0 additions & 9 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,6 @@ func NewTransactionStatusError(msg string) *TransactionStatusError {
}
}

// NewTransactionNotFoundStatusError initializes a new TransactionStatusError with
// a REASON_TXN_NOT_FOUND reason.
func NewTransactionNotFoundStatusError() *TransactionStatusError {
return &TransactionStatusError{
Msg: "txn record not found",
Reason: TransactionStatusError_REASON_TXN_NOT_FOUND,
}
}

// NewTransactionCommittedStatusError initializes a new TransactionStatusError
// with a REASON_TXN_COMMITTED.
func NewTransactionCommittedStatusError() *TransactionStatusError {
Expand Down
Loading