Skip to content

Commit fd84d24

Browse files
authored
Merge branch 'master' into fd-parquet
2 parents 2634a7b + d81aad9 commit fd84d24

File tree

34 files changed

+590
-154
lines changed

34 files changed

+590
-154
lines changed

core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,13 @@ private Optional<Integer> determinePartitionCount(
161161
}
162162
verify(minPartitionCount <= maxPartitionCount, "minPartitionCount %s larger than maxPartitionCount %s",
163163
minPartitionCount, maxPartitionCount);
164+
int maxPossiblePartitionCount = taskCountEstimator.estimateHashedTaskCount(session);
165+
RetryPolicy retryPolicy = getRetryPolicy(session);
166+
if (maxPossiblePartitionCount <= 2 * minPartitionCount && !retryPolicy.equals(RetryPolicy.TASK)) {
167+
// Do not set partition count if the possible partition count is already close to the minimum partition count.
168+
// This avoids incurring cost of fetching table statistics for simple queries when the cluster is small.
169+
return Optional.empty();
170+
}
164171

165172
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, tableStatsProvider);
166173
long queryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
@@ -188,7 +195,7 @@ private Optional<Integer> determinePartitionCount(
188195
return Optional.empty();
189196
}
190197

191-
if (partitionCount * 2 >= taskCountEstimator.estimateHashedTaskCount(session) && !getRetryPolicy(session).equals(RetryPolicy.TASK)) {
198+
if (partitionCount * 2 >= maxPossiblePartitionCount && !retryPolicy.equals(RetryPolicy.TASK)) {
192199
// Do not cap partition count if it's already close to the possible number of tasks.
193200
return Optional.empty();
194201
}

core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestDeterminePartitionCount.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,51 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
241241
node(TableScanNode.class))))))));
242242
}
243243

244+
@Test
245+
public void testDoesNotSetPartitionCountWhenNodeCountIsCloseToMinPartitionCount()
246+
{
247+
@Language("SQL") String query =
248+
"""
249+
SELECT count(column_a) FROM table_with_stats_a group by column_b
250+
""";
251+
252+
// DeterminePartitionCount shouldn't put partition count when 2 * MIN_HASH_PARTITION_COUNT
253+
// is greater or equal to number of workers.
254+
assertDistributedPlan(
255+
query,
256+
Session.builder(getPlanTester().getDefaultSession())
257+
.setSystemProperty(MAX_HASH_PARTITION_COUNT, "8")
258+
.setSystemProperty(MIN_HASH_PARTITION_COUNT, "4")
259+
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
260+
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
261+
.build(),
262+
output(
263+
project(
264+
node(AggregationNode.class,
265+
exchange(LOCAL,
266+
exchange(REMOTE, REPARTITION, Optional.empty(),
267+
node(AggregationNode.class,
268+
node(TableScanNode.class))))))));
269+
270+
// DeterminePartitionCount should still put partition count for FTE
271+
assertDistributedPlan(
272+
query,
273+
Session.builder(getPlanTester().getDefaultSession())
274+
.setSystemProperty(RETRY_POLICY, "task")
275+
.setSystemProperty(MAX_HASH_PARTITION_COUNT, "8")
276+
.setSystemProperty(MIN_HASH_PARTITION_COUNT, "4")
277+
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
278+
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
279+
.build(),
280+
output(
281+
project(
282+
node(AggregationNode.class,
283+
exchange(LOCAL,
284+
exchange(REMOTE, REPARTITION, Optional.of(10),
285+
node(AggregationNode.class,
286+
node(TableScanNode.class))))))));
287+
}
288+
244289
@Test
245290
public void testPlanWhenTableStatisticsAreAbsent()
246291
{
@@ -539,15 +584,15 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
539584
Session.builder(getPlanTester().getDefaultSession())
540585
.setSystemProperty(RETRY_POLICY, "task")
541586
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "21")
542-
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "11")
587+
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "10")
543588
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
544589
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
545590
.build(),
546591
output(
547592
project(
548593
node(AggregationNode.class,
549594
exchange(LOCAL,
550-
exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, Optional.of(11),
595+
exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, Optional.of(10),
551596
node(AggregationNode.class,
552597
node(TableScanNode.class))))))));
553598
}
@@ -563,7 +608,7 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
563608
query,
564609
Session.builder(getPlanTester().getDefaultSession())
565610
.setSystemProperty(RETRY_POLICY, "task")
566-
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "8")
611+
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "9")
567612
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "4")
568613
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
569614
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")

