Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
87b8e6f
Add Redshift tests
dain Dec 11, 2022
0312463
Add Redshift schema, table, and column length checks
dain Dec 11, 2022
7eab69a
Implement proper type mapping for Redshift
dain Dec 11, 2022
1e8887e
Implement Redshift DELETE
dain Dec 11, 2022
e65585d
Add Redshift statistics
dain Dec 11, 2022
c225b8f
Add Redshift pushdown
dain Dec 10, 2022
be62233
Test SET PATH support by clients
wendigo Dec 7, 2022
dfe33c7
Fix HTTP_Status on OAuth2 refresh token redirect
huberty89 Dec 7, 2022
c056bc7
Fix recording of projection metrics
raunaqmorarka Dec 10, 2022
a46a510
Fix dereference operations for union type in Hive Connector
groupcache4321 Dec 2, 2022
0459735
Cleanup PartitioningExchanger
pettyjamesm Dec 9, 2022
1788829
Fix table name in TestDropTableTask
krvikash Dec 12, 2022
3c1c1fa
Document examples for datetime functions
rigogsilva Dec 12, 2022
58bd159
Decode path as URI in Delta Lake connector
jkylling Nov 25, 2022
35da905
Remove unused method from AstBuilder
ebyhr Dec 13, 2022
ce7b57f
Refactor BigQuery connector
ebyhr Nov 18, 2022
a660133
Fix projection pushdown when unsupported column exists in BigQuery
ebyhr Nov 19, 2022
cd8eac6
Update Iceberg to 1.1.0
Fokko Nov 17, 2022
b9d26a7
Document Top-N pushdown
findinpath Jul 5, 2021
f5b5fb8
Remove unnecessary override for `getTableProperties` method
findinpath Dec 14, 2022
1a79def
Reproduce kafka protobuf schema parsing stack overflow error
mx123 Dec 5, 2022
e669359
Reproduce NPE on Kafka sending when proto file has `import`
mx123 Dec 7, 2022
e5b3590
Use declared variable for descriptor
mx123 Dec 7, 2022
4cad364
Prevent protobuf schema message parsing dead loop recursion
mx123 Dec 5, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@

import okhttp3.OkHttpClient;

import java.util.Optional;
import java.util.Set;

public final class StatementClientFactory
{
private StatementClientFactory() {}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query)
{
return new StatementClientV1(httpClient, session, query);
return new StatementClientV1(httpClient, session, query, Optional.empty());
}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
return new StatementClientV1(httpClient, session, query, clientCapabilities);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.net.HttpHeaders.ACCEPT_ENCODING;
import static com.google.common.net.HttpHeaders.USER_AGENT;
import static io.trino.client.JsonCodec.jsonCodec;
Expand All @@ -56,6 +57,7 @@
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static java.util.Arrays.stream;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -93,7 +95,7 @@ class StatementClientV1

