Skip to content

Commit 0c72600

Browse files
mkarrmannfeilong-liu
authored andcommitted
feat(OSS Presto): Support tracking Page Sink Runtime Stats in TableWriterOperator (prestodb#25846)
Summary: Pull Request resolved: prestodb#25846 Pass the Operator Context's Runtime Stats down into the `TableWriteOperator`'s Page Sink. Specifically this diff makes the following changes: a) `TableWriteOperator` passes its `RuntimeStats` into the Page Sink it creates via `PageSinkManager.createPageSink` b) When the `PageSinkManager.createPageSink` is provided `RuntimeStats`, these `RuntimeStats` are passed into the `Session.toConnectorSession` call, which creates a `FullConnectorSession` instance c) When `Session.toConnectorSession` is provided `RuntimeStats`, it passes this into the `FullConnectorSession` instance it constructs d) Add a `Builder` to `FullConnectorSession`, which allows providing a `RuntimeStats` instance to `FullConnectorSession` at construction-time. `FullConnectorSession.getRuntimeStats()` now returns the `RuntimeStats` which was set at construction-time. If no `RuntimeStats` were provided at construction-time, then `FullConnectorSession.getRuntimeStats()` defaults to return the `Session` object's `RuntimeStats`—this preserves backwards compatibility. All changes preserve forward-compatibility. ## Context Without this change, the `FullConnectorSession`'s `RuntimeStats` points to the `Session`'s `RuntimeStat`s. All metrics added to the `Session`'s `RuntimeStats` within an Operator Worker-side are discarded. That is, all Runtime Metrics added to the Connector Session's RuntimeStats when executing `TableWriterOperator` were being completely discarded. Specifically, in Meta, the stats from our internal filesystem implementation were missing. Passing the Operator Context's `RuntimeStats` instance down into Connector Session is the simplest way to fix this. Additionally, since the previous `RuntimeStat`s for `TableWriteOperator`'s `FullConnectorSession` were always discarded, we can be confident that replacing them with the `OperatorContext` `RuntimeStat`s will not break anyone else's code. Differential Revision: D80675849
1 parent 8bf4ab1 commit 0c72600

File tree

4 files changed

+162
-36
lines changed

4 files changed

+162
-36
lines changed

presto-main-base/src/main/java/com/facebook/presto/FullConnectorSession.java

Lines changed: 120 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,11 @@ public class FullConnectorSession
4848
private final SessionPropertyManager sessionPropertyManager;
4949
private final SqlFunctionProperties sqlFunctionProperties;
5050
private final Map<SqlFunctionId, SqlInvokedFunction> sessionFunctions;
51+
private final RuntimeStats runtimeStats;
5152

5253
public FullConnectorSession(Session session, ConnectorIdentity identity)
5354
{
54-
this.session = requireNonNull(session, "session is null");
55-
this.identity = requireNonNull(identity, "identity is null");
56-
this.properties = null;
57-
this.connectorId = null;
58-
this.catalog = null;
59-
this.sessionPropertyManager = null;
60-
this.sqlFunctionProperties = session.getSqlFunctionProperties();
61-
this.sessionFunctions = ImmutableMap.copyOf(session.getSessionFunctions());
55+
this(builder(session, identity, null, null, null, null));
6256
}
6357

6458
public FullConnectorSession(
@@ -69,14 +63,123 @@ public FullConnectorSession(
6963
String catalog,
7064
SessionPropertyManager sessionPropertyManager)
7165
{
72-
this.session = requireNonNull(session, "session is null");
73-
this.identity = requireNonNull(identity, "identity is null");
74-
this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null"));
75-
this.connectorId = requireNonNull(connectorId, "connectorId is null");
76-
this.catalog = requireNonNull(catalog, "catalog is null");
77-
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
78-
this.sqlFunctionProperties = session.getSqlFunctionProperties();
79-
this.sessionFunctions = ImmutableMap.copyOf(session.getSessionFunctions());
66+
this(builder(session, identity, properties, connectorId, catalog, sessionPropertyManager));
67+
}
68+
69+
private FullConnectorSession(Builder builder)
70+
{
71+
this.session = builder.getSession();
72+
this.identity = builder.getIdentity();
73+
this.properties = builder.getProperties();
74+
this.connectorId = builder.getConnectorId();
75+
this.catalog = builder.getCatalog();
76+
this.sessionPropertyManager = builder.getSessionPropertyManager();
77+
this.sqlFunctionProperties = builder.getSqlFunctionProperties() != null ? builder.getSqlFunctionProperties() : builder.getSession().getSqlFunctionProperties();
78+
this.sessionFunctions = builder.getSessionFunctions() != null ? builder.getSessionFunctions() : ImmutableMap.copyOf(builder.getSession().getSessionFunctions());
79+
this.runtimeStats = builder.getRuntimeStats() != null ? builder.getRuntimeStats() : builder.getSession().getRuntimeStats();
80+
}
81+
82+
public static Builder builder(
83+
Session session,
84+
ConnectorIdentity identity,
85+
Map<String, String> properties,
86+
ConnectorId connectorId,
87+
String catalog,
88+
SessionPropertyManager sessionPropertyManager)
89+
{
90+
return new Builder(session, identity, properties, connectorId, catalog, sessionPropertyManager);
91+
}
92+
93+
public static class Builder
94+
{
95+
private final Session session;
96+
private final ConnectorIdentity identity;
97+
private final Map<String, String> properties;
98+
private final ConnectorId connectorId;
99+
private final String catalog;
100+
private final SessionPropertyManager sessionPropertyManager;
101+
102+
private SqlFunctionProperties sqlFunctionProperties;
103+
private Map<SqlFunctionId, SqlInvokedFunction> sessionFunctions;
104+
private RuntimeStats runtimeStats;
105+
106+
private Builder(Session session, ConnectorIdentity identity, Map<String, String> properties, ConnectorId connectorId, String catalog, SessionPropertyManager sessionPropertyManager)
107+
{
108+
this.session = requireNonNull(session, "session is null");
109+
this.identity = requireNonNull(identity, "identity is null");
110+
this.properties = properties;
111+
this.connectorId = connectorId;
112+
this.catalog = catalog;
113+
this.sessionPropertyManager = sessionPropertyManager;
114+
}
115+
116+
public Session getSession()
117+
{
118+
return session;
119+
}
120+
121+
public ConnectorIdentity getIdentity()
122+
{
123+
return identity;
124+
}
125+
126+
public Map<String, String> getProperties()
127+
{
128+
return properties;
129+
}
130+
131+
public ConnectorId getConnectorId()
132+
{
133+
return connectorId;
134+
}
135+
136+
public String getCatalog()
137+
{
138+
return catalog;
139+
}
140+
141+
public SessionPropertyManager getSessionPropertyManager()
142+
{
143+
return sessionPropertyManager;
144+
}
145+
146+
public SqlFunctionProperties getSqlFunctionProperties()
147+
{
148+
return sqlFunctionProperties;
149+
}
150+
151+
public Builder setSqlFunctionProperties(SqlFunctionProperties sqlFunctionProperties)
152+
{
153+
this.sqlFunctionProperties = sqlFunctionProperties;
154+
return this;
155+
}
156+
157+
public Map<SqlFunctionId, SqlInvokedFunction> getSessionFunctions()
158+
{
159+
return sessionFunctions;
160+
}
161+
162+
public Builder setSessionFunctions(Map<SqlFunctionId, SqlInvokedFunction> sessionFunctions)
163+
{
164+
this.sessionFunctions = sessionFunctions;
165+
return this;
166+
}
167+
168+
public RuntimeStats getRuntimeStats()
169+
{
170+
return runtimeStats;
171+
}
172+
173+
public Builder setRuntimeStats(RuntimeStats runtimeStats)
174+
{
175+
this.runtimeStats = runtimeStats;
176+
return this;
177+
}
178+
179+
public FullConnectorSession build()
180+
{
181+
return new FullConnectorSession(this);
182+
}
80183
}
81184

82185
public Session getSession()
@@ -197,7 +300,7 @@ public WarningCollector getWarningCollector()
197300
@Override
198301
public RuntimeStats getRuntimeStats()
199302
{
200-
return session.getRuntimeStats();
303+
return runtimeStats;
201304
}
202305

203306
@Override

presto-main-base/src/main/java/com/facebook/presto/Session.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -495,17 +495,26 @@ public SqlFunctionProperties getSqlFunctionProperties()
495495
.build();
496496
}
497497

498-
public ConnectorSession toConnectorSession(ConnectorId connectorId)
498+
public ConnectorSession toConnectorSession(ConnectorId connectorId, RuntimeStats runtimeStats)
499499
{
500500
requireNonNull(connectorId, "connectorId is null");
501501

502-
return new FullConnectorSession(
503-
this,
504-
identity.toConnectorIdentity(connectorId.getCatalogName()),
505-
connectorProperties.getOrDefault(connectorId, ImmutableMap.of()),
506-
connectorId,
507-
connectorId.getCatalogName(),
508-
sessionPropertyManager);
502+
FullConnectorSession.Builder connectorSessionBuilder = FullConnectorSession
503+
.builder(
504+
this,
505+
identity.toConnectorIdentity(connectorId.getCatalogName()),
506+
connectorProperties.getOrDefault(connectorId, ImmutableMap.of()),
507+
connectorId,
508+
connectorId.getCatalogName(),
509+
sessionPropertyManager)
510+
.setRuntimeStats(runtimeStats);
511+
512+
return connectorSessionBuilder.build();
513+
}
514+
515+
public ConnectorSession toConnectorSession(ConnectorId connectorId)
516+
{
517+
return toConnectorSession(connectorId, runtimeStats);
509518
}
510519

511520
public SessionRepresentation toSessionRepresentation()

presto-main-base/src/main/java/com/facebook/presto/operator/TableWriterOperator.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.facebook.drift.annotations.ThriftStruct;
2121
import com.facebook.presto.Session;
2222
import com.facebook.presto.common.Page;
23+
import com.facebook.presto.common.RuntimeStats;
2324
import com.facebook.presto.common.block.Block;
2425
import com.facebook.presto.common.block.BlockBuilder;
2526
import com.facebook.presto.common.block.RunLengthEncodedBlock;
@@ -130,7 +131,7 @@ public Operator createOperator(DriverContext driverContext)
130131
boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session);
131132
return new TableWriterOperator(
132133
context,
133-
createPageSink(),
134+
createPageSink(context),
134135
columnChannels,
135136
notNullChannelColumnNames,
136137
statisticsAggregationOperator,
@@ -140,19 +141,21 @@ public Operator createOperator(DriverContext driverContext)
140141
pageSinkCommitStrategy);
141142
}
142143