core/trino-web-ui/src/main/resources/webapp-preview/src/api/webapp/api.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,19 @@ export interface Session {
206206
catalogProperties: { [key: string]: string | number | boolean }
207207
}
208208

209+
export interface QueryTable {
210+
catalog: string
211+
schema: string
212+
table: string
213+
authorization: string
214+
directlyReferenced: boolean
215+
}
216+
217+
export interface QueryRoutine {
218+
routine: string
219+
authorization: string
220+
}
221+
209222
export interface QueryStatusInfo extends QueryInfoBase {
210223
session: Session
211224
query: string
@@ -214,6 +227,8 @@ export interface QueryStatusInfo extends QueryInfoBase {
214227
retryPolicy: string
215228
pruned: boolean
216229
finalQueryInfo: boolean
230+
referencedTables: QueryTable[]
231+
routines: QueryRoutine[]
217232
}
218233

219234
export async function statsApi(): Promise<ApiResponse<Stats>> {
@@ -232,6 +247,6 @@ export async function queryApi(): Promise<ApiResponse<QueryInfo[]>> {
232247
return await api.get<QueryInfo[]>('/ui/api/query')
233248
}
234249

235-
export async function queryStatusApi(queryId: string): Promise<ApiResponse<QueryStatusInfo>> {
236-
return await api.get<QueryStatusInfo>(`/ui/api/query/${queryId}`)
250+
export async function queryStatusApi(queryId: string, pruned: boolean = false): Promise<ApiResponse<QueryStatusInfo>> {
251+
return await api.get<QueryStatusInfo>(`/ui/api/query/${queryId}${pruned ? '?pruned=true' : ''}`)
237252
}

core/trino-web-ui/src/main/resources/webapp-preview/src/components/QueryDetails.tsx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import React, { ReactNode, useState } from 'react'
1515
import { useLocation, useParams } from 'react-router-dom'
1616
import { Alert, Box, Divider, Grid2 as Grid, Tabs, Tab, Typography } from '@mui/material'
1717
import { QueryJson } from './QueryJson'
18+
import { QueryReferences } from './QueryReferences'
1819
import { QueryOverview } from './QueryOverview'
1920
import { Texts } from '../constant.ts'
2021

@@ -26,7 +27,7 @@ const tabComponentMap: Record<TabValue, ReactNode> = {
2627
stagePerformance: <Alert severity="error">{Texts.Error.NotImplemented}</Alert>,
2728
splits: <Alert severity="error">{Texts.Error.NotImplemented}</Alert>,
2829
json: <QueryJson />,
29-
references: <Alert severity="error">{Texts.Error.NotImplemented}</Alert>,
30+
references: <QueryReferences />,
3031
}
3132
export const QueryDetails = () => {
3233
const { queryId } = useParams()
@@ -60,7 +61,7 @@ export const QueryDetails = () => {
6061
<Tab value="stagePerformance" label="Stage performance" disabled />
6162
<Tab value="splits" label="Splits" disabled />
6263
<Tab value="json" label="JSON" />
63-
<Tab value="references" label="References" disabled />
64+
<Tab value="references" label="References" />
6465
</Tabs>
6566
</Box>
6667
</Grid>
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
import { useParams } from 'react-router-dom'
15+
import { useEffect, useRef, useState } from 'react'
16+
import {
17+
Alert,
18+
Box,
19+
CircularProgress,
20+
Divider,
21+
Grid2 as Grid,
22+
Table,
23+
TableBody,
24+
TableCell,
25+
TableContainer,
26+
TableRow,
27+
Typography,
28+
} from '@mui/material'
29+
import { queryStatusApi, QueryRoutine, QueryStatusInfo, QueryTable } from '../api/webapp/api.ts'
30+
import { Texts } from '../constant.ts'
31+
import { ApiResponse } from '../api/base.ts'
32+
import { QueryProgressBar } from './QueryProgressBar.tsx'
33+
34+
interface IQueryStatus {
35+
info: QueryStatusInfo | null
36+
ended: boolean
37+
}
38+
39+
export const QueryReferences = () => {
40+
const { queryId } = useParams()
41+
const initialQueryStatus: IQueryStatus = {
42+
info: null,
43+
ended: false,
44+
}
45+
46+
const [queryStatus, setQueryStatus] = useState<IQueryStatus>(initialQueryStatus)
47+
48+
const [loading, setLoading] = useState<boolean>(true)
49+
const [error, setError] = useState<string | null>(null)
50+
const queryStatusRef = useRef(queryStatus)
51+
52+
useEffect(() => {
53+
queryStatusRef.current = queryStatus
54+
}, [queryStatus])
55+
56+
useEffect(() => {
57+
const runLoop = () => {
58+
const queryEnded = !!queryStatusRef.current.info?.finalQueryInfo
59+
if (!queryEnded) {
60+
getQueryStatus()
61+
setTimeout(runLoop, 3000)
62+
}
63+
}
64+
65+
if (queryId) {
66+
queryStatusRef.current = initialQueryStatus
67+
}
68+
69+
runLoop()
70+
// eslint-disable-next-line react-hooks/exhaustive-deps
71+
}, [queryId])
72+
73+
const getQueryStatus = () => {
74+
if (queryId) {
75+
queryStatusApi(queryId, false).then((apiResponse: ApiResponse<QueryStatusInfo>) => {
76+
setLoading(false)
77+
if (apiResponse.status === 200 && apiResponse.data) {
78+
setQueryStatus({
79+
info: apiResponse.data,
80+
ended: apiResponse.data.finalQueryInfo,
81+
})
82+
setError(null)
83+
} else {
84+
setError(`${Texts.Error.Communication} ${apiResponse.status}: ${apiResponse.message}`)
85+
}
86+
})
87+
}
88+
}
89+
90+
const renderReferencedTables = (tables: QueryTable[]) => {
91+
if (!tables || tables.length === 0) {
92+
return (
93+
<Box sx={{ width: '100%', mt: 1 }}>
94+
<Alert severity="info">No referenced tables.</Alert>
95+
</Box>
96+
)
97+
}
98+
99+
return (
100+
<TableContainer>
101+
<Table aria-label="simple table">
102+
<TableBody>
103+
{tables.map((table: QueryTable) => {
104+
const tableName = `${table.catalog}.${table.schema}.${table.table}`
105+
return (
106+
<TableRow key={tableName}>
107+
<TableCell sx={{ width: '50%' }}>{tableName}</TableCell>
108+
<TableCell sx={{ width: '50%' }}>
109+
{`Authorization: ${table.authorization}, Directly Referenced: ${table.directlyReferenced}`}
110+
</TableCell>
111+
</TableRow>
112+
)
113+
})}
114+
</TableBody>
115+
</Table>
116+
</TableContainer>
117+
)
118+
}
119+
120+
const renderRoutines = (routines: QueryRoutine[]) => {
121+
if (!routines || routines.length === 0) {
122+
return (
123+
<Box sx={{ width: '100%', mt: 1 }}>
124+
<Alert severity="info">No referenced routines.</Alert>
125+
</Box>
126+
)
127+
}
128+
129+
return (
130+
<TableContainer>
131+
<Table aria-label="simple table">
132+
<TableBody>
133+
{routines.map((routine: QueryRoutine, idx: number) => (
134+
<TableRow key={`${routine.routine}-${idx}`}>
135+
<TableCell sx={{ width: '50%' }}>{`${routine.routine}`}</TableCell>
136+
<TableCell sx={{ width: '50%' }}>{`Authorization: ${routine.authorization}`}</TableCell>
137+
</TableRow>
138+
))}
139+
</TableBody>
140+
</Table>
141+
</TableContainer>
142+
)
143+
}
144+
145+
return (
146+
<>
147+
{loading && <CircularProgress />}
148+
{error && <Alert severity="error">{Texts.Error.QueryNotFound}</Alert>}
149+
150+
{!loading && !error && queryStatus.info && (
151+
<Grid container spacing={0}>
152+
<Grid size={{ xs: 12 }}>
153+
<Box sx={{ pt: 2 }}>
154+
<Box sx={{ width: '100%' }}>
155+
<QueryProgressBar queryInfoBase={queryStatus.info} />
156+
</Box>
157+
158+
{queryStatus.ended ? (
159+
<Grid container spacing={3}>
160+
<Grid size={{ xs: 12, md: 12 }}>
161+
<Box sx={{ pt: 2 }}>
162+
<Typography variant="h6">Referenced Tables</Typography>
163+
<Divider />
164+
</Box>
165+
{renderReferencedTables(queryStatus.info.referencedTables)}
166+
<Box sx={{ pt: 2 }}>
167+
<Typography variant="h6">Routines</Typography>
168+
<Divider />
169+
</Box>
170+
{renderRoutines(queryStatus.info.routines)}
171+
</Grid>
172+
</Grid>
173+
) : (
174+
<>
175+
<Box sx={{ width: '100%', mt: 1 }}>
176+
<Alert severity="info">
177+
References will appear automatically when query completes.
178+
</Alert>
179+
</Box>
180+
</>
181+
)}
182+
</Box>
183+
</Grid>
184+
</Grid>
185+
)}
186+
</>
187+
)
188+
}

docs/src/main/sphinx/admin/event-listeners-openlineage.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ not limited to) Spark, Airflow, Flink.
3030
- Run Event Time
3131
*
3232
- Query Id
33-
- Job Facet Name
33+
- Job Facet Name (default, can be overridden)
3434
*
3535
- `trino:// + {openlineage-event-listener.trino.uri.getHost()} + ":" + {openlineage-event-listener.trino.uri.getPort()}`
3636
- Job Facet Namespace (default, can be overridden)
@@ -157,6 +157,13 @@ event-listener.config-files=etc/openlineage-event-listener.properties,...
157157
- Custom namespace to be used for Job `namespace` attribute. If blank will
158158
default to Dataset Namespace.
159159
- None.
160+
*
161+
- openlineage-event-listener.job.name-format
162+
- Custom format to use for the job `name` attribute.
163+
Use any string, with optional substitution
164+
variables: `$QUERY_ID`, `$USER`, `$SOURCE`, `$CLIENT_IP`.
165+
For example: `As $USER from $CLIENT_IP via $SOURCE`.
166+
- `$QUERY_ID`.
160167

161168
:::
162169

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/IdentityCacheMapping.java renamed to lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/identity/IdentityCacheMapping.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.trino.plugin.jdbc;
14+
package io.trino.plugin.base.cache.identity;
1515

1616
import io.trino.spi.connector.ConnectorSession;
1717

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/SingletonIdentityCacheMapping.java renamed to lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/identity/SingletonIdentityCacheMapping.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.trino.plugin.jdbc;
14+
package io.trino.plugin.base.cache.identity;
1515

1616
import io.trino.spi.connector.ConnectorSession;
1717

0 commit comments

Comments
 (0)