private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query, Optional<Set<String>> clientCapabilities)
{
requireNonNull(httpClient, "httpClient is null");
requireNonNull(session, "session is null");
Expand All @@ -107,7 +109,9 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst();
this.clientCapabilities = Joiner.on(",").join(ClientCapabilities.values());
this.clientCapabilities = Joiner.on(",").join(clientCapabilities.orElseGet(() -> stream(ClientCapabilities.values())
.map(Enum::name)
.collect(toImmutableSet())));
this.compressionDisabled = session.isCompressionDisabled();

Request request = buildQueryRequest(session, query);
Expand Down
1 change: 1 addition & 0 deletions core/trino-main/src/main/java/io/trino/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ private SessionBuilder(Session session)
this.remoteUserAddress = session.remoteUserAddress.orElse(null);
this.userAgent = session.userAgent.orElse(null);
this.clientInfo = session.clientInfo.orElse(null);
this.clientCapabilities = ImmutableSet.copyOf(session.clientCapabilities);
this.clientTags = ImmutableSet.copyOf(session.clientTags);
this.start = session.start;
this.systemProperties.putAll(session.systemProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -139,12 +138,6 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return builder.buildOrThrow();
}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
{
return new ConnectorTableProperties();
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.trino.spi.Page;
import it.unimi.dsi.fastutil.ints.IntArrayList;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.util.List;
Expand Down Expand Up @@ -58,18 +57,7 @@ public PartitioningExchanger(
@Override
public void accept(Page page)
{
Consumer<Page> wholePagePartition = partitionPageOrFindWholePagePartition(page, partitionedPagePreparer.apply(page));
if (wholePagePartition != null) {
// whole input page will go to this partition, compact the input page avoid over-retaining memory and to
// match the behavior of sub-partitioned pages that copy positions out
page.compact();
sendPageToPartition(wholePagePartition, page);
}
}

@Nullable
private Consumer<Page> partitionPageOrFindWholePagePartition(Page page, Page partitionPage)
{
Page partitionPage = partitionedPagePreparer.apply(page);
// assign each row to a partition. The assignments lists are all expected to cleared by the previous iterations
for (int position = 0; position < partitionPage.getPositionCount(); position++) {
int partition = partitionFunction.getPartition(partitionPage, position);
Expand All @@ -89,22 +77,19 @@ private Consumer<Page> partitionPageOrFindWholePagePartition(Page page, Page par
int[] positions = positionsList.elements();
positionsList.clear();

Page pageSplit;
if (partitionSize == page.getPositionCount()) {
// entire page will be sent to this partition, compact and send the page after releasing the lock
return buffers.get(partition);
// whole input page will go to this partition, compact the input page avoid over-retaining memory and to
// match the behavior of sub-partitioned pages that copy positions out
page.compact();
pageSplit = page;
}
Page pageSplit = page.copyPositions(positions, 0, partitionSize);
sendPageToPartition(buffers.get(partition), pageSplit);
else {
pageSplit = page.copyPositions(positions, 0, partitionSize);
}
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffers.get(partition).accept(pageSplit);
}
// No single partition receives the entire input page
return null;
}

// This is safe to call without synchronizing because the partition buffers are themselves threadsafe
private void sendPageToPartition(Consumer<Page> buffer, Page pageSplit)
{
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffer.accept(pageSplit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,15 @@ private ProcessBatchResult processBatch(int batchSize)
}
else {
if (pageProjectWork == null) {
Page inputPage = projection.getInputChannels().getInputChannels(page);
expressionProfiler.start();
pageProjectWork = projection.project(session, yieldSignal, inputPage, positionsBatch);
long projectionTimeNanos = expressionProfiler.stop(positionsBatch.size());
metrics.recordProjectionTime(projectionTimeNanos);
pageProjectWork = projection.project(session, yieldSignal, projection.getInputChannels().getInputChannels(page), positionsBatch);
}
if (!pageProjectWork.process()) {

expressionProfiler.start();
boolean finished = pageProjectWork.process();
long projectionTimeNanos = expressionProfiler.stop(positionsBatch.size());
metrics.recordProjectionTime(projectionTimeNanos);

if (!finished) {
return ProcessBatchResult.processBatchYield();
}
previouslyComputedResults[i] = pageProjectWork.getResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private void redirectForNewToken(ContainerRequestContext request, String refresh
{
OAuth2Client.Response response = client.refreshTokens(refreshToken);
String serializedToken = tokenPairSerializer.serialize(TokenPair.fromOAuth2Response(response));
request.abortWith(Response.seeOther(request.getUriInfo().getRequestUri())
request.abortWith(Response.temporaryRedirect(request.getUriInfo().getRequestUri())
.cookie(OAuthWebUiCookie.create(serializedToken, tokenExpiration.map(expiration -> Instant.now().plus(expiration)).orElse(response.getExpiration())))
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewNotFoundException;
Expand Down Expand Up @@ -331,12 +330,6 @@ public void grantTablePrivileges(ConnectorSession session, SchemaTableName table
@Override
public void revokeTablePrivileges(ConnectorSession session, SchemaTableName tableName, Set<Privilege> privileges, TrinoPrincipal grantee, boolean grantOption) {}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
{
return new ConnectorTableProperties();
}

public void clear()
{
views.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@

import io.trino.Session;
import io.trino.Session.SessionBuilder;
import io.trino.client.ClientCapabilities;
import io.trino.execution.QueryIdGenerator;
import io.trino.metadata.SessionPropertyManager;
import io.trino.spi.security.Identity;
import io.trino.spi.type.TimeZoneKey;

import java.util.Arrays;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Locale.ENGLISH;

public final class TestingSession
Expand Down Expand Up @@ -54,6 +58,8 @@ public static SessionBuilder testSessionBuilder(SessionPropertyManager sessionPr
.setSchema("schema")
.setTimeZoneKey(DEFAULT_TIME_ZONE_KEY)
.setLocale(ENGLISH)
.setClientCapabilities(Arrays.stream(ClientCapabilities.values()).map(Enum::name)
.collect(toImmutableSet()))
.setRemoteUserAddress("address")
.setUserAgent("agent");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TestDropTableTask
@Test
public void testDropExistingTable()
{
QualifiedObjectName tableName = qualifiedObjectName("not_existing_table");
QualifiedObjectName tableName = qualifiedObjectName("existing_table");
metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), false);
assertThat(metadata.getTableHandle(testSession, tableName)).isPresent();

Expand Down
Loading