Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 73 additions & 23 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,15 @@ type Metrics struct {
// for metrics registration.
EngineMetrics EngineMetrics

// StatementCounters contains metrics for statements.
StatementCounters StatementCounters
// StartedStatementCounters contains metrics for statements initiated by
// users. These metrics count user-initiated operations, regardless of
// success (in particular, TxnCommitCount is the number of COMMIT statements
// attempted, not the number of transactions that successfully commit).
StartedStatementCounters StatementCounters

// ExecutedStatementCounters contains metrics for successfully executed
// statements.
ExecutedStatementCounters StatementCounters
}

// NewServer creates a new Server. Start() needs to be called before the Server
Expand Down Expand Up @@ -304,7 +311,8 @@ func makeMetrics(internal bool) Metrics {
TxnAbortCount: metric.NewCounter(getMetricMeta(MetaTxnAbort, internal)),
FailureCount: metric.NewCounter(getMetricMeta(MetaFailure, internal)),
},
StatementCounters: makeStatementCounters(internal),
StartedStatementCounters: makeStartedStatementCounters(internal),
ExecutedStatementCounters: makeExecutedStatementCounters(internal),
}
}

Expand Down Expand Up @@ -364,7 +372,7 @@ func (s *Server) GetExecutorConfig() *ExecutorConfig {
// and an error is returned if this validation fails.
// stmtBuf: The incoming statement for the new connExecutor.
// clientComm: The interface through which the new connExecutor is going to
// produce results for the client.
// produce results for the client.
// memMetrics: The metrics that statements executed on this connection will
// contribute to.
func (s *Server) SetupConn(
Expand Down Expand Up @@ -2118,10 +2126,7 @@ func (ex *connExecutor) sessionEventf(ctx context.Context, format string, args .
}

// StatementCounters groups metrics for counting different types of
// statements. These metrics count user-initiated operations,
// regardless of success (in particular, TxnCommitCount is the number
// of COMMIT statements attempted, not the number of transactions that
// successfully commit).
// statements.
type StatementCounters struct {
// QueryCount includes all statements and it is therefore the sum of
// all the below metrics.
Expand Down Expand Up @@ -2154,24 +2159,69 @@ type StatementCounters struct {
MiscCount telemetry.CounterWithMetric
}

func makeStatementCounters(internal bool) StatementCounters {
func makeStartedStatementCounters(internal bool) StatementCounters {
return StatementCounters{
TxnBeginCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnBeginStarted, internal)),
TxnCommitCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnCommitStarted, internal)),
TxnRollbackCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnRollbackStarted, internal)),
SavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaSavepointStarted, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRestartSavepointStarted, internal)),
ReleaseRestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaReleaseRestartSavepointStarted, internal)),
RollbackToRestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRollbackToRestartSavepointStarted, internal)),
SelectCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaSelectStarted, internal)),
UpdateCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaUpdateStarted, internal)),
InsertCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaInsertStarted, internal)),
DeleteCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaDeleteStarted, internal)),
DdlCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaDdlStarted, internal)),
MiscCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaMiscStarted, internal)),
QueryCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaQueryStarted, internal)),
}
}

func makeExecutedStatementCounters(internal bool) StatementCounters {
return StatementCounters{
TxnBeginCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaTxnBegin, internal)),
TxnCommitCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaTxnCommit, internal)),
TxnRollbackCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaTxnRollback, internal)),
SavepointCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaSavepoint, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaRestartSavepoint, internal)),
TxnBeginCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnBeginExecuted, internal)),
TxnCommitCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnCommitExecuted, internal)),
TxnRollbackCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnRollbackExecuted, internal)),
SavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaSavepointExecuted, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRestartSavepointExecuted, internal)),
ReleaseRestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaReleaseRestartSavepoint, internal)),
getMetricMeta(MetaReleaseRestartSavepointExecuted, internal)),
RollbackToRestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRollbackToRestartSavepoint, internal)),
SelectCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaSelect, internal)),
UpdateCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaUpdate, internal)),
InsertCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaInsert, internal)),
DeleteCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaDelete, internal)),
DdlCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaDdl, internal)),
MiscCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaMisc, internal)),
QueryCount: telemetry.NewCounterWithMetric(getMetricMeta(MetaQuery, internal)),
getMetricMeta(MetaRollbackToRestartSavepointExecuted, internal)),
SelectCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaSelectExecuted, internal)),
UpdateCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaUpdateExecuted, internal)),
InsertCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaInsertExecuted, internal)),
DeleteCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaDeleteExecuted, internal)),
DdlCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaDdlExecuted, internal)),
MiscCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaMiscExecuted, internal)),
QueryCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaQueryExecuted, internal)),
}
}