143-
private ConnectorPageSink createPageSink()
144+
private ConnectorPageSink createPageSink(OperatorContext operatorContext)
144145
{
145146
PageSinkContext.Builder pageSinkContextBuilder = PageSinkContext.builder()
146147
.setCommitRequired(pageSinkCommitStrategy.isCommitRequired());
147148

149+
RuntimeStats runtimeStats = operatorContext.getRuntimeStats();
150+
148151
if (target instanceof CreateHandle) {
149-
return pageSinkManager.createPageSink(session, ((CreateHandle) target).getHandle(), pageSinkContextBuilder.build());
152+
return pageSinkManager.createPageSink(session, ((CreateHandle) target).getHandle(), pageSinkContextBuilder.build(), runtimeStats);
150153
}
151154
if (target instanceof InsertHandle) {
152-
return pageSinkManager.createPageSink(session, ((InsertHandle) target).getHandle(), pageSinkContextBuilder.build());
155+
return pageSinkManager.createPageSink(session, ((InsertHandle) target).getHandle(), pageSinkContextBuilder.build(), runtimeStats);
153156
}
154157
if (target instanceof RefreshMaterializedViewHandle) {
155-
return pageSinkManager.createPageSink(session, ((RefreshMaterializedViewHandle) target).getHandle(), pageSinkContextBuilder.build());
158+
return pageSinkManager.createPageSink(session, ((RefreshMaterializedViewHandle) target).getHandle(), pageSinkContextBuilder.build(), runtimeStats);
156159
}
157160
throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName());
158161
}

