Skip to content
52 changes: 52 additions & 0 deletions presto-main/src/main/java/com/facebook/presto/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
Comment thread
dain marked this conversation as resolved.
Outdated

import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.util.Failures.checkCondition;
Expand Down Expand Up @@ -344,6 +345,57 @@ public Session beginTransactionId(TransactionId transactionId, TransactionManage
preparedStatements);
}

public Session withDefaultProperties(Map<String, String> systemPropertyDefaults, Map<String, Map<String, String>> catalogPropertyDefaults)
{
requireNonNull(systemPropertyDefaults, "systemPropertyDefaults is null");
requireNonNull(catalogPropertyDefaults, "catalogPropertyDefaults is null");

// to remove this check properties must be authenticated and validated as in beginTransactionId
checkState(
!this.transactionId.isPresent() && this.connectorProperties.isEmpty(),
"Session properties cannot be overridden once a transaction is active");

Map<String, String> systemProperties = new HashMap<>();
systemProperties.putAll(systemPropertyDefaults);
systemProperties.putAll(this.systemProperties);

Map<String, Map<String, String>> connectorProperties = catalogPropertyDefaults.entrySet().stream()
.map(entry -> Maps.immutableEntry(entry.getKey(), new HashMap<>(entry.getValue())))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
for (Entry<String, Map<String, String>> catalogProperties : this.unprocessedCatalogProperties.entrySet()) {
String catalog = catalogProperties.getKey();
for (Entry<String, String> entry : catalogProperties.getValue().entrySet()) {
connectorProperties.computeIfAbsent(catalog, id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
}
}

return new Session(
queryId,
transactionId,
clientTransactionSupport,
identity,
source,
catalog,
schema,
path,
traceToken,
timeZoneKey,
locale,
remoteUserAddress,
userAgent,
clientInfo,
clientTags,
clientCapabilities,
resourceEstimates,
startTime,
systemProperties,
ImmutableMap.of(),
connectorProperties,
sessionPropertyManager,
preparedStatements);
}

public ConnectorSession toConnectorSession()
{
return new FullConnectorSession(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public String getName()
@Override
public ListenableFuture<?> execute(Call call, TransactionManager transactionManager, Metadata metadata, AccessControl accessControl, QueryStateMachine stateMachine, List<Expression> parameters)
{
if (!stateMachine.isAutoCommit()) {
if (!transactionManager.isAutoCommit(stateMachine.getSession().getRequiredTransactionId())) {
throw new PrestoException(NOT_SUPPORTED, "Procedures cannot be called within a transaction (use autocommit mode)");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import javax.inject.Inject;
Expand Down Expand Up @@ -116,6 +117,30 @@ public DataSize getTotalMemoryReservation()
return new DataSize(0, BYTE);
}

@Override
public DateTime getCreateTime()
{
return stateMachine.getCreateTime();
}

@Override
public Optional<DateTime> getExecutionStartTime()
{
return stateMachine.getExecutionStartTime();
}

@Override
public DateTime getLastHeartbeat()
{
return stateMachine.getLastHeartbeat();
}

@Override
public Optional<DateTime> getEndTime()
{
return stateMachine.getEndTime();
}

@Override
public Duration getTotalCpuTime()
{
Expand Down Expand Up @@ -231,11 +256,7 @@ public QueryId getQueryId()
@Override
public QueryInfo getQueryInfo()
{
Optional<QueryInfo> finalQueryInfo = stateMachine.getFinalQueryInfo();
if (finalQueryInfo.isPresent()) {
return finalQueryInfo.get();
}
return stateMachine.updateQueryInfo(Optional.empty());
return stateMachine.getFinalQueryInfo().orElseGet(() -> stateMachine.updateQueryInfo(Optional.empty()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,23 @@
import com.facebook.presto.Session;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.joda.time.DateTime;

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import static com.facebook.presto.execution.QueryInfo.immediateFailureQueryInfo;
Comment thread
dain marked this conversation as resolved.
Outdated
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.airlift.units.DataSize.Unit.BYTE;
Expand All @@ -43,17 +44,16 @@ public class FailedQueryExecution
{
private final QueryInfo queryInfo;
private final Session session;
private final Executor executor;
private final Optional<ResourceGroupId> resourceGroup;
private final Executor executor;

public FailedQueryExecution(QueryId queryId, String query, Optional<ResourceGroupId> resourceGroup, Session session, URI self, TransactionManager transactionManager, Executor executor, Metadata metadata, Throwable cause)
public FailedQueryExecution(Session session, String query, URI self, Optional<ResourceGroupId> resourceGroup, Executor executor, Throwable cause)
{
requireNonNull(cause, "cause is null");
this.session = requireNonNull(session, "session is null");
this.executor = requireNonNull(executor, "executor is null");
QueryStateMachine queryStateMachine = QueryStateMachine.failed(queryId, query, session, self, transactionManager, executor, metadata, cause);
queryInfo = queryStateMachine.updateQueryInfo(Optional.empty());
this.resourceGroup = requireNonNull(resourceGroup, "resourceGroup is null");
this.executor = requireNonNull(executor, "executor is null");
this.queryInfo = immediateFailureQueryInfo(session, query, self, cause);
}

@Override
Expand Down Expand Up @@ -116,6 +116,30 @@ public Session getSession()
return session;
}

@Override
public DateTime getCreateTime()
{
return queryInfo.getQueryStats().getCreateTime();
}

@Override
public Optional<DateTime> getExecutionStartTime()
{
return Optional.ofNullable(queryInfo.getQueryStats().getExecutionStartTime());
}

@Override
public DateTime getLastHeartbeat()
{
return queryInfo.getQueryStats().getLastHeartbeat();
}

@Override
public Optional<DateTime> getEndTime()
{
return Optional.ofNullable(queryInfo.getQueryStats().getEndTime());
}

@Override
public Optional<ErrorCode> getErrorCode()
{
Expand Down Expand Up @@ -149,7 +173,7 @@ public ListenableFuture<QueryState> getStateChange(QueryState currentState)
@Override
public void addStateChangeListener(StateChangeListener<QueryState> stateChangeListener)
{
executor.execute(() -> stateChangeListener.stateChanged(QueryState.FAILED));
executor.execute(() -> stateChangeListener.stateChanged(FAILED));
}

@Override
Expand Down Expand Up @@ -203,6 +227,6 @@ public Optional<ResourceGroupId> getResourceGroup()
@Override
public void setResourceGroup(ResourceGroupId resourceGroupId)
{
throw new UnsupportedOperationException("setResouceGroup is not supported for FailedQueryExecution");
throw new UnsupportedOperationException("setResourceGroup is not supported for FailedQueryExecution");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airlift.concurrent.SetThreadName;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.joda.time.DateTime;

import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -58,6 +59,12 @@ public QueryId getQueryId()
return delegate.getQueryId();
}

@Override
public Session getSession()
{
return delegate.getSession();
}

@Override
public QueryInfo getQueryInfo()
{
Expand Down Expand Up @@ -125,9 +132,27 @@ public BasicQueryInfo getBasicQueryInfo()
}

@Override
public Session getSession()
public DateTime getCreateTime()
{
return delegate.getSession();
return delegate.getCreateTime();
}

@Override
public Optional<DateTime> getExecutionStartTime()
{
return delegate.getExecutionStartTime();
}

@Override
public DateTime getLastHeartbeat()
{
return delegate.getLastHeartbeat();
}

@Override
public Optional<DateTime> getEndTime()
{
return delegate.getEndTime();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution;

import com.facebook.presto.Session;
import com.facebook.presto.execution.QueryTracker.TrackedQuery;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.warnings.WarningCollector;
import com.facebook.presto.memory.VersionedMemoryPoolId;
Expand All @@ -36,10 +37,8 @@
import static java.util.Objects.requireNonNull;

public interface QueryExecution
extends ManagedQueryExecution
extends ManagedQueryExecution, TrackedQuery
{
QueryId getQueryId();

QueryState getState();

ListenableFuture<QueryState> getStateChange(QueryState currentState);
Expand All @@ -62,11 +61,6 @@ public interface QueryExecution

void recordHeartbeat();

// XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history
void pruneInfo();

void addStateChangeListener(StateChangeListener<QueryState> stateChangeListener);

void addFinalQueryInfoListener(StateChangeListener<QueryInfo> stateChangeListener);

interface QueryExecutionFactory<T extends QueryExecution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.execution;

import com.facebook.presto.Session;
import com.facebook.presto.SessionRepresentation;
import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.ErrorType;
Expand All @@ -36,7 +37,11 @@
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryStats.immediateFailureQueryStats;
import static com.facebook.presto.execution.StageInfo.getAllStages;
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.facebook.presto.util.Failures.toFailure;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -153,6 +158,41 @@ public QueryInfo(
this.resourceGroupId = resourceGroupId;
}

public static QueryInfo immediateFailureQueryInfo(Session session, String query, URI self, Throwable throwable)
{
ExecutionFailureInfo failureCause = toFailure(throwable);
QueryInfo queryInfo = new QueryInfo(
session.getQueryId(),
session.toSessionRepresentation(),
FAILED,
GENERAL_POOL,
false,
self,
ImmutableList.of(),
query,
immediateFailureQueryStats(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
ImmutableMap.of(),
ImmutableSet.of(),
ImmutableMap.of(),
ImmutableSet.of(),
Optional.empty(),
false,
null,
Optional.empty(),
failureCause,
failureCause.getErrorCode(),
ImmutableList.of(),
ImmutableSet.of(),
Optional.empty(),
true,
Optional.empty());

return queryInfo;
}

@JsonProperty
public QueryId getQueryId()
{
Expand Down
Loading