diff --git a/internal/component/database_observability/mysql/collector/query_samples.go b/internal/component/database_observability/mysql/collector/query_samples.go index 1c9b09727ec..25f83361bd6 100644 --- a/internal/component/database_observability/mysql/collector/query_samples.go +++ b/internal/component/database_observability/mysql/collector/query_samples.go @@ -57,7 +57,9 @@ SELECT waits.event_name as WAIT_EVENT_NAME, waits.object_name as WAIT_OBJECT_NAME, waits.object_type as WAIT_OBJECT_TYPE, - waits.timer_wait as WAIT_TIMER_WAIT + waits.timer_wait as WAIT_TIMER_WAIT, + threads.PROCESSLIST_USER as QUERY_USER, + threads.PROCESSLIST_HOST as QUERY_HOST %s FROM performance_schema.events_statements_history AS statements @@ -65,6 +67,9 @@ LEFT JOIN performance_schema.events_waits_history waits ON statements.thread_id = waits.thread_id AND statements.EVENT_ID = waits.NESTING_EVENT_ID +LEFT JOIN + performance_schema.threads threads + ON statements.THREAD_ID = threads.THREAD_ID WHERE statements.DIGEST IS NOT NULL AND statements.CURRENT_SCHEMA NOT IN %s @@ -294,6 +299,10 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { WaitObjectName sql.NullString WaitObjectType sql.NullString WaitTime sql.NullFloat64 + + // user and host who issued the query + User sql.NullString + Host sql.NullString }{} scanArgs := []any{ @@ -314,6 +323,8 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { &row.WaitObjectName, &row.WaitObjectType, &row.WaitTime, + &row.User, + &row.Host, } if semver.MustParseRange(">=8.0.28")(c.engineVersion) { @@ -345,8 +356,8 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { elapsedTime := picosecondsToMilliseconds(row.ElapsedTimePicoseconds.Float64) logMessage := fmt.Sprintf( - `schema="%s" thread_id="%s" event_id="%s" end_event_id="%s" digest="%s" rows_examined="%d" rows_sent="%d" rows_affected="%d" errors="%d" max_controlled_memory="%db" max_total_memory="%db" cpu_time="%fms" elapsed_time="%fms" elapsed_time_ms="%fms"`, - row.Schema.String, row.ThreadID.String, + `schema="%s" user="%s" client_host="%s" thread_id="%s" event_id="%s" end_event_id="%s" digest="%s" rows_examined="%d" rows_sent="%d" rows_affected="%d" errors="%d" max_controlled_memory="%db" max_total_memory="%db" cpu_time="%fms" elapsed_time="%fms" elapsed_time_ms="%fms"`, + row.Schema.String, row.User.String, row.Host.String, row.ThreadID.String, row.StatementEventID.String, row.StatementEndEventID.String, row.Digest.String, row.RowsExamined, @@ -378,8 +389,10 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { if row.WaitEventID.Valid && row.WaitTime.Valid { waitTime := picosecondsToMilliseconds(row.WaitTime.Float64) waitLogMessage := fmt.Sprintf( - `schema="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms"`, + `schema="%s" user="%s" client_host="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms"`, row.Schema.String, + row.User.String, + row.Host.String, row.ThreadID.String, row.Digest.String, row.StatementEventID.String, diff --git a/internal/component/database_observability/mysql/collector/query_samples_test.go b/internal/component/database_observability/mysql/collector/query_samples_test.go index 494222f7681..c95e827cd37 100644 --- a/internal/component/database_observability/mysql/collector/query_samples_test.go +++ b/internal/component/database_observability/mysql/collector/query_samples_test.go @@ -53,6 +53,8 @@ func TestQuerySamples(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -61,7 +63,7 @@ func TestQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, logsLines: []string{ - "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", + "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", }, }, { @@ -84,6 +86,8 @@ func TestQuerySamples(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -105,6 +109,8 @@ func TestQuerySamples(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -114,8 +120,8 @@ func TestQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, logsLines: []string{ - "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", - "level=\"info\" schema=\"some_other_schema\" thread_id=\"891\" event_id=\"124\" end_event_id=\"235\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", + "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", + "level=\"info\" schema=\"some_other_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"891\" event_id=\"124\" end_event_id=\"235\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", }, }, } @@ -182,6 +188,8 @@ func TestQuerySamples(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -266,6 +274,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -288,6 +298,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "wait_object_name", "wait_object_type", "100000000", + "some_user", + "some_host", "10000000", "456", "457", @@ -314,9 +326,9 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"124\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"0.100000ms\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"124\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"0.100000ms\"", lokiEntries[1].Line) }) t.Run("wait event with NULL timer_wait is skipped", func(t *testing.T) { @@ -361,6 +373,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -383,6 +397,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "wait_object_name", "wait_object_type", nil, // NULL timer_wait + "some_user", + "some_host", "10000000", "456", "457", @@ -411,7 +427,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() require.Len(t, lokiEntries, 1) assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) }) t.Run("query sample and multiple wait events are collected", func(t *testing.T) { @@ -456,6 +472,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -478,6 +496,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "books", "TABLE", "150000", + "some_user", + "some_host", "10000000", "456", "457", @@ -500,6 +520,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "categories", "TABLE", "350000", + "some_user", + "some_host", "10000000", "456", "457", @@ -522,6 +544,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "books", "TABLE", "500000", + "some_user", + "some_host", "10000000", "456", "457", @@ -544,6 +568,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "categories", "TABLE", "700000", + "some_user", + "some_host", "10000000", "456", "457", @@ -570,15 +596,15 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000150ms\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000150ms\"", lokiEntries[1].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[2].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"126\" wait_end_event_id=\"126\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"categories\" wait_object_type=\"TABLE\" wait_time=\"0.000350ms\"", lokiEntries[2].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"126\" wait_end_event_id=\"126\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"categories\" wait_object_type=\"TABLE\" wait_time=\"0.000350ms\"", lokiEntries[2].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[3].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"127\" wait_end_event_id=\"127\" wait_event_name=\"wait/io/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000500ms\"", lokiEntries[3].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"127\" wait_end_event_id=\"127\" wait_event_name=\"wait/io/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000500ms\"", lokiEntries[3].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[4].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"128\" wait_end_event_id=\"128\" wait_event_name=\"wait/io/table/sql/handler\" wait_object_name=\"categories\" wait_object_type=\"TABLE\" wait_time=\"0.000700ms\"", lokiEntries[4].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"128\" wait_end_event_id=\"128\" wait_event_name=\"wait/io/table/sql/handler\" wait_object_name=\"categories\" wait_object_type=\"TABLE\" wait_time=\"0.000700ms\"", lokiEntries[4].Line) }) t.Run("query sample and its wait event and another query sample are collected", func(t *testing.T) { @@ -623,6 +649,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -645,6 +673,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "books", "TABLE", "150000", + "some_user", + "some_host", "10000000", "456", "457", @@ -667,6 +697,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -693,11 +725,11 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000150ms\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000150ms\"", lokiEntries[1].Line) assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[2].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" thread_id=\"890\" event_id=\"126\" end_event_id=\"234\" digest=\"another_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[2].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"126\" end_event_id=\"234\" digest=\"another_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[2].Line) }) t.Run("wait event with disabled sql redaction", func(t *testing.T) { @@ -759,6 +791,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -781,6 +815,8 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "wait_object_name", "wait_object_type", "100000000", + "some_user", + "some_host", "10000000", "456", "457", @@ -807,9 +843,9 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[0].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"0.100000ms\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"0.100000ms\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[1].Line) }) } @@ -872,6 +908,8 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -894,6 +932,8 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -920,7 +960,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[0].Line) }) t.Run("does not collect sql text when disabled", func(t *testing.T) { @@ -980,6 +1020,8 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -1001,6 +1043,8 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -1026,7 +1070,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) }) } @@ -1063,8 +1107,10 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", }, - expectedLogOutput: `level="info" schema="test_schema" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="0b" max_total_memory="0b" cpu_time="0.000000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, + expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="0b" max_total_memory="0b" cpu_time="0.000000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, scanValues: []driver.Value{ "test_schema", "890", @@ -1083,6 +1129,8 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", }, }, { @@ -1107,9 +1155,11 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", }, - expectedLogOutput: `level="info" schema="test_schema" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="0b" max_total_memory="0b" cpu_time="0.010000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, + expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="0b" max_total_memory="0b" cpu_time="0.010000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, scanValues: []driver.Value{ "test_schema", "890", @@ -1128,6 +1178,8 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", // CPU_TIME }, }, @@ -1153,11 +1205,13 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", }, - expectedLogOutput: `level="info" schema="test_schema" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="1024b" max_total_memory="2048b" cpu_time="0.010000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, + expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="1024b" max_total_memory="2048b" cpu_time="0.010000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, scanValues: []driver.Value{ "test_schema", "890", @@ -1176,6 +1230,8 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", // CPU_TIME "1024", // MAX_CONTROLLED_MEMORY "2048", // MAX_TOTAL_MEMORY @@ -1332,6 +1388,8 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -1353,6 +1411,8 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -1378,7 +1438,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) }) t.Run("result set iteration error", func(t *testing.T) { @@ -1437,6 +1497,8 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -1458,6 +1520,8 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -1479,6 +1543,8 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -1504,7 +1570,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) }) t.Run("connection error recovery", func(t *testing.T) { @@ -1580,6 +1646,8 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -1601,6 +1669,8 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -1626,7 +1696,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) }) } @@ -1706,6 +1776,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -1728,6 +1800,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { nil, // WAIT_OBJECT_NAME nil, // WAIT_OBJECT_TYPE nil, // WAIT_TIME + "some_user", // PROCESSLIST_USER + "some_host", // PROCESSLIST_HOST 555555, // cpu_time 1048576, // max_controlled_memory (1MB) 2097152, // max_total_memory (2MB) @@ -1758,7 +1832,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { assert.Equal(t, model.LabelSet{ "op": OP_QUERY_SAMPLE, }, lokiClient.Received()[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"test_schema\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some digest\" rows_examined=\"1000\" rows_sent=\"100\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"1048576b\" max_total_memory=\"2097152b\" cpu_time=\"0.000556ms\" elapsed_time=\"2000.000000ms\" elapsed_time_ms=\"2000.000000ms\"", lokiClient.Received()[0].Line) + assert.Equal(t, "level=\"info\" schema=\"test_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some digest\" rows_examined=\"1000\" rows_sent=\"100\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"1048576b\" max_total_memory=\"2097152b\" cpu_time=\"0.000556ms\" elapsed_time=\"2000.000000ms\" elapsed_time_ms=\"2000.000000ms\"", lokiClient.Received()[0].Line) }) t.Run("asserts that expected query text is used in the constants", func(t *testing.T) { @@ -1800,6 +1874,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory", @@ -1850,6 +1926,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory", @@ -1900,6 +1978,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory", @@ -1940,6 +2020,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory", @@ -1982,6 +2064,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory", @@ -2026,6 +2110,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory", @@ -2100,6 +2186,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory", @@ -2119,6 +2207,8 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { nil, // WAIT_OBJECT_NAME nil, // WAIT_OBJECT_TYPE nil, // WAIT_TIME + "some_user", // PROCESSLIST_USER + "some_host", // PROCESSLIST_HOST 555555, // cpu_time 1048576, // max_controlled_memory (1MB) 2097152, // max_total_memory (2MB) @@ -2255,6 +2345,8 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { "waits.object_name", "waits.object_type", "waits.timer_wait", + "threads.PROCESSLIST_USER", + "threads.PROCESSLIST_HOST", "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", @@ -2276,6 +2368,8 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { nil, nil, nil, + "some_user", + "some_host", "10000000", "456", "457", @@ -2384,7 +2478,7 @@ func TestQuerySamplesExcludeSchemas(t *testing.T) { "current_schema", "thread_id", "event_id", "end_event_id", "digest", "timer_end", "timer_wait", "rows_examined", "rows_sent", "rows_affected", "errors", "object_schema", "object_name", "object_type", "index_name", - "lock_time", "digest_text", "cpu_time", "max_controlled_memory", "max_total_memory", + "lock_time", "digest_text", "threads.PROCESSLIST_USER", "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory", })) c.fetchQuerySamples(t.Context())