Expand Down
48 changes: 39 additions & 9 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,12 @@ func (ex *connExecutor) recordFailure() {
func (ex *connExecutor) execStmtInOpenState(
ctx context.Context, stmt Statement, pinfo *tree.PlaceholderInfo, res RestrictedCommandResult,
) (retEv fsm.Event, retPayload fsm.EventPayload, retErr error) {
ex.incrementStmtCounter(stmt)
ex.incrementStartedStmtCounter(stmt)
defer func() {
if retErr == nil && !payloadHasError(retPayload) {
ex.incrementExecutedStmtCounter(stmt)
}
}()
os := ex.machine.CurState().(stateOpen)

var timeoutTicker *time.Timer
Expand Down Expand Up @@ -947,10 +952,15 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode(
// stateOpen, at each point its results will also be flushed.
func (ex *connExecutor) execStmtInNoTxnState(
ctx context.Context, stmt Statement,
) (fsm.Event, fsm.EventPayload) {
) (_ fsm.Event, payload fsm.EventPayload) {
switch s := stmt.AST.(type) {
case *tree.BeginTransaction:
ex.incrementStmtCounter(stmt)
ex.incrementStartedStmtCounter(stmt)
defer func() {
if !payloadHasError(payload) {
ex.incrementExecutedStmtCounter(stmt)
}
}()
pri, err := priorityToProto(s.Modes.UserPriority)
if err != nil {
return ex.makeErrEvent(err, s)
Expand Down Expand Up @@ -1096,17 +1106,22 @@ func (ex *connExecutor) execStmtInAbortedState(
// Everything but COMMIT/ROLLBACK causes errors. ROLLBACK is treated like COMMIT.
func (ex *connExecutor) execStmtInCommitWaitState(
stmt Statement, res RestrictedCommandResult,
) (fsm.Event, fsm.EventPayload) {
ex.incrementStmtCounter(stmt)
) (ev fsm.Event, payload fsm.EventPayload) {
ex.incrementStartedStmtCounter(stmt)
defer func() {
if !payloadHasError(payload) {
ex.incrementExecutedStmtCounter(stmt)
}
}()
switch stmt.AST.(type) {
case *tree.CommitTransaction, *tree.RollbackTransaction:
// Reply to a rollback with the COMMIT tag, by analogy to what we do when we
// get a COMMIT in state Aborted.
res.ResetStmtType((*tree.CommitTransaction)(nil))
return eventTxnFinish{}, eventTxnFinishPayload{commit: true}
default:
ev := eventNonRetriableErr{IsCommit: fsm.False}
payload := eventNonRetriableErrPayload{
ev = eventNonRetriableErr{IsCommit: fsm.False}
payload = eventNonRetriableErrPayload{
err: sqlbase.NewTransactionCommittedError(),
}
return ev, payload
Expand Down Expand Up @@ -1291,8 +1306,23 @@ func (ex *connExecutor) handleAutoCommit(
return ev, payload
}

func (ex *connExecutor) incrementStmtCounter(stmt Statement) {
ex.metrics.StatementCounters.incrementCount(ex, stmt.AST)
// incrementStartedStmtCounter increments the appropriate started
// statement counter for stmt's type.
func (ex *connExecutor) incrementStartedStmtCounter(stmt Statement) {
ex.metrics.StartedStatementCounters.incrementCount(ex, stmt.AST)
}

// incrementExecutedStmtCounter increments the appropriate executed
// statement counter for stmt's type.
func (ex *connExecutor) incrementExecutedStmtCounter(stmt Statement) {
ex.metrics.ExecutedStatementCounters.incrementCount(ex, stmt.AST)
}

// payloadHasError returns true if the passed payload implements
// payloadWithError.
func payloadHasError(payload fsm.EventPayload) bool {
_, hasErr := payload.(payloadWithError)
return hasErr
}

// validateSavepointName validates that it is that the provided ident
Expand Down
Loading