Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public Module getMetadataModule()
return new DatabaseMetadataModule();
}

public Map<String, Module> getFileSystemProviders()
{
return ImmutableMap.of();
}

public Map<String, Module> getBackupProviders()
{
return ImmutableMap.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.bootstrap.Bootstrap;
import com.facebook.airlift.json.JsonModule;
import com.facebook.presto.raptor.backup.BackupModule;
import com.facebook.presto.raptor.filesystem.FileSystemModule;
import com.facebook.presto.raptor.security.RaptorSecurityModule;
import com.facebook.presto.raptor.storage.StorageModule;
import com.facebook.presto.raptor.util.RebindSafeMBeanServer;
Expand Down Expand Up @@ -46,13 +47,15 @@ public class RaptorConnectorFactory
{
private final String name;
private final Module metadataModule;
private final Map<String, Module> fileSystemProviders;
private final Map<String, Module> backupProviders;

public RaptorConnectorFactory(String name, Module metadataModule, Map<String, Module> backupProviders)
public RaptorConnectorFactory(String name, Module metadataModule, Map<String, Module> fileSystemProviders, Map<String, Module> backupProviders)
{
checkArgument(!isNullOrEmpty(name), "name is null or empty");
this.name = name;
this.metadataModule = requireNonNull(metadataModule, "metadataModule is null");
this.fileSystemProviders = requireNonNull(fileSystemProviders, "fileSystemProviders is null");
this.backupProviders = ImmutableMap.copyOf(requireNonNull(backupProviders, "backupProviders is null"));
}

Expand Down Expand Up @@ -84,6 +87,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
binder.bind(TypeManager.class).toInstance(context.getTypeManager());
},
metadataModule,
new FileSystemModule(fileSystemProviders),
new BackupModule(backupProviders),
new StorageModule(catalogName),
new RaptorModule(catalogName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public enum RaptorErrorCode
RAPTOR_NOT_ENOUGH_NODES(14, EXTERNAL),
RAPTOR_WRITER_DATA_ERROR(15, EXTERNAL),
RAPTOR_UNSUPPORTED_COMPRESSION_KIND(16, EXTERNAL),
RAPTOR_LOCAL_FILE_SYSTEM_ERROR(17, EXTERNAL),
RAPTOR_FILE_SYSTEM_ERROR(17, EXTERNAL),
RAPTOR_TOO_MANY_FILES_CREATED(18, EXTERNAL);

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.raptor;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.raptor.filesystem.FileSystemContext;
import com.facebook.presto.raptor.metadata.ShardInfo;
import com.facebook.presto.raptor.storage.StorageManager;
import com.facebook.presto.raptor.storage.organization.TemporalFunction;
Expand Down Expand Up @@ -71,10 +72,12 @@ public class RaptorPageSink
private final Optional<Type> temporalColumnType;
private final TemporalFunction temporalFunction;
private final int maxAllowedFilesPerWriter;
private final FileSystemContext context;

private final PageWriter pageWriter;

public RaptorPageSink(
FileSystemContext context,
PageSorter pageSorter,
StorageManager storageManager,
TemporalFunction temporalFunction,
Expand Down Expand Up @@ -103,6 +106,7 @@ public RaptorPageSink(

this.bucketCount = bucketCount;
this.bucketFields = bucketColumnIds.stream().mapToInt(columnIds::indexOf).toArray();
this.context = requireNonNull(context, "context is null");

if (temporalColumnHandle.isPresent() && columnIds.contains(temporalColumnHandle.get().getColumnId())) {
temporalColumnIndex = OptionalInt.of(columnIds.indexOf(temporalColumnHandle.get().getColumnId()));
Expand Down Expand Up @@ -169,7 +173,7 @@ private PageBuffer createPageBuffer(OptionalInt bucketNumber)
{
return new PageBuffer(
maxBufferBytes,
storageManager.createStoragePageSink(transactionId, bucketNumber, columnIds, columnTypes, true),
storageManager.createStoragePageSink(context, transactionId, bucketNumber, columnIds, columnTypes, true),
columnTypes,
sortFields,
sortOrders,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.raptor;

import com.facebook.presto.raptor.filesystem.FileSystemContext;
import com.facebook.presto.raptor.storage.StorageManager;
import com.facebook.presto.raptor.storage.StorageManagerConfig;
import com.facebook.presto.raptor.storage.organization.TemporalFunction;
Expand Down Expand Up @@ -60,6 +61,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa

RaptorOutputTableHandle handle = (RaptorOutputTableHandle) tableHandle;
return new RaptorPageSink(
new FileSystemContext(session),
pageSorter,
storageManager,
temporalFunction,
Expand All @@ -82,6 +84,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa

RaptorInsertTableHandle handle = (RaptorInsertTableHandle) tableHandle;
return new RaptorPageSink(
new FileSystemContext(session),
pageSorter,
storageManager,
temporalFunction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.raptor;

import com.facebook.presto.raptor.filesystem.FileSystemContext;
import com.facebook.presto.raptor.storage.ReaderAttributes;
import com.facebook.presto.raptor.storage.StorageManager;
import com.facebook.presto.raptor.util.ConcatPageSource;
Expand Down Expand Up @@ -60,19 +61,22 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
OptionalLong transactionId = raptorSplit.getTransactionId();
Optional<Map<String, Type>> columnTypes = raptorSplit.getColumnTypes();

FileSystemContext context = new FileSystemContext(session);

if (raptorSplit.getShardUuids().size() == 1) {
UUID shardUuid = raptorSplit.getShardUuids().iterator().next();
return createPageSource(shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes);
return createPageSource(context, shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes);
}

Iterator<ConnectorPageSource> iterator = raptorSplit.getShardUuids().stream()
.map(shardUuid -> createPageSource(shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes))
.map(shardUuid -> createPageSource(context, shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes))
.iterator();

return new ConcatPageSource(iterator);
}

private ConnectorPageSource createPageSource(
FileSystemContext context,
UUID shardUuid,
OptionalInt bucketNumber,
List<ColumnHandle> columns,
Expand All @@ -85,6 +89,6 @@ private ConnectorPageSource createPageSource(
List<Long> columnIds = columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList());
List<Type> columnTypes = columnHandles.stream().map(RaptorColumnHandle::getColumnType).collect(toList());

return storageManager.getPageSource(shardUuid, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId, allColumnTypes);
return storageManager.getPageSource(context, shardUuid, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId, allColumnTypes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class RaptorPlugin
{
private final String name;
private final Module metadataModule;
private final Map<String, Module> fileSystemProviders;
private final Map<String, Module> backupProviders;

public RaptorPlugin()
Expand All @@ -42,21 +43,22 @@ public RaptorPlugin()

private RaptorPlugin(PluginInfo info)
{
this(info.getName(), info.getMetadataModule(), info.getBackupProviders());
this(info.getName(), info.getMetadataModule(), info.getFileSystemProviders(), info.getBackupProviders());
}

public RaptorPlugin(String name, Module metadataModule, Map<String, Module> backupProviders)
public RaptorPlugin(String name, Module metadataModule, Map<String, Module> fileSystemProviders, Map<String, Module> backupProviders)
{
checkArgument(!isNullOrEmpty(name), "name is null or empty");
this.name = name;
this.metadataModule = requireNonNull(metadataModule, "metadataModule is null");
this.fileSystemProviders = requireNonNull(fileSystemProviders, "fileSystemProviders is null");
this.backupProviders = ImmutableMap.copyOf(requireNonNull(backupProviders, "backupProviders is null"));
}

@Override
public Iterable<ConnectorFactory> getConnectorFactories()
{
return ImmutableList.of(new RaptorConnectorFactory(name, metadataModule, backupProviders));
return ImmutableList.of(new RaptorConnectorFactory(name, metadataModule, fileSystemProviders, backupProviders));
}

private static PluginInfo getPluginInfo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_BACKUP_CORRUPTION;
import static com.facebook.presto.raptor.storage.LocalOrcDataEnvironment.tryGetLocalFileSystem;
import static com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment.tryGetLocalFileSystem;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.BYTE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_BACKUP_ERROR;
import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_BACKUP_NOT_FOUND;
import static com.facebook.presto.raptor.storage.FileStorageService.getFileSystemPath;
import static com.facebook.presto.raptor.filesystem.LocalFileStorageService.getFileSystemPath;
import static java.nio.file.Files.deleteIfExists;
import static java.util.Objects.requireNonNull;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor.filesystem;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.security.ConnectorIdentity;

import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

// TODO: Add schema name and table name to context
public class FileSystemContext
{
public static final FileSystemContext DEFAULT_RAPTOR_CONTEXT = new FileSystemContext(new ConnectorIdentity("presto-raptor", Optional.empty(), Optional.empty()));

private final ConnectorIdentity identity;
private final Optional<String> source;
private final Optional<String> queryId;
private final Optional<String> clientInfo;

public FileSystemContext(ConnectorIdentity identity)
{
this.identity = requireNonNull(identity, "identity is null");
this.source = Optional.empty();
this.queryId = Optional.empty();
this.clientInfo = Optional.empty();
}

public FileSystemContext(ConnectorSession session)
{
requireNonNull(session, "session is null");
this.identity = requireNonNull(session.getIdentity(), "session.getIdentity() is null");
this.source = requireNonNull(session.getSource(), "session.getSource()");
this.queryId = Optional.of(session.getQueryId());
this.clientInfo = session.getClientInfo();
}

public ConnectorIdentity getIdentity()
{
return identity;
}

public Optional<String> getSource()
{
return source;
}

public Optional<String> getQueryId()
{
return queryId;
}

public Optional<String> getClientInfo()
{
return clientInfo;
}

@Override
public String toString()
{
return toStringHelper(this)
.omitNullValues()
.add("user", identity)
.add("source", source.orElse(null))
.add("queryId", queryId.orElse(null))
.add("clientInfo", clientInfo.orElse(null))
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor.filesystem;

import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
import com.facebook.airlift.configuration.ConfigurationAwareModule;
import com.facebook.presto.raptor.storage.StorageManagerConfig;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;

import java.util.Map;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;

public class FileSystemModule
extends AbstractConfigurationAwareModule
{
private final Map<String, Module> providers;

public FileSystemModule(Map<String, Module> providers)
{
this.providers = ImmutableMap.<String, Module>builder()
.put("file", new LocalFileSystemModule())
.put("hdfs", new HdfsModule())
.putAll(providers)
.build();
}

@Override
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(StorageManagerConfig.class);

String fileSystemProvider = buildConfigObject(StorageManagerConfig.class).getFileSystemProvider();
Module module = providers.get(fileSystemProvider);

if (module == null) {
binder.addError("Unsupported file system: %s", fileSystemProvider);
}
else if (module instanceof ConfigurationAwareModule) {
install(module);
}
else {
binder.install(module);
}
}
}
Loading