Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,12 @@
<version>${dep.drift.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-spi</artifactId>
<version>${dep.drift.version}</version>
</dependency>

<dependency>
<groupId>io.airlift.tpch</groupId>
<artifactId>tpch</artifactId>
Expand Down Expand Up @@ -1311,6 +1317,12 @@
<groupId>com.facebook.presto.cassandra</groupId>
<artifactId>cassandra-driver</artifactId>
<version>3.1.4-1</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
10 changes: 10 additions & 0 deletions presto-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -157,6 +161,12 @@
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
<version>${dep.elasticsearch.version}</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableMap;

import java.util.Optional;

import static com.facebook.presto.hive.HiveQueryRunner.createQueryRunner;
import static io.airlift.tpch.TpchTable.getTables;

public class TestHiveDistributedQueriesWithThriftRpc
extends AbstractTestDistributedQueries
{
public TestHiveDistributedQueriesWithThriftRpc()
{
super(() -> createQueryRunner(
getTables(),
ImmutableMap.of(
"internal-communication.task-communication-protocol", "THRIFT",
"internal-communication.server-info-communication-protocol", "THRIFT"),
ImmutableMap.of(),
Optional.empty()));
}

@Override
protected boolean supportsNotNullColumns()
{
return false;
}

@Override
public void testDelete()
{
// Hive connector currently does not support row-by-row delete
}

// Hive specific tests should normally go in TestHiveIntegrationSmokeTest
}
30 changes: 30 additions & 0 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,36 @@
<artifactId>joda-to-java-time-bridge</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-server</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-netty</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-spi</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-client</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-codec</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-api</artifactId>
</dependency>

<dependency>
<groupId>com.teradata</groupId>
<artifactId>re2j-td</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ public interface LocationFactory

URI createLocalTaskLocation(TaskId taskId);

/**
* TODO: this method is required since not not all RPC call is supported by thrift.
* It should be merged into {@code createTaskLocation} once full thrift support is in-place for v1/task
*/
@Deprecated
URI createLegacyTaskLocation(InternalNode node, TaskId taskId);

/**
* TODO: implement full thrift support for v1/task
*/
URI createTaskLocation(InternalNode node, TaskId taskId);

URI createMemoryInfoLocation(InternalNode node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;

import java.net.URI;

public interface RemoteTask
{
TaskId getTaskId();
Expand All @@ -30,6 +32,11 @@ public interface RemoteTask

TaskStatus getTaskStatus();

/**
* TODO: this should be merged into getTaskStatus once full thrift support is in-place for v1/task
*/
URI getRemoteTaskLocation();

void start();

void addSplits(Multimap<PlanNodeId, Split> splits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<Rem
ImmutableMultimap.Builder<PlanNodeId, Split> newSplits = ImmutableMultimap.builder();
for (RemoteTask sourceTask : sourceTasks) {
TaskStatus sourceTaskStatus = sourceTask.getTaskStatus();
newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), sourceTaskStatus.getSelf(), sourceTaskStatus.getTaskId()));
newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), sourceTask.getRemoteTaskLocation(), sourceTaskStatus.getTaskId()));
}
task.addSplits(newSplits.build());
}
Expand Down Expand Up @@ -491,7 +491,7 @@ private synchronized RemoteTask scheduleTask(InternalNode node, TaskId taskId, M
sourceTasks.forEach((planNodeId, task) -> {
TaskStatus status = task.getTaskStatus();
if (status.getState() != TaskState.FINISHED) {
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf(), status.getTaskId()));
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, task.getRemoteTaskLocation(), status.getTaskId()));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.facebook.presto.execution;

import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.QueryId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
Expand All @@ -25,6 +28,7 @@
import static java.lang.Integer.parseInt;
import static java.util.Objects.requireNonNull;

@ThriftStruct
public class StageExecutionId
{
private final StageId stageId;
Expand All @@ -42,17 +46,20 @@ public static StageExecutionId valueOf(List<String> ids)
return new StageExecutionId(new StageId(new QueryId(ids.get(0)), parseInt(ids.get(1))), parseInt(ids.get(2)));
}

@ThriftConstructor
public StageExecutionId(StageId stageId, int id)
{
this.stageId = requireNonNull(stageId, "stageId is null");
this.id = id;
}

@ThriftField(1)
public StageId getStageId()
{
return stageId;
}

@ThriftField(2)
public int getId()
{
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.facebook.presto.execution;

import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.QueryId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
Expand All @@ -23,6 +26,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

@ThriftStruct
public class StageId
{
@JsonCreator
Expand All @@ -47,11 +51,24 @@ public StageId(QueryId queryId, int id)
this.id = id;
}

@ThriftConstructor
public StageId(String queryId, int id)
{
this(QueryId.valueOf(queryId), id);
}

public QueryId getQueryId()
{
return queryId;
}

@ThriftField(value = 1, name = "queryId")
public String getQueryIdString()
{
return queryId.toString();
}

@ThriftField(2)
public int getId()
{
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.facebook.presto.execution;

import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.QueryId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
Expand All @@ -25,6 +28,7 @@
import static java.lang.Integer.parseInt;
import static java.util.Objects.requireNonNull;

@ThriftStruct
public class TaskId
{
private final StageExecutionId stageExecutionId;
Expand All @@ -42,18 +46,21 @@ public TaskId(String queryId, int stageId, int stageExecutionId, int id)
this(new StageExecutionId(new StageId(new QueryId(queryId), stageId), stageExecutionId), id);
}

@ThriftConstructor
public TaskId(StageExecutionId stageExecutionId, int id)
{
this.stageExecutionId = requireNonNull(stageExecutionId, "stageExecutionId");
checkArgument(id >= 0, "id is negative");
this.id = id;
}

@ThriftField(1)
public StageExecutionId getStageExecutionId()
{
return stageExecutionId;
}

@ThriftField(2)
public int getId()
{
return id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.facebook.presto.execution.buffer;

import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.google.common.collect.ImmutableList;

import java.util.List;
Expand All @@ -23,6 +26,7 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;

@ThriftStruct
public class BufferResult
{
public static BufferResult emptyResults(String taskInstanceId, long token, boolean bufferComplete)
Expand All @@ -36,6 +40,7 @@ public static BufferResult emptyResults(String taskInstanceId, long token, boole
private final boolean bufferComplete;
private final List<SerializedPage> serializedPages;

@ThriftConstructor
public BufferResult(String taskInstanceId, long token, long nextToken, boolean bufferComplete, List<SerializedPage> serializedPages)
{
checkArgument(!isNullOrEmpty(taskInstanceId), "taskInstanceId is null");
Expand All @@ -47,21 +52,31 @@ public BufferResult(String taskInstanceId, long token, long nextToken, boolean b
this.serializedPages = ImmutableList.copyOf(requireNonNull(serializedPages, "serializedPages is null"));
}

@ThriftField(1)
public String getTaskInstanceId()
{
return taskInstanceId;
}

@ThriftField(2)
public long getToken()
{
return token;
}

@ThriftField(3)
public long getNextToken()
{
return nextToken;
}

@ThriftField(4)
public boolean isBufferComplete()
{
return bufferComplete;
}

@ThriftField(5)
public List<SerializedPage> getSerializedPages()
{
return serializedPages;
Expand All @@ -77,11 +92,6 @@ public boolean isEmpty()
return serializedPages.isEmpty();
}

public String getTaskInstanceId()
{
return taskInstanceId;
}

@Override
public boolean equals(Object o)
{
Expand Down
Loading