Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugin/trino-mysql-event-listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package io.trino.plugin.eventlistener.mysql;

import com.google.common.base.Joiner;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorType;
import io.trino.spi.TrinoWarning;
Expand Down Expand Up @@ -47,6 +49,10 @@
public class MysqlEventListener
implements EventListener
{
private static final Logger log = Logger.get(MysqlEventListener.class);

private static final long MAX_OPERATOR_SUMMARIES_JSON_LENGTH = 16 * 1024 * 1024;

private final QueryDao dao;
private final JsonCodec<Set<String>> clientTagsJsonCodec;
private final JsonCodec<Map<String, String>> sessionPropertiesJsonCodec;
Expand Down Expand Up @@ -152,10 +158,25 @@ public void queryCompleted(QueryCompletedEvent event)
stats.getCumulativeMemory(),
stats.getFailedCumulativeMemory(),
stats.getCompletedSplits(),
context.getRetryPolicy());
context.getRetryPolicy(),
createOperatorSummariesJson(metadata.getQueryId(), stats.getOperatorSummaries()));
dao.store(entity);
}

private Optional<String> createOperatorSummariesJson(String queryId, List<String> summaries)
{
StringBuilder builder = new StringBuilder();
builder.append("[");
Joiner.on(",").appendTo(builder, summaries);
builder.append("]");
String result = builder.toString();
if (result.length() > MAX_OPERATOR_SUMMARIES_JSON_LENGTH) {
log.info("Exceeded maximum operator summaries length for query %s: %s", queryId, result);
return Optional.empty();
}
return Optional.of(result);
}

@Override
public void splitCompleted(SplitCompletedEvent event) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ public interface QueryDao
" cumulative_memory DOUBLE NOT NULL,\n" +
" failed_cumulative_memory DOUBLE NOT NULL,\n" +
" completed_splits BIGINT NOT NULL,\n" +
" retry_policy VARCHAR(255) NOT NULL\n" +
")")
" retry_policy VARCHAR(255) NOT NULL,\n" +
" operator_summaries_json MEDIUMTEXT NOT NULL\n" +
")\n" +
"DEFAULT CHARACTER SET utf8")
void createTable();

@SqlUpdate("INSERT INTO trino_queries (\n" +
Expand Down Expand Up @@ -152,7 +154,8 @@ public interface QueryDao
" cumulative_memory,\n" +
" failed_cumulative_memory,\n" +
" completed_splits,\n" +
" retry_policy\n" +
" retry_policy,\n" +
" operator_summaries_json\n" +
")\n" +
"VALUES (\n" +
" :queryId,\n" +
Expand Down Expand Up @@ -219,7 +222,8 @@ public interface QueryDao
" :cumulativeMemory,\n" +
" :failedCumulativeMemory,\n" +
" :completedSplits,\n" +
" :retryPolicy\n" +
" :retryPolicy,\n" +
" :operatorSummariesJson\n" +
")")
void store(@BindBean QueryEntity entity);
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class QueryEntity
private final int completedSplits;

private final String retryPolicy;
private final Optional<String> operatorSummariesJson;

public QueryEntity(
String queryId,
Expand Down Expand Up @@ -165,7 +166,8 @@ public QueryEntity(
double cumulativeMemory,
double failedCumulativeMemory,
int completedSplits,
String retryPolicy)
String retryPolicy,
Optional<String> operatorSummariesJson)
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
Expand Down Expand Up @@ -232,6 +234,7 @@ public QueryEntity(
this.failedCumulativeMemory = failedCumulativeMemory;
this.completedSplits = completedSplits;
this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null");
this.operatorSummariesJson = requireNonNull(operatorSummariesJson, "operatorSummariesJson is null");
}

public String getQueryId()
Expand Down Expand Up @@ -558,4 +561,9 @@ public String getRetryPolicy()
{
return retryPolicy;
}

public Optional<String> getOperatorSummariesJson()
{
return operatorSummariesJson;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class TestMysqlEventListener
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
List.of("{operator: \"operator1\"}", "{operator: \"operator2\"}"),
// not stored
Optional.empty());

Expand Down Expand Up @@ -267,7 +267,6 @@ public class TestMysqlEventListener
Collections.emptyList(),
// not stored
Collections.emptyList(),
// not stored
Collections.emptyList(),
// not stored
Optional.empty());
Expand Down Expand Up @@ -421,6 +420,7 @@ public void testFull()
assertEquals(resultSet.getDouble("failed_cumulative_memory"), 129.0);
assertEquals(resultSet.getLong("completed_splits"), 130);
assertEquals(resultSet.getString("retry_policy"), "TASK");
assertEquals(resultSet.getString("operator_summaries_json"), "[{operator: \"operator1\"},{operator: \"operator2\"}]");
assertFalse(resultSet.next());
}
}
Expand Down Expand Up @@ -503,6 +503,7 @@ public void testMinimal()
assertEquals(resultSet.getDouble("failed_cumulative_memory"), 129.0);
assertEquals(resultSet.getLong("completed_splits"), 130);
assertEquals(resultSet.getString("retry_policy"), "NONE");
assertEquals(resultSet.getString("operator_summaries_json"), "[]");
assertFalse(resultSet.next());
}
}
Expand Down