presto-main-base/src/main/java/com/facebook/presto/split/PageSinkManager.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.split;
1515

1616
import com.facebook.presto.Session;
17+
import com.facebook.presto.common.RuntimeStats;
1718
import com.facebook.presto.metadata.InsertTableHandle;
1819
import com.facebook.presto.metadata.OutputTableHandle;
1920
import com.facebook.presto.spi.ConnectorId;
@@ -46,22 +47,32 @@ public void removeConnectorPageSinkProvider(ConnectorId connectorId)
4647
pageSinkProviders.remove(connectorId);
4748
}
4849

49-
@Override
50-
public ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle, PageSinkContext pageSinkContext)
50+
public ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle, PageSinkContext pageSinkContext, RuntimeStats runtimeStats)
5151
{
5252
// assumes connectorId and catalog are the same
53-
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getConnectorId());
53+
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getConnectorId(), runtimeStats);
5454
return providerFor(tableHandle.getConnectorId()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkContext);
5555
}
5656

57-
@Override
58-
public ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle, PageSinkContext pageSinkContext)
57+
public ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle, PageSinkContext pageSinkContext, RuntimeStats runtimeStats)
5958
{
6059
// assumes connectorId and catalog are the same
61-
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getConnectorId());
60+
ConnectorSession connectorSession = session.toConnectorSession(tableHandle.getConnectorId(), runtimeStats);
6261
return providerFor(tableHandle.getConnectorId()).createPageSink(tableHandle.getTransactionHandle(), connectorSession, tableHandle.getConnectorHandle(), pageSinkContext);
6362
}
6463

64+
@Override
65+
public ConnectorPageSink createPageSink(Session session, OutputTableHandle tableHandle, PageSinkContext pageSinkContext)
66+
{
67+
return createPageSink(session, tableHandle, pageSinkContext, null);
68+
}
69+
70+
@Override
71+
public ConnectorPageSink createPageSink(Session session, InsertTableHandle tableHandle, PageSinkContext pageSinkContext)
72+
{
73+
return createPageSink(session, tableHandle, pageSinkContext, null);
74+
}
75+
6576
private ConnectorPageSinkProvider providerFor(ConnectorId connectorId)
6677
{
6778
ConnectorPageSinkProvider provider = pageSinkProviders.get(connectorId);

0 commit comments

Comments
 (0)