diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/PluginInfo.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/PluginInfo.java index 7294176905d68..6b6c7e97c5013 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/PluginInfo.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/PluginInfo.java @@ -31,6 +31,11 @@ public Module getMetadataModule() return new DatabaseMetadataModule(); } + public Map getFileSystemProviders() + { + return ImmutableMap.of(); + } + public Map getBackupProviders() { return ImmutableMap.of(); diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorConnectorFactory.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorConnectorFactory.java index a5a650e040648..8f69857351e03 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorConnectorFactory.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorConnectorFactory.java @@ -16,6 +16,7 @@ import com.facebook.airlift.bootstrap.Bootstrap; import com.facebook.airlift.json.JsonModule; import com.facebook.presto.raptor.backup.BackupModule; +import com.facebook.presto.raptor.filesystem.FileSystemModule; import com.facebook.presto.raptor.security.RaptorSecurityModule; import com.facebook.presto.raptor.storage.StorageModule; import com.facebook.presto.raptor.util.RebindSafeMBeanServer; @@ -46,13 +47,15 @@ public class RaptorConnectorFactory { private final String name; private final Module metadataModule; + private final Map fileSystemProviders; private final Map backupProviders; - public RaptorConnectorFactory(String name, Module metadataModule, Map backupProviders) + public RaptorConnectorFactory(String name, Module metadataModule, Map fileSystemProviders, Map backupProviders) { checkArgument(!isNullOrEmpty(name), "name is null or empty"); this.name = name; this.metadataModule = requireNonNull(metadataModule, "metadataModule is null"); + this.fileSystemProviders = 
ImmutableMap.copyOf(requireNonNull(fileSystemProviders, "fileSystemProviders is null")); /* immutable snapshot, same treatment as backupProviders below */ this.backupProviders = ImmutableMap.copyOf(requireNonNull(backupProviders, "backupProviders is null")); } @@ -84,6 +87,7 @@ public Connector create(String catalogName, Map config, Connecto binder.bind(TypeManager.class).toInstance(context.getTypeManager()); }, metadataModule, + new FileSystemModule(fileSystemProviders), new BackupModule(backupProviders), new StorageModule(catalogName), new RaptorModule(catalogName), diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorErrorCode.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorErrorCode.java index 6bd92ab594a14..b9a2304981ede 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorErrorCode.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorErrorCode.java @@ -39,7 +39,7 @@ public enum RaptorErrorCode RAPTOR_NOT_ENOUGH_NODES(14, EXTERNAL), RAPTOR_WRITER_DATA_ERROR(15, EXTERNAL), RAPTOR_UNSUPPORTED_COMPRESSION_KIND(16, EXTERNAL), - RAPTOR_LOCAL_FILE_SYSTEM_ERROR(17, EXTERNAL), + RAPTOR_FILE_SYSTEM_ERROR(17, EXTERNAL), RAPTOR_TOO_MANY_FILES_CREATED(18, EXTERNAL); private final ErrorCode errorCode; diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java index a481147ea9f34..f78e0656e9b13 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java @@ -14,6 +14,7 @@ package com.facebook.presto.raptor; import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import com.facebook.presto.raptor.metadata.ShardInfo; import com.facebook.presto.raptor.storage.StorageManager; import com.facebook.presto.raptor.storage.organization.TemporalFunction; @@ -71,10 +72,12 @@ public class RaptorPageSink private final Optional 
temporalColumnType; private final TemporalFunction temporalFunction; private final int maxAllowedFilesPerWriter; + private final FileSystemContext context; private final PageWriter pageWriter; public RaptorPageSink( + FileSystemContext context, PageSorter pageSorter, StorageManager storageManager, TemporalFunction temporalFunction, @@ -103,6 +106,7 @@ public RaptorPageSink( this.bucketCount = bucketCount; this.bucketFields = bucketColumnIds.stream().mapToInt(columnIds::indexOf).toArray(); + this.context = requireNonNull(context, "context is null"); if (temporalColumnHandle.isPresent() && columnIds.contains(temporalColumnHandle.get().getColumnId())) { temporalColumnIndex = OptionalInt.of(columnIds.indexOf(temporalColumnHandle.get().getColumnId())); @@ -169,7 +173,7 @@ private PageBuffer createPageBuffer(OptionalInt bucketNumber) { return new PageBuffer( maxBufferBytes, - storageManager.createStoragePageSink(transactionId, bucketNumber, columnIds, columnTypes, true), + storageManager.createStoragePageSink(context, transactionId, bucketNumber, columnIds, columnTypes, true), columnTypes, sortFields, sortOrders, diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSinkProvider.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSinkProvider.java index f8fe940975224..a2d66251455e6 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSinkProvider.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSinkProvider.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.raptor; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import com.facebook.presto.raptor.storage.StorageManager; import com.facebook.presto.raptor.storage.StorageManagerConfig; import com.facebook.presto.raptor.storage.organization.TemporalFunction; @@ -60,6 +61,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa RaptorOutputTableHandle handle = 
(RaptorOutputTableHandle) tableHandle; return new RaptorPageSink( + new FileSystemContext(session), pageSorter, storageManager, temporalFunction, @@ -82,6 +84,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa RaptorInsertTableHandle handle = (RaptorInsertTableHandle) tableHandle; return new RaptorPageSink( + new FileSystemContext(session), pageSorter, storageManager, temporalFunction, diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSourceProvider.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSourceProvider.java index 4438787ffa394..bb0d1b66bc473 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSourceProvider.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSourceProvider.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.raptor; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import com.facebook.presto.raptor.storage.ReaderAttributes; import com.facebook.presto.raptor.storage.StorageManager; import com.facebook.presto.raptor.util.ConcatPageSource; @@ -60,19 +61,22 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti OptionalLong transactionId = raptorSplit.getTransactionId(); Optional> columnTypes = raptorSplit.getColumnTypes(); + FileSystemContext context = new FileSystemContext(session); + if (raptorSplit.getShardUuids().size() == 1) { UUID shardUuid = raptorSplit.getShardUuids().iterator().next(); - return createPageSource(shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes); + return createPageSource(context, shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes); } Iterator iterator = raptorSplit.getShardUuids().stream() - .map(shardUuid -> createPageSource(shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes)) + .map(shardUuid -> createPageSource(context, shardUuid, 
bucketNumber, columns, predicate, attributes, transactionId, columnTypes)) .iterator(); return new ConcatPageSource(iterator); } private ConnectorPageSource createPageSource( + FileSystemContext context, UUID shardUuid, OptionalInt bucketNumber, List columns, @@ -85,6 +89,6 @@ private ConnectorPageSource createPageSource( List columnIds = columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList()); List columnTypes = columnHandles.stream().map(RaptorColumnHandle::getColumnType).collect(toList()); - return storageManager.getPageSource(shardUuid, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId, allColumnTypes); + return storageManager.getPageSource(context, shardUuid, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId, allColumnTypes); } } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPlugin.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPlugin.java index 66ee02f727a9e..5f08fa97fe685 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPlugin.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPlugin.java @@ -33,6 +33,7 @@ public class RaptorPlugin { private final String name; private final Module metadataModule; + private final Map fileSystemProviders; private final Map backupProviders; public RaptorPlugin() @@ -42,21 +43,22 @@ public RaptorPlugin() private RaptorPlugin(PluginInfo info) { - this(info.getName(), info.getMetadataModule(), info.getBackupProviders()); + this(info.getName(), info.getMetadataModule(), info.getFileSystemProviders(), info.getBackupProviders()); } - public RaptorPlugin(String name, Module metadataModule, Map backupProviders) + public RaptorPlugin(String name, Module metadataModule, Map fileSystemProviders, Map backupProviders) { checkArgument(!isNullOrEmpty(name), "name is null or empty"); this.name = name; this.metadataModule = requireNonNull(metadataModule, "metadataModule is null"); 
+ this.fileSystemProviders = ImmutableMap.copyOf(requireNonNull(fileSystemProviders, "fileSystemProviders is null")); /* immutable snapshot, same treatment as backupProviders below */ this.backupProviders = ImmutableMap.copyOf(requireNonNull(backupProviders, "backupProviders is null")); } @Override public Iterable getConnectorFactories() { - return ImmutableList.of(new RaptorConnectorFactory(name, metadataModule, backupProviders)); + return ImmutableList.of(new RaptorConnectorFactory(name, metadataModule, fileSystemProviders, backupProviders)); } private static PluginInfo getPluginInfo() diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupManager.java index 1dc702a045e02..7be1f3483900e 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupManager.java @@ -40,7 +40,7 @@ import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_BACKUP_CORRUPTION; -import static com.facebook.presto.raptor.storage.LocalOrcDataEnvironment.tryGetLocalFileSystem; +import static com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment.tryGetLocalFileSystem; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static io.airlift.units.DataSize.Unit.BYTE; diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/FileBackupStore.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/FileBackupStore.java index 62e7189537c9b..c7fa5d5370c59 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/FileBackupStore.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/FileBackupStore.java @@ -29,7 +29,7 @@ import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_BACKUP_ERROR; import static 
com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_BACKUP_NOT_FOUND; -import static com.facebook.presto.raptor.storage.FileStorageService.getFileSystemPath; +import static com.facebook.presto.raptor.filesystem.LocalFileStorageService.getFileSystemPath; import static java.nio.file.Files.deleteIfExists; import static java.util.Objects.requireNonNull; diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemContext.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemContext.java new file mode 100644 index 0000000000000..36d33b49c5011 --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemContext.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.raptor.filesystem; + +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.security.ConnectorIdentity; + +import java.util.Optional; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +// Carries the caller identity and, when built from a session, its source, query id and client info. TODO: Add schema name and table name to context +public class FileSystemContext +{ + public static final FileSystemContext DEFAULT_RAPTOR_CONTEXT = new FileSystemContext(new ConnectorIdentity("presto-raptor", Optional.empty(), Optional.empty())); /* session-less context with a fixed "presto-raptor" identity */ + + private final ConnectorIdentity identity; + private final Optional source; + private final Optional queryId; + private final Optional clientInfo; + + public FileSystemContext(ConnectorIdentity identity) + { + this.identity = requireNonNull(identity, "identity is null"); + this.source = Optional.empty(); + this.queryId = Optional.empty(); + this.clientInfo = Optional.empty(); + } + + public FileSystemContext(ConnectorSession session) + { + requireNonNull(session, "session is null"); + this.identity = requireNonNull(session.getIdentity(), "session.getIdentity() is null"); + this.source = requireNonNull(session.getSource(), "session.getSource() is null"); + this.queryId = Optional.of(session.getQueryId()); + this.clientInfo = session.getClientInfo(); + } + + public ConnectorIdentity getIdentity() + { + return identity; + } + + public Optional getSource() + { + return source; + } + + public Optional getQueryId() + { + return queryId; + } + + public Optional getClientInfo() + { + return clientInfo; + } + + @Override + public String toString() + { + return toStringHelper(this) + .omitNullValues() + .add("user", identity) + .add("source", source.orElse(null)) + .add("queryId", queryId.orElse(null)) + .add("clientInfo", clientInfo.orElse(null)) + .toString(); + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemModule.java 
b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemModule.java new file mode 100644 index 0000000000000..ea73787385f2a --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemModule.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.raptor.filesystem; + +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.airlift.configuration.ConfigurationAwareModule; +import com.facebook.presto.raptor.storage.StorageManagerConfig; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Module; + +import java.util.Map; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; + +public class FileSystemModule + extends AbstractConfigurationAwareModule +{ /* Installs the one module whose name matches StorageManagerConfig#getFileSystemProvider. */ + private final Map providers; /* provider name -> module; "file" and "hdfs" are always present */ + + public FileSystemModule(Map providers) + { + this.providers = ImmutableMap.builder() + .put("file", new LocalFileSystemModule()) + .put("hdfs", new HdfsModule()) + .putAll(providers) /* NOTE(review): ImmutableMap.Builder is documented to reject duplicate keys at build(), so a supplied provider named "file" or "hdfs" would presumably fail here - confirm this is intended */ + .build(); + } + + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(StorageManagerConfig.class); + + String fileSystemProvider = buildConfigObject(StorageManagerConfig.class).getFileSystemProvider(); + Module module = providers.get(fileSystemProvider); + + if (module == null) { /* unknown provider name: reported as a binder error rather than thrown */ + binder.addError("Unsupported file system: %s", 
fileSystemProvider); + } + else if (module instanceof ConfigurationAwareModule) { + install(module); /* unqualified install(...) of this class, not binder.install as in the branch below */ + } + else { + binder.install(module); + } + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemUtil.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemUtil.java index 463e8f843b9af..404df2523e5e7 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemUtil.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/FileSystemUtil.java @@ -15,18 +15,52 @@ import com.facebook.presto.spi.PrestoException; import io.airlift.slice.XxHash64; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR; public final class FileSystemUtil { + private static final Configuration INITIAL_CONFIGURATION; + + static { + Configuration.addDefaultResource("hdfs-default.xml"); + Configuration.addDefaultResource("hdfs-site.xml"); + + // must not be transitively reloaded during the future loading of various Hadoop modules + // all the required default resources must be declared above + INITIAL_CONFIGURATION = new Configuration(false); + Configuration defaultConfiguration = new Configuration(); + FileSystemUtil.copy(defaultConfiguration, FileSystemUtil.INITIAL_CONFIGURATION); + } + private FileSystemUtil() {} + public static Configuration getInitialConfiguration() + { + return copy(INITIAL_CONFIGURATION); + } + + public static Configuration copy(Configuration configuration) + { + Configuration copy = new Configuration(false); + copy(configuration, copy); + return copy; + } + + public static void copy(Configuration from, Configuration to) + { + for (Map.Entry entry : from) { + to.set(entry.getKey(), entry.getValue()); + } + } + public static long 
xxhash64(FileSystem fileSystem, Path file) { try (InputStream in = fileSystem.open(file)) { diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsModule.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsModule.java new file mode 100644 index 0000000000000..e8c140c7cd9f0 --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsModule.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.raptor.filesystem; + +import com.facebook.presto.raptor.storage.OrcDataEnvironment; +import com.facebook.presto.raptor.storage.StorageManagerConfig; +import com.facebook.presto.raptor.storage.StorageService; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import org.apache.hadoop.fs.Path; + +import javax.inject.Singleton; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; + +public class HdfsModule + implements Module +{ /* Binds the HDFS implementations of StorageService and OrcDataEnvironment as singletons. */ + @Override + public void configure(Binder binder) + { + configBinder(binder).bindConfig(StorageManagerConfig.class); /* NOTE(review): FileSystemModule.setup binds this config as well - confirm the repeated binding is harmless */ + configBinder(binder).bindConfig(RaptorHdfsConfig.class); + binder.bind(RaptorHdfsConfiguration.class).to(RaptorHiveHdfsConfiguration.class).in(Scopes.SINGLETON); + binder.bind(StorageService.class).to(HdfsStorageService.class).in(Scopes.SINGLETON); + binder.bind(OrcDataEnvironment.class).to(HdfsOrcDataEnvironment.class).in(Scopes.SINGLETON); + } + + @Singleton + @Provides + public Path createBaseLocation(StorageManagerConfig config) + { + return new Path(config.getDataDirectory()); /* supplies the Path that HdfsOrcDataEnvironment and HdfsStorageService take as baseLocation */ + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsOrcDataEnvironment.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsOrcDataEnvironment.java new file mode 100644 index 0000000000000..2d12dc614d30a --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsOrcDataEnvironment.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.raptor.filesystem; + +import com.facebook.presto.orc.OrcDataSink; +import com.facebook.presto.orc.OrcDataSource; +import com.facebook.presto.orc.OrcDataSourceId; +import com.facebook.presto.orc.OutputStreamOrcDataSink; +import com.facebook.presto.raptor.storage.OrcDataEnvironment; +import com.facebook.presto.raptor.storage.ReaderAttributes; +import com.facebook.presto.spi.PrestoException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import javax.inject.Inject; + +import java.io.IOException; + +import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_FILE_SYSTEM_ERROR; +import static java.util.Objects.requireNonNull; + +public class HdfsOrcDataEnvironment + implements OrcDataEnvironment +{ /* OrcDataEnvironment that resolves its FileSystem from baseLocation and opens ORC sources and sinks on a given FileSystem. */ + private final Path baseLocation; + private final RaptorHdfsConfiguration configuration; + + @Inject + public HdfsOrcDataEnvironment(Path baseLocation, RaptorHdfsConfiguration configuration) + { + this.baseLocation = requireNonNull(baseLocation, "baseLocation is null"); + this.configuration = requireNonNull(configuration, "configuration is null"); + } + + @Override + public FileSystem getFileSystem(FileSystemContext fileSystemContext) + { + try { + return baseLocation.getFileSystem(configuration.getConfiguration(fileSystemContext, baseLocation.toUri())); /* configuration is looked up per call from the context and the base location URI */ + } + catch (IOException e) { + throw new PrestoException(RAPTOR_FILE_SYSTEM_ERROR, "Raptor cannot create HDFS file system", e); + } + } + + @Override + public OrcDataSource createOrcDataSource(FileSystem fileSystem, Path path, 
ReaderAttributes readerAttributes) + throws IOException + { + return new HdfsOrcDataSource( + new OrcDataSourceId(path.toString()), + fileSystem.getFileStatus(path).getLen(), + readerAttributes.getMaxMergeDistance(), + readerAttributes.getMaxReadSize(), + readerAttributes.getStreamBufferSize(), + readerAttributes.isLazyReadSmallRanges(), + fileSystem.open(path)); /* the opened stream is handed to HdfsOrcDataSource, whose close() closes it */ + } + + @Override + public OrcDataSink createOrcDataSink(FileSystem fileSystem, Path path) + throws IOException + { + return new OutputStreamOrcDataSink(fileSystem.create(path)); + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsOrcDataSource.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsOrcDataSource.java new file mode 100644 index 0000000000000..907a4a019134e --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsOrcDataSource.java @@ -0,0 +1,74 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.raptor.filesystem; + +import com.facebook.presto.orc.AbstractOrcDataSource; +import com.facebook.presto.orc.OrcDataSourceId; +import com.facebook.presto.spi.PrestoException; +import io.airlift.units.DataSize; +import org.apache.hadoop.fs.FSDataInputStream; + +import java.io.IOException; + +import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class HdfsOrcDataSource + extends AbstractOrcDataSource +{ /* ORC data source that reads byte ranges from an already-open FSDataInputStream and closes that stream on close(). */ + private final FSDataInputStream inputStream; + + public HdfsOrcDataSource( + OrcDataSourceId id, + long size, + DataSize maxMergeDistance, + DataSize maxReadSize, + DataSize streamBufferSize, + boolean lazyReadSmallRanges, + FSDataInputStream inputStream) + { + super(id, size, maxMergeDistance, maxReadSize, streamBufferSize, lazyReadSmallRanges); + this.inputStream = requireNonNull(inputStream, "inputStream is null"); + } + + @Override + public void close() + throws IOException + { + inputStream.close(); + } + + @Override + protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength) + { + try { + inputStream.readFully(position, buffer, bufferOffset, bufferLength); + } + catch (PrestoException e) { + // rethrow unchanged, just in case there is a Presto wrapper or hook + throw e; + } + catch (Exception e) { /* NOTE(review): the BlockMissingException and IOException branches below throw exactly the same PrestoException(RAPTOR_ERROR, message, e) as the final fallthrough, so the branching currently has no effect - confirm whether distinct error codes were intended */ + String message = format("Error reading from %s at position %s", this, position); + if (e.getClass().getSimpleName().equals("BlockMissingException")) { + throw new PrestoException(RAPTOR_ERROR, message, e); + } + if (e instanceof IOException) { + throw new PrestoException(RAPTOR_ERROR, message, e); + } + throw new PrestoException(RAPTOR_ERROR, message, e); + } + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsStorageService.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsStorageService.java new file mode 100644 index 
0000000000000..54c173fe788f2 --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/HdfsStorageService.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.raptor.filesystem; + +import com.facebook.presto.raptor.storage.OrcDataEnvironment; +import com.facebook.presto.raptor.storage.StorageService; +import com.facebook.presto.spi.PrestoException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; + +import java.io.IOException; +import java.util.Set; +import java.util.UUID; + +import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR; +import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_FILE_SYSTEM_ERROR; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; + +// TODO: this only works for hard file affinity, but it is good enough for a prototype +// TODO: need to handle race conditions across all workers +// TODO: Staging dir and storage dir are the same in this StorageService implementation +public class HdfsStorageService + implements StorageService +{ /* Shard files are named "uuid.orc" directly under baseLocation/storage; staging and storage paths are identical. */ + private static final String FILE_EXTENSION = ".orc"; + + private final Path baseStorageDir; + private final Path baseQuarantineDir; + private final OrcDataEnvironment environment; + + @Inject + public 
HdfsStorageService(OrcDataEnvironment environment, Path baseLocation) + { + requireNonNull(baseLocation, "baseLocation is null"); + this.baseStorageDir = new Path(baseLocation, "storage"); + this.baseQuarantineDir = new Path(baseLocation, "quarantine"); + this.environment = requireNonNull(environment, "environment is null"); + } + + @Override + @PostConstruct + public void start() + { + createDirectory(baseStorageDir); + createDirectory(baseQuarantineDir); + } + + @Override + public long getAvailableBytes() + { + return Long.MAX_VALUE; + } + + @Override + public Path getStorageFile(UUID shardUuid) + { + return getFileSystemPath(baseStorageDir, shardUuid); + } + + @Override + public Path getStagingFile(UUID shardUuid) + { + // Deliberately returns the storage file path because this implementation has no staging directory + return getStorageFile(shardUuid); + } + + @Override + public Path getQuarantineFile(UUID shardUuid) + { + throw new PrestoException(RAPTOR_ERROR, "Possible data corruption is detected in metadata or remote storage"); /* always throws: no quarantine path is ever returned */ + } + + @Override + public Set getStorageShards() + { + // Bug: This prevents ShardCleaner from cleaning up any unused shards, so storage will currently grow indefinitely + // This cannot be solved until we re-design the metadata layout for disagg Raptor + throw new UnsupportedOperationException("HDFS storage does not support list directory on purpose"); + } + + @Override + public void createParents(Path file) + { + checkArgument(file.getParent().equals(baseStorageDir) || file.getParent().equals(baseQuarantineDir)); + // No need to create the parent as we only have 2 levels of directories now + // TODO: This may change based on metadata redesign + } + + @Override + public void promoteFromStagingToStorage(UUID shardUuid) + { + // Nothing to do as we don't have a staging directory + } + + private static Path getFileSystemPath(Path base, UUID shardUuid) + { + String uuid = shardUuid.toString().toLowerCase(ENGLISH); + return new Path(base, uuid + 
FILE_EXTENSION); + } + + private void createDirectory(Path directory) + { + boolean madeDirectory; + try { + FileSystem fileSystem = environment.getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); /* uses the session-less default context */ + madeDirectory = fileSystem.mkdirs(directory) && fileSystem.isDirectory(directory); + } + catch (IOException e) { + throw new PrestoException(RAPTOR_FILE_SYSTEM_ERROR, "Failed creating directories: " + directory, e); + } + + if (!madeDirectory) { + throw new PrestoException(RAPTOR_FILE_SYSTEM_ERROR, "Failed creating directories: " + directory); + } + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/FileStorageService.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalFileStorageService.java similarity index 79% rename from presto-raptor/src/main/java/com/facebook/presto/raptor/storage/FileStorageService.java rename to presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalFileStorageService.java index e2933e40f5493..69699a8995e8f 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/FileStorageService.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalFileStorageService.java @@ -11,9 +11,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.facebook.presto.raptor.storage; +package com.facebook.presto.raptor.filesystem; import com.facebook.airlift.log.Logger; +import com.facebook.presto.raptor.storage.OrcDataEnvironment; +import com.facebook.presto.raptor.storage.StorageManagerConfig; +import com.facebook.presto.raptor.storage.StorageService; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -27,6 +30,7 @@ import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.net.URI; import java.nio.file.Files; import java.util.List; import java.util.Optional; @@ -35,15 +39,15 @@ import java.util.regex.Pattern; import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR; -import static com.facebook.presto.raptor.storage.LocalOrcDataEnvironment.tryGetLocalFileSystem; +import static com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment.tryGetLocalFileSystem; import static com.google.common.base.Preconditions.checkState; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; -public class FileStorageService +public class LocalFileStorageService implements StorageService { - private static final Logger log = Logger.get(FileStorageService.class); + private static final Logger log = Logger.get(LocalFileStorageService.class); private static final Pattern HEX_DIRECTORY = Pattern.compile("[0-9a-f]{2}"); private static final String FILE_EXTENSION = ".orc"; @@ -55,17 +59,19 @@ public class FileStorageService private final File baseQuarantineDir; @Inject - public FileStorageService(OrcDataEnvironment environment, StorageManagerConfig config) + public LocalFileStorageService(OrcDataEnvironment environment, StorageManagerConfig storageManagerConfig) { - this(environment, config.getDataDirectory()); + this(environment, storageManagerConfig.getDataDirectory()); } - public FileStorageService(OrcDataEnvironment environment, File 
dataDirectory) + public LocalFileStorageService(OrcDataEnvironment environment, URI dataDirectory) { Optional fileSystem = tryGetLocalFileSystem(requireNonNull(environment, "environment is null")); - checkState(fileSystem.isPresent(), "FileStorageService has to have local file system"); + checkState(fileSystem.isPresent(), "LocalFileStorageService has to have local file system"); + checkState(dataDirectory.isAbsolute(), "dataDirectory URI is not absolute"); + checkState(dataDirectory.getScheme().equals("file"), "dataDirectory URI is not pointing to local file system"); this.localFileSystem = fileSystem.get(); - File baseDataDir = requireNonNull(dataDirectory, "dataDirectory is null"); + File baseDataDir = requireNonNull(localFileSystem.pathToFile(new Path(dataDirectory)), "dataDirectory is null"); this.baseStorageDir = new File(baseDataDir, "storage"); this.baseStagingDir = new File(baseDataDir, "staging"); this.baseQuarantineDir = new File(baseDataDir, "quarantine"); @@ -118,8 +124,8 @@ public Path getQuarantineFile(UUID shardUuid) public Set getStorageShards() { ImmutableSet.Builder shards = ImmutableSet.builder(); - for (File level1 : listFiles(baseStorageDir, FileStorageService::isHexDirectory)) { - for (File level2 : listFiles(level1, FileStorageService::isHexDirectory)) { + for (File level1 : listFiles(baseStorageDir, LocalFileStorageService::isHexDirectory)) { + for (File level2 : listFiles(level1, LocalFileStorageService::isHexDirectory)) { for (File file : listFiles(level2, path -> true)) { if (file.isFile()) { uuidFromFileName(file.getName()).ifPresent(shards::add); @@ -136,6 +142,22 @@ public void createParents(Path file) createDirectory(file.getParent()); } + @Override + public void promoteFromStagingToStorage(UUID shardUuid) + { + Path stagingFile = getStagingFile(shardUuid); + Path storageFile = getStorageFile(shardUuid); + + createParents(storageFile); + + try { + localFileSystem.rename(stagingFile, storageFile); + } + catch (IOException e) { + throw 
new PrestoException(RAPTOR_ERROR, "Failed to move shard file", e); + } + } + /** * Generate a file system path for a shard UUID. * This creates a three level deep directory structure where the first diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalFileSystemModule.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalFileSystemModule.java new file mode 100644 index 0000000000000..dda8558334318 --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalFileSystemModule.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.raptor.filesystem; + +import com.facebook.presto.raptor.storage.OrcDataEnvironment; +import com.facebook.presto.raptor.storage.StorageService; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +public class LocalFileSystemModule + implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(StorageService.class).to(LocalFileStorageService.class).in(Scopes.SINGLETON); + binder.bind(OrcDataEnvironment.class).to(LocalOrcDataEnvironment.class).in(Scopes.SINGLETON); + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/LocalOrcDataEnvironment.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalOrcDataEnvironment.java similarity index 76% rename from presto-raptor/src/main/java/com/facebook/presto/raptor/storage/LocalOrcDataEnvironment.java rename to presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalOrcDataEnvironment.java index d7ad812f0d8be..286fc5adbf842 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/LocalOrcDataEnvironment.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/LocalOrcDataEnvironment.java @@ -11,13 +11,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.facebook.presto.raptor.storage; +package com.facebook.presto.raptor.filesystem; import com.facebook.presto.orc.FileOrcDataSource; import com.facebook.presto.orc.OrcDataSink; import com.facebook.presto.orc.OrcDataSource; import com.facebook.presto.orc.OutputStreamOrcDataSink; -import com.facebook.presto.raptor.filesystem.RaptorLocalFileSystem; +import com.facebook.presto.raptor.storage.OrcDataEnvironment; +import com.facebook.presto.raptor.storage.ReaderAttributes; import com.facebook.presto.spi.PrestoException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -30,13 +31,12 @@ import java.io.IOException; import java.util.Optional; -import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_LOCAL_FILE_SYSTEM_ERROR; +import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_FILE_SYSTEM_ERROR; public class LocalOrcDataEnvironment implements OrcDataEnvironment { private static final Configuration CONFIGURATION = new Configuration(); - private final RawLocalFileSystem localFileSystem; @Inject @@ -46,18 +46,23 @@ public LocalOrcDataEnvironment() this.localFileSystem = new RaptorLocalFileSystem(CONFIGURATION); } catch (IOException e) { - throw new PrestoException(RAPTOR_LOCAL_FILE_SYSTEM_ERROR, "Raptor cannot create local file system", e); + throw new PrestoException(RAPTOR_FILE_SYSTEM_ERROR, "Raptor cannot create local file system", e); } } @Override - public FileSystem getFileSystem() + public FileSystem getFileSystem(FileSystemContext ignore) + { + return localFileSystem; + } + + public RawLocalFileSystem getLocalFileSystem() { return localFileSystem; } @Override - public OrcDataSource createOrcDataSource(Path path, ReaderAttributes readerAttributes) + public OrcDataSource createOrcDataSource(FileSystem ignore, Path path, ReaderAttributes readerAttributes) throws IOException { return new FileOrcDataSource( @@ -69,7 +74,7 @@ public OrcDataSource createOrcDataSource(Path path, ReaderAttributes readerAttri } 
@Override - public OrcDataSink createOrcDataSink(Path path) + public OrcDataSink createOrcDataSink(FileSystem fileSystem, Path path) throws IOException { return new OutputStreamOrcDataSink(new FileOutputStream(localFileSystem.pathToFile(path))); @@ -78,7 +83,7 @@ public OrcDataSink createOrcDataSink(Path path) public static Optional tryGetLocalFileSystem(OrcDataEnvironment environment) { if (environment instanceof LocalOrcDataEnvironment) { - return Optional.of((RaptorLocalFileSystem) environment.getFileSystem()); + return Optional.of(((LocalOrcDataEnvironment) environment).getLocalFileSystem()); } return Optional.empty(); } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHdfsConfig.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHdfsConfig.java new file mode 100644 index 0000000000000..a6f906a3f78ad --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHdfsConfig.java @@ -0,0 +1,197 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.raptor.filesystem; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; +import com.facebook.airlift.configuration.LegacyConfig; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.net.HostAndPort; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.airlift.units.MaxDataSize; +import io.airlift.units.MinDataSize; +import io.airlift.units.MinDuration; +import org.apache.hadoop.fs.Path; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static io.airlift.units.DataSize.Unit.MEGABYTE; + +public class RaptorHdfsConfig +{ + private HostAndPort socksProxy; + + private Duration ipcPingInterval = new Duration(10, TimeUnit.SECONDS); + private Duration dfsTimeout = new Duration(60, TimeUnit.SECONDS); + private Duration dfsConnectTimeout = new Duration(500, TimeUnit.MILLISECONDS); + private int dfsConnectMaxRetries = 5; + private String domainSocketPath; + + private List resourceConfigFiles = ImmutableList.of(); + + private DataSize textMaxLineLength = new DataSize(100, MEGABYTE); + + private boolean hdfsWireEncryptionEnabled; + + private int fileSystemMaxCacheSize = 1000; + + private Path hdfsPath; + + public HostAndPort getSocksProxy() + { + return socksProxy; + } + + @Config("hive.thrift.client.socks-proxy") + public RaptorHdfsConfig setSocksProxy(HostAndPort socksProxy) + { + this.socksProxy = socksProxy; + return this; + } + + @NotNull + public List getResourceConfigFiles() + { + return resourceConfigFiles; + } + + @Config("hive.config.resources") + public RaptorHdfsConfig setResourceConfigFiles(String files) + { + this.resourceConfigFiles = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(files); + return this; + } + + public RaptorHdfsConfig 
setResourceConfigFiles(List files) + { + this.resourceConfigFiles = ImmutableList.copyOf(files); + return this; + } + + @NotNull + @MinDuration("1ms") + public Duration getIpcPingInterval() + { + return ipcPingInterval; + } + + @Config("hive.dfs.ipc-ping-interval") + public RaptorHdfsConfig setIpcPingInterval(Duration pingInterval) + { + this.ipcPingInterval = pingInterval; + return this; + } + + @NotNull + @MinDuration("1ms") + public Duration getDfsTimeout() + { + return dfsTimeout; + } + + @Config("hive.dfs-timeout") + public RaptorHdfsConfig setDfsTimeout(Duration dfsTimeout) + { + this.dfsTimeout = dfsTimeout; + return this; + } + + @MinDuration("1ms") + @NotNull + public Duration getDfsConnectTimeout() + { + return dfsConnectTimeout; + } + + @Config("hive.dfs.connect.timeout") + public RaptorHdfsConfig setDfsConnectTimeout(Duration dfsConnectTimeout) + { + this.dfsConnectTimeout = dfsConnectTimeout; + return this; + } + + @Min(0) + public int getDfsConnectMaxRetries() + { + return dfsConnectMaxRetries; + } + + @Config("hive.dfs.connect.max-retries") + public RaptorHdfsConfig setDfsConnectMaxRetries(int dfsConnectMaxRetries) + { + this.dfsConnectMaxRetries = dfsConnectMaxRetries; + return this; + } + + public String getDomainSocketPath() + { + return domainSocketPath; + } + + @Config("hive.dfs.domain-socket-path") + @LegacyConfig("dfs.domain-socket-path") + public RaptorHdfsConfig setDomainSocketPath(String domainSocketPath) + { + this.domainSocketPath = domainSocketPath; + return this; + } + + @MinDataSize("1B") + @MaxDataSize("1GB") + @NotNull + public DataSize getTextMaxLineLength() + { + return textMaxLineLength; + } + + @Config("hive.text.max-line-length") + @ConfigDescription("Maximum line length for text files") + public RaptorHdfsConfig setTextMaxLineLength(DataSize textMaxLineLength) + { + this.textMaxLineLength = textMaxLineLength; + return this; + } + + public boolean isHdfsWireEncryptionEnabled() + { + return hdfsWireEncryptionEnabled; + } + + 
@Config("hive.hdfs.wire-encryption.enabled") + @ConfigDescription("Should be turned on when HDFS wire encryption is enabled") + public RaptorHdfsConfig setHdfsWireEncryptionEnabled(boolean hdfsWireEncryptionEnabled) + { + this.hdfsWireEncryptionEnabled = hdfsWireEncryptionEnabled; + return this; + } + + public int getFileSystemMaxCacheSize() + { + return fileSystemMaxCacheSize; + } + + @Config("hive.fs.cache.max-size") + @ConfigDescription("Hadoop FileSystem cache size") + public RaptorHdfsConfig setFileSystemMaxCacheSize(int fileSystemMaxCacheSize) + { + this.fileSystemMaxCacheSize = fileSystemMaxCacheSize; + return this; + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHdfsConfiguration.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHdfsConfiguration.java new file mode 100644 index 0000000000000..6ae951c5b1a55 --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHdfsConfiguration.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.raptor.filesystem; + +import org.apache.hadoop.conf.Configuration; + +import java.net.URI; + +public interface RaptorHdfsConfiguration +{ + Configuration getConfiguration(FileSystemContext context, URI uri); +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHiveHdfsConfiguration.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHiveHdfsConfiguration.java new file mode 100644 index 0000000000000..5ac66f5b4882f --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/filesystem/RaptorHiveHdfsConfiguration.java @@ -0,0 +1,183 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.raptor.filesystem; + +import com.google.common.collect.ImmutableList; +import com.google.common.net.HostAndPort; +import io.airlift.units.Duration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.SocksSocketFactory; + +import javax.inject.Inject; +import javax.net.SocketFactory; + +import java.net.URI; +import java.util.List; + +import static com.facebook.presto.raptor.filesystem.FileSystemUtil.copy; +import static com.facebook.presto.raptor.filesystem.FileSystemUtil.getInitialConfiguration; +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_PING_INTERVAL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SOCKS_SERVER_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; + +public class RaptorHiveHdfsConfiguration + implements RaptorHdfsConfiguration +{ + private static final Configuration INITIAL_CONFIGURATION = getInitialConfiguration(); + + 
@SuppressWarnings("ThreadLocalNotStaticFinal") + private final ThreadLocal hadoopConfiguration = new ThreadLocal() + { + @Override + protected Configuration initialValue() + { + Configuration configuration = new Configuration(false); + copy(INITIAL_CONFIGURATION, configuration); + initializer.updateConfiguration(configuration); + return configuration; + } + }; + + private final HdfsConfigurationInitializer initializer; + + @Inject + public RaptorHiveHdfsConfiguration(RaptorHdfsConfig config) + { + this.initializer = new HdfsConfigurationInitializer(requireNonNull(config, "config is null")); + } + + // TODO: Support DynamicConfigurationProvider which consumes context and URI + @Override + public Configuration getConfiguration(FileSystemContext context, URI uri) + { + return hadoopConfiguration.get(); + } + + private static class HdfsConfigurationInitializer + { + private final HostAndPort socksProxy; + private final Duration ipcPingInterval; + private final Duration dfsTimeout; + private final Duration dfsConnectTimeout; + private final int dfsConnectMaxRetries; + private final String domainSocketPath; + private final Configuration resourcesConfiguration; + private final int fileSystemMaxCacheSize; + private final boolean isHdfsWireEncryptionEnabled; + private int textMaxLineLength; + + public HdfsConfigurationInitializer(RaptorHdfsConfig config) + { + requireNonNull(config, "config is null"); + checkArgument(config.getDfsTimeout().toMillis() >= 1, "dfsTimeout must be at least 1 ms"); + checkArgument(toIntExact(config.getTextMaxLineLength().toBytes()) >= 1, "textMaxLineLength must be at least 1 byte"); + + this.socksProxy = config.getSocksProxy(); + this.ipcPingInterval = config.getIpcPingInterval(); + this.dfsTimeout = config.getDfsTimeout(); + this.dfsConnectTimeout = config.getDfsConnectTimeout(); + this.dfsConnectMaxRetries = config.getDfsConnectMaxRetries(); + this.domainSocketPath = config.getDomainSocketPath(); + this.resourcesConfiguration = 
readConfiguration(config.getResourceConfigFiles()); + this.fileSystemMaxCacheSize = config.getFileSystemMaxCacheSize(); + this.isHdfsWireEncryptionEnabled = config.isHdfsWireEncryptionEnabled(); + this.textMaxLineLength = toIntExact(config.getTextMaxLineLength().toBytes()); + } + + private static Configuration readConfiguration(List resourcePaths) + { + Configuration result = new Configuration(false); + + for (String resourcePath : resourcePaths) { + Configuration resourceProperties = new Configuration(false); + resourceProperties.addResource(new Path(resourcePath)); + copy(resourceProperties, result); + } + + return result; + } + + public void updateConfiguration(Configuration config) + { + copy(resourcesConfiguration, config); + + // this is to prevent dfs client from doing reverse DNS lookups to determine whether nodes are rack local + config.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NoOpDNSToSwitchMapping.class, DNSToSwitchMapping.class); + + if (socksProxy != null) { + config.setClass(HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, SocksSocketFactory.class, SocketFactory.class); + config.set(HADOOP_SOCKS_SERVER_KEY, socksProxy.toString()); + } + + if (domainSocketPath != null) { + config.setStrings(DFS_DOMAIN_SOCKET_PATH_KEY, domainSocketPath); + } + + // only enable short circuit reads if domain socket path is properly configured + if (!config.get(DFS_DOMAIN_SOCKET_PATH_KEY, "").trim().isEmpty()) { + config.setBooleanIfUnset(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + } + + config.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, toIntExact(dfsTimeout.toMillis())); + config.setInt(IPC_PING_INTERVAL_KEY, toIntExact(ipcPingInterval.toMillis())); + config.setInt(IPC_CLIENT_CONNECT_TIMEOUT_KEY, toIntExact(dfsConnectTimeout.toMillis())); + config.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, dfsConnectMaxRetries); + + if (isHdfsWireEncryptionEnabled) { + config.set(HADOOP_RPC_PROTECTION, "privacy"); + config.setBoolean("dfs.encrypt.data.transfer", true); + } + + 
config.setInt("fs.cache.max-size", fileSystemMaxCacheSize); + + config.setInt(LineRecordReader.MAX_LINE_LENGTH, textMaxLineLength); + } + + public static class NoOpDNSToSwitchMapping + implements DNSToSwitchMapping + { + @Override + public List resolve(List names) + { + // dfs client expects an empty list as an indication that the host->switch mapping for the given names are not known + return ImmutableList.of(); + } + + @Override + public void reloadCachedMappings() + { + // no-op + } + + @Override + public void reloadCachedMappings(List names) + { + // no-op + } + } + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardCleaner.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardCleaner.java index 249370a5bc9e2..5faa855898de9 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardCleaner.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardCleaner.java @@ -16,6 +16,7 @@ import com.facebook.airlift.log.Logger; import com.facebook.airlift.stats.CounterStat; import com.facebook.presto.raptor.backup.BackupStore; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import com.facebook.presto.raptor.storage.OrcDataEnvironment; import com.facebook.presto.raptor.storage.StorageService; import com.facebook.presto.raptor.util.DaoSupplier; @@ -25,7 +26,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import io.airlift.units.Duration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; @@ -86,7 +86,7 @@ public class ShardCleaner private final Duration backupCleanTime; private final ScheduledExecutorService scheduler; private final ExecutorService backupExecutor; - private final FileSystem fileSystem; + private final OrcDataEnvironment orcDataEnvironment; private final Duration maxCompletedTransactionAge; private final 
AtomicBoolean started = new AtomicBoolean(); @@ -160,7 +160,7 @@ public ShardCleaner( this.backupCleanTime = requireNonNull(backupCleanTime, "backupCleanTime is null"); this.scheduler = newScheduledThreadPool(2, daemonThreadsNamed("shard-cleaner-%s")); this.backupExecutor = newFixedThreadPool(backupDeletionThreads, daemonThreadsNamed("shard-cleaner-backup-%s")); - this.fileSystem = requireNonNull(environment, "environment is null").getFileSystem(); + this.orcDataEnvironment = requireNonNull(environment, "environment is null"); this.maxCompletedTransactionAge = requireNonNull(maxCompletedTransactionAge, "maxCompletedTransactionAge is null"); } @@ -526,6 +526,6 @@ private static Timestamp maxTimestamp(Duration duration) private void deleteFile(Path file) throws IOException { - fileSystem.delete(file, false); + orcDataEnvironment.getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT).delete(file, false); } } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcDataEnvironment.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcDataEnvironment.java index a93f71cde6189..a76eeaee30930 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcDataEnvironment.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcDataEnvironment.java @@ -15,6 +15,7 @@ import com.facebook.presto.orc.OrcDataSink; import com.facebook.presto.orc.OrcDataSource; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -22,11 +23,11 @@ public interface OrcDataEnvironment { - FileSystem getFileSystem(); + FileSystem getFileSystem(FileSystemContext context); - OrcDataSource createOrcDataSource(Path path, ReaderAttributes readerAttributes) + OrcDataSource createOrcDataSource(FileSystem fileSystem, Path path, ReaderAttributes readerAttributes) throws IOException; - OrcDataSink createOrcDataSink(Path path) + OrcDataSink 
createOrcDataSink(FileSystem fileSystem, Path path) throws IOException; } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcFileRewriter.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcFileRewriter.java index ba1a26a1fbc86..ce6201bbeaa45 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcFileRewriter.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcFileRewriter.java @@ -90,11 +90,11 @@ public final class OrcFileRewriter this.orcFileTailSource = requireNonNull(orcFileTailSource, "orcFileTailSource is null"); } - public OrcFileInfo rewrite(Map allColumnTypes, Path input, Path output, BitSet rowsToDelete) + public OrcFileInfo rewrite(FileSystem fileSystem, Map allColumnTypes, Path input, Path output, BitSet rowsToDelete) throws IOException { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(FileSystem.class.getClassLoader()); - OrcDataSource dataSource = orcDataEnvironment.createOrcDataSource(input, readerAttributes)) { + OrcDataSource dataSource = orcDataEnvironment.createOrcDataSource(fileSystem, input, readerAttributes)) { OrcReader reader = new OrcReader( dataSource, ORC, @@ -162,7 +162,7 @@ public OrcFileInfo rewrite(Map allColumnTypes, Path input, Path ou } try (Closer recordReader = closer(reader.createBatchRecordReader(readerColumns, TRUE, DEFAULT_STORAGE_TIMEZONE, newSimpleAggregatedMemoryContext(), INITIAL_BATCH_SIZE), OrcBatchRecordReader::close); Closer writer = closer(new OrcWriter( - orcDataEnvironment.createOrcDataSink(output), + orcDataEnvironment.createOrcDataSink(fileSystem, output), writerColumnIds, writerStorageTypes, ORC, diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java index abb8ba207d4e6..5ea4336c176c9 100644 --- 
a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java @@ -30,6 +30,7 @@ import com.facebook.presto.raptor.RaptorConnectorId; import com.facebook.presto.raptor.backup.BackupManager; import com.facebook.presto.raptor.backup.BackupStore; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import com.facebook.presto.raptor.metadata.ColumnInfo; import com.facebook.presto.raptor.metadata.ColumnStats; import com.facebook.presto.raptor.metadata.ShardDelta; @@ -159,7 +160,6 @@ public class OrcStorageManager private final ExecutorService deletionExecutor; private final ExecutorService commitExecutor; private final OrcDataEnvironment orcDataEnvironment; - private final FileSystem fileSystem; private final OrcFileRewriter fileRewriter; private final OrcWriterStats stats = new OrcWriterStats(); private final OrcFileTailSource orcFileTailSource; @@ -239,9 +239,8 @@ public OrcStorageManager( this.compression = requireNonNull(compression, "compression is null"); this.orcOptimizedWriterStage = requireNonNull(orcOptimizedWriterStage, "orcOptimizedWriterStage is null"); this.orcDataEnvironment = requireNonNull(orcDataEnvironment, "orcDataEnvironment is null"); - this.fileSystem = requireNonNull(orcDataEnvironment.getFileSystem(), "fileSystem is null"); - this.fileRewriter = new OrcFileRewriter(readerAttributes, orcOptimizedWriterStage.equals(ENABLED_AND_VALIDATED), stats, typeManager, orcDataEnvironment, compression, orcFileTailSource); this.orcFileTailSource = requireNonNull(orcFileTailSource, "orcFileTailSource is null"); + this.fileRewriter = new OrcFileRewriter(readerAttributes, orcOptimizedWriterStage.equals(ENABLED_AND_VALIDATED), stats, typeManager, orcDataEnvironment, compression, orcFileTailSource); } @PreDestroy @@ -253,6 +252,7 @@ public void shutdown() @Override public ConnectorPageSource getPageSource( + FileSystemContext 
fileSystemContext, UUID shardUuid, OptionalInt bucketNumber, List columnIds, @@ -262,7 +262,8 @@ public ConnectorPageSource getPageSource( OptionalLong transactionId, Optional> allColumnTypes) { - OrcDataSource dataSource = openShard(shardUuid, readerAttributes); + FileSystem fileSystem = orcDataEnvironment.getFileSystem(fileSystemContext); + OrcDataSource dataSource = openShard(fileSystem, shardUuid, readerAttributes); AggregatedMemoryContext systemMemoryUsage = newSimpleAggregatedMemoryContext(); @@ -296,7 +297,7 @@ public ConnectorPageSource getPageSource( Optional shardRewriter = Optional.empty(); if (transactionId.isPresent()) { checkState(allColumnTypes.isPresent()); - shardRewriter = Optional.of(createShardRewriter(transactionId.getAsLong(), bucketNumber, shardUuid, allColumnTypes.get())); + shardRewriter = Optional.of(createShardRewriter(fileSystem, transactionId.getAsLong(), bucketNumber, shardUuid, allColumnTypes.get())); } return new OrcPageSource(shardRewriter, recordReader, dataSource, columnIds, columnTypes, columnIndexes.build(), shardUuid, bucketNumber, systemMemoryUsage); @@ -326,21 +327,27 @@ private static int toSpecialIndex(long columnId) } @Override - public StoragePageSink createStoragePageSink(long transactionId, OptionalInt bucketNumber, List columnIds, List columnTypes, boolean checkSpace) + public StoragePageSink createStoragePageSink( + FileSystemContext fileSystemContext, + long transactionId, + OptionalInt bucketNumber, + List columnIds, + List columnTypes, + boolean checkSpace) { if (checkSpace && storageService.getAvailableBytes() < minAvailableSpace.toBytes()) { throw new PrestoException(RAPTOR_LOCAL_DISK_FULL, "Local disk is full on node " + nodeId); } - return new OrcStoragePageSink(transactionId, columnIds, columnTypes, bucketNumber); + return new OrcStoragePageSink(orcDataEnvironment.getFileSystem(fileSystemContext), transactionId, columnIds, columnTypes, bucketNumber); } - private ShardRewriter createShardRewriter(long 
transactionId, OptionalInt bucketNumber, UUID shardUuid, Map columns) + private ShardRewriter createShardRewriter(FileSystem fileSystem, long transactionId, OptionalInt bucketNumber, UUID shardUuid, Map columns) { return rowsToDelete -> { if (rowsToDelete.isEmpty()) { return completedFuture(ImmutableList.of()); } - return supplyAsync(() -> rewriteShard(transactionId, bucketNumber, shardUuid, columns, rowsToDelete), deletionExecutor); + return supplyAsync(() -> rewriteShard(fileSystem, transactionId, bucketNumber, shardUuid, columns, rowsToDelete), deletionExecutor); }; } @@ -350,21 +357,11 @@ private void writeShard(UUID shardUuid) throw new PrestoException(RAPTOR_ERROR, "Backup does not exist after write"); } - Path stagingFile = storageService.getStagingFile(shardUuid); - Path storageFile = storageService.getStorageFile(shardUuid); - - storageService.createParents(storageFile); - - try { - fileSystem.rename(stagingFile, storageFile); - } - catch (IOException e) { - throw new PrestoException(RAPTOR_ERROR, "Failed to move shard file", e); - } + storageService.promoteFromStagingToStorage(shardUuid); } @VisibleForTesting - OrcDataSource openShard(UUID shardUuid, ReaderAttributes readerAttributes) + OrcDataSource openShard(FileSystem fileSystem, UUID shardUuid, ReaderAttributes readerAttributes) { Path file = storageService.getStorageFile(shardUuid); @@ -397,28 +394,33 @@ OrcDataSource openShard(UUID shardUuid, ReaderAttributes readerAttributes) } try { - return orcDataEnvironment.createOrcDataSource(file, readerAttributes); + return orcDataEnvironment.createOrcDataSource(fileSystem, file, readerAttributes); } catch (IOException e) { throw new PrestoException(RAPTOR_ERROR, "Failed to open shard file: " + file, e); } } - private ShardInfo createShardInfo(UUID shardUuid, OptionalInt bucketNumber, Path file, Set nodes, long rowCount, long uncompressedSize) + private ShardInfo createShardInfo(FileSystem fileSystem, UUID shardUuid, OptionalInt bucketNumber, Path file, Set 
nodes, long rowCount, long uncompressedSize) { try { - return new ShardInfo(shardUuid, bucketNumber, nodes, computeShardStats(file), rowCount, fileSystem.getFileStatus(file).getLen(), uncompressedSize, xxhash64(fileSystem, file)); + return new ShardInfo(shardUuid, bucketNumber, nodes, computeShardStats(fileSystem, file), rowCount, fileSystem.getFileStatus(file).getLen(), uncompressedSize, xxhash64(fileSystem, file)); } catch (IOException e) { throw new PrestoException(RAPTOR_ERROR, "Failed to get file status: " + file, e); } } - private List computeShardStats(Path file) + private List computeShardStats(FileSystem fileSystem, Path file) { - try (OrcDataSource dataSource = orcDataEnvironment.createOrcDataSource(file, defaultReaderAttributes)) { - OrcReader reader = new OrcReader(dataSource, ORC, defaultReaderAttributes.getMaxMergeDistance(), defaultReaderAttributes.getTinyStripeThreshold(), HUGE_MAX_READ_BLOCK_SIZE, orcFileTailSource); - + try (OrcDataSource dataSource = orcDataEnvironment.createOrcDataSource(fileSystem, file, defaultReaderAttributes)) { + OrcReader reader = new OrcReader( + dataSource, + ORC, + defaultReaderAttributes.getMaxMergeDistance(), + defaultReaderAttributes.getTinyStripeThreshold(), + HUGE_MAX_READ_BLOCK_SIZE, + orcFileTailSource); ImmutableList.Builder list = ImmutableList.builder(); for (ColumnInfo info : getColumnInfo(reader)) { computeColumnStats(reader, info.getColumnId(), info.getType()).ifPresent(list::add); @@ -431,7 +433,7 @@ private List computeShardStats(Path file) } @VisibleForTesting - Collection rewriteShard(long transactionId, OptionalInt bucketNumber, UUID shardUuid, Map columns, BitSet rowsToDelete) + Collection rewriteShard(FileSystem fileSystem, long transactionId, OptionalInt bucketNumber, UUID shardUuid, Map columns, BitSet rowsToDelete) { if (rowsToDelete.isEmpty()) { return ImmutableList.of(); @@ -441,7 +443,7 @@ Collection rewriteShard(long transactionId, OptionalInt bucketNumber, UUI Path input = 
storageService.getStorageFile(shardUuid); Path output = storageService.getStagingFile(newShardUuid); - OrcFileInfo info = rewriteFile(columns, input, output, rowsToDelete); + OrcFileInfo info = rewriteFile(fileSystem, columns, input, output, rowsToDelete); long rowCount = info.getRowCount(); if (rowCount == 0) { @@ -456,7 +458,7 @@ Collection rewriteShard(long transactionId, OptionalInt bucketNumber, UUI Set nodes = ImmutableSet.of(nodeId); long uncompressedSize = info.getUncompressedSize(); - ShardInfo shard = createShardInfo(newShardUuid, bucketNumber, output, nodes, rowCount, uncompressedSize); + ShardInfo shard = createShardInfo(fileSystem, newShardUuid, bucketNumber, output, nodes, rowCount, uncompressedSize); writeShard(newShardUuid); @@ -470,10 +472,10 @@ private static Collection shardDelta(UUID oldShardUuid, Optional columns, Path input, Path output, BitSet rowsToDelete) + private OrcFileInfo rewriteFile(FileSystem fileSystem, Map columns, Path input, Path output, BitSet rowsToDelete) { try { - return fileRewriter.rewrite(columns, input, output, rowsToDelete); + return fileRewriter.rewrite(fileSystem, columns, input, output, rowsToDelete); } catch (IOException e) { throw new PrestoException(RAPTOR_ERROR, "Failed to rewrite shard file: " + input, e); @@ -592,17 +594,20 @@ private class OrcStoragePageSink private final List stagingFiles = new ArrayList<>(); private final List shards = new ArrayList<>(); private final List> futures = new ArrayList<>(); + private final FileSystem fileSystem; private boolean committed; private FileWriter writer; private UUID shardUuid; public OrcStoragePageSink( + FileSystem fileSystem, long transactionId, List columnIds, List columnTypes, OptionalInt bucketNumber) { + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); this.transactionId = transactionId; this.columnIds = ImmutableList.copyOf(requireNonNull(columnIds, "columnIds is null")); this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, 
"columnTypes is null")); @@ -652,7 +657,7 @@ public void flush() long rowCount = writer.getRowCount(); long uncompressedSize = writer.getUncompressedSize(); - shards.add(createShardInfo(shardUuid, bucketNumber, stagingFile, nodes, rowCount, uncompressedSize)); + shards.add(createShardInfo(fileSystem, shardUuid, bucketNumber, stagingFile, nodes, rowCount, uncompressedSize)); writer = null; shardUuid = null; @@ -723,7 +728,7 @@ private void createWriterIfNecessary() stagingFiles.add(stagingFile); OrcDataSink sink; try { - sink = orcDataEnvironment.createOrcDataSink(stagingFile); + sink = orcDataEnvironment.createOrcDataSink(fileSystem, stagingFile); } catch (IOException e) { throw new PrestoException(RAPTOR_ERROR, format("Failed to create staging file %s", stagingFile), e); diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardEjector.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardEjector.java index bd5b24a0aeaf0..3b2ec0c97d219 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardEjector.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardEjector.java @@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; -import static com.facebook.presto.raptor.storage.LocalOrcDataEnvironment.tryGetLocalFileSystem; +import static com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment.tryGetLocalFileSystem; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Maps.filterKeys; diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardRecoveryManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardRecoveryManager.java index 93a342d8cf827..7775d73603bba 100644 --- 
a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardRecoveryManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardRecoveryManager.java @@ -62,7 +62,7 @@ import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR; import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_RECOVERY_ERROR; import static com.facebook.presto.raptor.filesystem.FileSystemUtil.xxhash64; -import static com.facebook.presto.raptor.storage.LocalOrcDataEnvironment.tryGetLocalFileSystem; +import static com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment.tryGetLocalFileSystem; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManager.java index 47e5bd981e3be..f2147cd285a38 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManager.java @@ -14,6 +14,7 @@ package com.facebook.presto.raptor.storage; import com.facebook.presto.raptor.RaptorColumnHandle; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.Type; @@ -28,6 +29,7 @@ public interface StorageManager { default ConnectorPageSource getPageSource( + FileSystemContext fileSystemContext, UUID shardUuid, OptionalInt bucketNumber, List columnIds, @@ -35,10 +37,11 @@ default ConnectorPageSource getPageSource( TupleDomain effectivePredicate, ReaderAttributes readerAttributes) { - return getPageSource(shardUuid, bucketNumber, columnIds, columnTypes, effectivePredicate, readerAttributes, 
OptionalLong.empty(), Optional.empty()); + return getPageSource(fileSystemContext, shardUuid, bucketNumber, columnIds, columnTypes, effectivePredicate, readerAttributes, OptionalLong.empty(), Optional.empty()); } ConnectorPageSource getPageSource( + FileSystemContext fileSystemContext, UUID shardUuid, OptionalInt bucketNumber, List columnIds, @@ -49,6 +52,7 @@ ConnectorPageSource getPageSource( Optional> allColumnTypes); StoragePageSink createStoragePageSink( + FileSystemContext fileSystemContext, long transactionId, OptionalInt bucketNumber, List columnIds, diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManagerConfig.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManagerConfig.java index 6f904ebc81ce8..01fc6600f4b4e 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManagerConfig.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManagerConfig.java @@ -30,7 +30,7 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; -import java.io.File; +import java.net.URI; import java.util.TimeZone; import java.util.concurrent.TimeUnit; @@ -44,7 +44,8 @@ @DefunctConfig("storage.backup-directory") public class StorageManagerConfig { - private File dataDirectory; + private URI dataDirectory; + private String fileSystemProvider = "file"; private DataSize minAvailableSpace = new DataSize(0, BYTE); private Duration shardRecoveryTimeout = new Duration(30, TimeUnit.SECONDS); private Duration missingShardDiscoveryInterval = new Duration(5, TimeUnit.MINUTES); @@ -73,16 +74,30 @@ public class StorageManagerConfig private int maxAllowedFilesPerWriter = Integer.MAX_VALUE; @NotNull - public File getDataDirectory() + public URI getDataDirectory() { return dataDirectory; } @Config("storage.data-directory") - @ConfigDescription("Base directory to use for storing shard data") - public StorageManagerConfig setDataDirectory(File 
dataDirectory) + @ConfigDescription("Base URI to use for storing shard data") + public StorageManagerConfig setDataDirectory(URI dataURI) { - this.dataDirectory = dataDirectory; + this.dataDirectory = dataURI; + return this; + } + + @NotNull + public String getFileSystemProvider() + { + return fileSystemProvider; + } + + @Config("storage.file-system") + @ConfigDescription("File system used for storage (e.g. file, hdfs)") + public StorageManagerConfig setFileSystemProvider(String fileSystemProvider) + { + this.fileSystemProvider = fileSystemProvider; return this; } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageModule.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageModule.java index c660b6e15e3cf..6208734d3e1e0 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageModule.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageModule.java @@ -76,8 +76,6 @@ public void configure(Binder binder) binder.bind(Ticker.class).toInstance(Ticker.systemTicker()); binder.bind(StorageManager.class).to(OrcStorageManager.class).in(Scopes.SINGLETON); - binder.bind(StorageService.class).to(FileStorageService.class).in(Scopes.SINGLETON); - binder.bind(OrcDataEnvironment.class).to(LocalOrcDataEnvironment.class).in(Scopes.SINGLETON); binder.bind(ShardManager.class).to(DatabaseShardManager.class).in(Scopes.SINGLETON); binder.bind(ShardRecorder.class).to(DatabaseShardRecorder.class).in(Scopes.SINGLETON); binder.bind(DatabaseShardManager.class).in(Scopes.SINGLETON); diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageService.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageService.java index dbe432d26b060..98bdd62307e8f 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageService.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageService.java @@ -33,4 
+33,6 @@ public interface StorageService Path getQuarantineFile(UUID shardUuid); Set getStorageShards(); + + void promoteFromStagingToStorage(UUID shardUuid); } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java index f7e31903a5f6e..0ceebd0c76668 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java @@ -15,6 +15,7 @@ import com.facebook.airlift.stats.CounterStat; import com.facebook.airlift.stats.DistributionStat; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import com.facebook.presto.raptor.metadata.ColumnInfo; import com.facebook.presto.raptor.metadata.ShardInfo; import com.facebook.presto.raptor.storage.ReaderAttributes; @@ -75,7 +76,7 @@ public List compact(long transactionId, OptionalInt bucketNumber, Set List columnIds = columns.stream().map(ColumnInfo::getColumnId).collect(toList()); List columnTypes = columns.stream().map(ColumnInfo::getType).collect(toList()); - StoragePageSink storagePageSink = storageManager.createStoragePageSink(transactionId, bucketNumber, columnIds, columnTypes, false); + StoragePageSink storagePageSink = storageManager.createStoragePageSink(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, transactionId, bucketNumber, columnIds, columnTypes, false); List shardInfos; try { @@ -94,7 +95,7 @@ private List compact(StoragePageSink storagePageSink, OptionalInt buc throws IOException { for (UUID uuid : uuids) { - try (ConnectorPageSource pageSource = storageManager.getPageSource(uuid, bucketNumber, columnIds, columnTypes, TupleDomain.all(), readerAttributes)) { + try (ConnectorPageSource pageSource = storageManager.getPageSource(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, bucketNumber, columnIds, columnTypes, 
TupleDomain.all(), readerAttributes)) { while (!pageSource.isFinished()) { Page page = pageSource.getNextPage(); if (isNullOrEmptyPage(page)) { @@ -127,10 +128,10 @@ public List compactSorted(long transactionId, OptionalInt bucketNumbe .collect(toList()); Queue rowSources = new PriorityQueue<>(); - StoragePageSink outputPageSink = storageManager.createStoragePageSink(transactionId, bucketNumber, columnIds, columnTypes, false); + StoragePageSink outputPageSink = storageManager.createStoragePageSink(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, transactionId, bucketNumber, columnIds, columnTypes, false); try { for (UUID uuid : uuids) { - ConnectorPageSource pageSource = storageManager.getPageSource(uuid, bucketNumber, columnIds, columnTypes, TupleDomain.all(), readerAttributes); + ConnectorPageSource pageSource = storageManager.getPageSource(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, bucketNumber, columnIds, columnTypes, TupleDomain.all(), readerAttributes); SortedPageSource rowSource = new SortedPageSource(pageSource, columnTypes, sortIndexes, sortOrders); rowSources.add(rowSource); } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/RaptorQueryRunner.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/RaptorQueryRunner.java index f29bece861860..9b0848e302665 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/RaptorQueryRunner.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/RaptorQueryRunner.java @@ -43,16 +43,17 @@ public final class RaptorQueryRunner private RaptorQueryRunner() {} - public static DistributedQueryRunner createRaptorQueryRunner(Map extraProperties, boolean loadTpch, boolean bucketed) + public static DistributedQueryRunner createRaptorQueryRunner(Map extraProperties, boolean loadTpch, boolean bucketed, boolean useHdfs) throws Exception { - return createRaptorQueryRunner(extraProperties, loadTpch, bucketed, ImmutableMap.of()); + return createRaptorQueryRunner(extraProperties, loadTpch, 
bucketed, useHdfs, ImmutableMap.of()); } public static DistributedQueryRunner createRaptorQueryRunner( Map extraProperties, boolean loadTpch, boolean bucketed, + boolean useHdfs, Map extraRaptorProperties) throws Exception { @@ -63,16 +64,26 @@ public static DistributedQueryRunner createRaptorQueryRunner( queryRunner.installPlugin(new RaptorPlugin()); File baseDir = queryRunner.getCoordinator().getBaseDataDir().toFile(); - Map raptorProperties = ImmutableMap.builder() - .putAll(extraRaptorProperties) + + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.putAll(extraRaptorProperties) .put("metadata.db.type", "h2") .put("metadata.db.connections.max", "100") .put("metadata.db.filename", new File(baseDir, "db").getAbsolutePath()) - .put("storage.data-directory", new File(baseDir, "data").getAbsolutePath()) - .put("storage.max-shard-rows", "2000") - .put("backup.provider", "file") - .put("backup.directory", new File(baseDir, "backup").getAbsolutePath()) - .build(); + .put("storage.max-shard-rows", "2000"); + + if (useHdfs) { + builder.put("storage.file-system", "hdfs") + .put("storage.data-directory", queryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toFile().toURI().toString()); + } + else { + builder.put("backup.provider", "file") + .put("backup.directory", new File(baseDir, "backup").getAbsolutePath()) + .put("storage.file-system", "file") + .put("storage.data-directory", new File(baseDir, "data").toURI().toString()); + } + + Map raptorProperties = builder.build(); queryRunner.createCatalog("raptor", "raptor", raptorProperties); @@ -145,7 +156,7 @@ public static void main(String[] args) { Logging.initialize(); Map properties = ImmutableMap.of("http-server.http.port", "8080"); - DistributedQueryRunner queryRunner = createRaptorQueryRunner(properties, false, false); + DistributedQueryRunner queryRunner = createRaptorQueryRunner(properties, true, false, false); Thread.sleep(10); Logger log = Logger.get(RaptorQueryRunner.class); 
log.info("======== SERVER STARTED ========"); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/TestRaptorPlugin.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/TestRaptorPlugin.java index 2e5c94fc101fa..a09a2897ca3bd 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/TestRaptorPlugin.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/TestRaptorPlugin.java @@ -45,7 +45,7 @@ public void testPlugin() Map config = ImmutableMap.builder() .put("metadata.db.type", "h2") .put("metadata.db.filename", tmpDir.getAbsolutePath()) - .put("storage.data-directory", tmpDir.getAbsolutePath()) + .put("storage.data-directory", tmpDir.toURI().toString()) .build(); factory.create("test", config, new TestingConnectorContext()); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/backup/TestBackupManager.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/backup/TestBackupManager.java index 598ee6fdde6c0..2edf1f9325f61 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/backup/TestBackupManager.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/backup/TestBackupManager.java @@ -13,9 +13,9 @@ */ package com.facebook.presto.raptor.backup; +import com.facebook.presto.raptor.filesystem.LocalFileStorageService; +import com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment; import com.facebook.presto.raptor.storage.BackupStats; -import com.facebook.presto.raptor.storage.FileStorageService; -import com.facebook.presto.raptor.storage.LocalOrcDataEnvironment; import com.facebook.presto.spi.PrestoException; import com.google.common.io.Files; import org.apache.hadoop.fs.Path; @@ -56,7 +56,7 @@ public class TestBackupManager private File temporary; private BackupStore backupStore; - private FileStorageService storageService; + private LocalFileStorageService storageService; private BackupManager backupManager; @BeforeMethod @@ -68,7 +68,7 @@ public void setup() 
fileStore.start(); backupStore = new TestingBackupStore(fileStore); - storageService = new FileStorageService(new LocalOrcDataEnvironment(), new File(temporary, "data")); + storageService = new LocalFileStorageService(new LocalOrcDataEnvironment(), new File(temporary, "data").toURI()); storageService.start(); backupManager = new BackupManager(Optional.of(backupStore), storageService, new LocalOrcDataEnvironment(), 5); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorDistributedQueries.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorDistributedQueries.java index 62467a93d98fe..7b6be430cb662 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorDistributedQueries.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorDistributedQueries.java @@ -24,7 +24,7 @@ public class TestRaptorDistributedQueries @SuppressWarnings("unused") public TestRaptorDistributedQueries() { - this(() -> createRaptorQueryRunner(ImmutableMap.of(), true, false, ImmutableMap.of("storage.orc.optimized-writer-stage", "ENABLED_AND_VALIDATED"))); + this(() -> createRaptorQueryRunner(ImmutableMap.of(), true, false, false, ImmutableMap.of("storage.orc.optimized-writer-stage", "ENABLED_AND_VALIDATED"))); } protected TestRaptorDistributedQueries(QueryRunnerSupplier supplier) diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorDistributedQueriesBucketed.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorDistributedQueriesBucketed.java index e1dec16bc3a53..413df5135347d 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorDistributedQueriesBucketed.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorDistributedQueriesBucketed.java @@ -22,7 +22,7 @@ public class TestRaptorDistributedQueriesBucketed { public 
TestRaptorDistributedQueriesBucketed() { - super(() -> createRaptorQueryRunner(ImmutableMap.of(), true, true, ImmutableMap.of("storage.orc.optimized-writer-stage", "ENABLED_AND_VALIDATED"))); + super(() -> createRaptorQueryRunner(ImmutableMap.of(), true, true, false, ImmutableMap.of("storage.orc.optimized-writer-stage", "ENABLED_AND_VALIDATED"))); } @Override diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTest.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTest.java index f8018f6cfef0b..6e191dbe56835 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTest.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTest.java @@ -66,6 +66,7 @@ public TestRaptorIntegrationSmokeTest() ImmutableMap.of(), true, false, + false, ImmutableMap.of("storage.orc.optimized-writer-stage", "ENABLED_AND_VALIDATED"))); } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestBucketed.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestBucketed.java index 800fc2f3fdccb..d2863f3f38f23 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestBucketed.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestBucketed.java @@ -23,7 +23,7 @@ public class TestRaptorIntegrationSmokeTestBucketed { public TestRaptorIntegrationSmokeTestBucketed() { - super(() -> createRaptorQueryRunner(ImmutableMap.of(), true, true, ImmutableMap.of("storage.orc.optimized-writer-stage", "ENABLED_AND_VALIDATED"))); + super(() -> createRaptorQueryRunner(ImmutableMap.of(), true, true, false, ImmutableMap.of("storage.orc.optimized-writer-stage", "ENABLED_AND_VALIDATED"))); } @Test diff --git 
a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestHdfs.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestHdfs.java new file mode 100644 index 0000000000000..5b74cf271f600 --- /dev/null +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestHdfs.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.raptor.integration; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import static com.facebook.presto.raptor.RaptorQueryRunner.createRaptorQueryRunner; + +public class TestRaptorIntegrationSmokeTestHdfs + extends TestRaptorIntegrationSmokeTest +{ + public TestRaptorIntegrationSmokeTestHdfs() + { + super(() -> createRaptorQueryRunner(ImmutableMap.of(), true, true, true, ImmutableMap.of())); + } + + @Test + public void testShardsSystemTableBucketNumber() + { + assertQuery("" + + "SELECT count(DISTINCT bucket_number)\n" + + "FROM system.shards\n" + + "WHERE table_schema = 'tpch'\n" + + " AND table_name = 'orders'", + "SELECT 25"); + } +} diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestMySql.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestMySql.java index 5d02d32cb26fa..eba1e9c8df190 100644 --- 
a/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestMySql.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/integration/TestRaptorIntegrationSmokeTestMySql.java @@ -74,7 +74,7 @@ private static DistributedQueryRunner createRaptorMySqlQueryRunner(String mysqlU Map raptorProperties = ImmutableMap.builder() .put("metadata.db.type", "mysql") .put("metadata.db.url", mysqlUrl) - .put("storage.data-directory", new File(baseDir, "data").getAbsolutePath()) + .put("storage.data-directory", new File(baseDir, "data").toURI().toString()) .put("storage.max-shard-rows", "2000") .put("backup.provider", "file") .put("backup.directory", new File(baseDir, "backup").getAbsolutePath()) diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestShardCleaner.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestShardCleaner.java index b1a12855b9cfd..ce3868114521d 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestShardCleaner.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestShardCleaner.java @@ -16,8 +16,8 @@ import com.facebook.airlift.testing.TestingTicker; import com.facebook.presto.raptor.backup.BackupStore; import com.facebook.presto.raptor.backup.FileBackupStore; -import com.facebook.presto.raptor.storage.FileStorageService; -import com.facebook.presto.raptor.storage.LocalOrcDataEnvironment; +import com.facebook.presto.raptor.filesystem.LocalFileStorageService; +import com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment; import com.facebook.presto.raptor.storage.StorageService; import com.facebook.presto.raptor.util.DaoSupplier; import com.facebook.presto.raptor.util.UuidUtil.UuidArgumentFactory; @@ -79,7 +79,7 @@ public void setup() temporary = createTempDir(); File directory = new File(temporary, "data"); - storageService = new FileStorageService(new LocalOrcDataEnvironment(), directory); + storageService = 
new LocalFileStorageService(new LocalOrcDataEnvironment(), directory.toURI()); storageService.start(); File backupDirectory = new File(temporary, "backup"); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/security/TestRaptorFileBasedSecurity.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/security/TestRaptorFileBasedSecurity.java index 6ed3076905731..8e4a7e1e3144b 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/security/TestRaptorFileBasedSecurity.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/security/TestRaptorFileBasedSecurity.java @@ -39,6 +39,7 @@ public void setUp() ImmutableMap.of(), true, false, + false, ImmutableMap.of("security.config-file", path, "raptor.security", "file")); } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/security/TestRaptorReadOnlySecurity.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/security/TestRaptorReadOnlySecurity.java index fba26fb2ec90f..af424a162a5d7 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/security/TestRaptorReadOnlySecurity.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/security/TestRaptorReadOnlySecurity.java @@ -29,7 +29,7 @@ public class TestRaptorReadOnlySecurity public void setUp() throws Exception { - queryRunner = createRaptorQueryRunner(ImmutableMap.of(), false, false, ImmutableMap.of("raptor.security", "read-only")); + queryRunner = createRaptorQueryRunner(ImmutableMap.of(), false, false, false, ImmutableMap.of("raptor.security", "read-only")); } @AfterClass(alwaysRun = true) diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestFileStorageService.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestLocalFileStorageService.java similarity index 91% rename from presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestFileStorageService.java rename to 
presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestLocalFileStorageService.java index 3bc8023441f75..d2efac0653faa 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestFileStorageService.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestLocalFileStorageService.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.raptor.storage; +import com.facebook.presto.raptor.filesystem.LocalFileStorageService; +import com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.fs.Path; import org.testng.annotations.AfterMethod; @@ -23,7 +25,7 @@ import java.util.Set; import java.util.UUID; -import static com.facebook.presto.raptor.storage.FileStorageService.getFileSystemPath; +import static com.facebook.presto.raptor.filesystem.LocalFileStorageService.getFileSystemPath; import static com.google.common.io.Files.createTempDir; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; @@ -36,16 +38,16 @@ import static org.testng.FileAssert.assertFile; @Test(singleThreaded = true) -public class TestFileStorageService +public class TestLocalFileStorageService { private File temporary; - private FileStorageService store; + private LocalFileStorageService store; @BeforeMethod public void setup() { temporary = createTempDir(); - store = new FileStorageService(new LocalOrcDataEnvironment(), temporary); + store = new LocalFileStorageService(new LocalOrcDataEnvironment(), temporary.toURI()); store.start(); } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcFileRewriter.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcFileRewriter.java index 165fa2a6e25f2..371f6ce66423a 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcFileRewriter.java +++ 
b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcFileRewriter.java @@ -22,6 +22,8 @@ import com.facebook.presto.orc.OrcWriterStats; import com.facebook.presto.orc.OutputStreamOrcDataSink; import com.facebook.presto.orc.StorageOrcFileTailSource; +import com.facebook.presto.raptor.filesystem.FileSystemContext; +import com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment; import com.facebook.presto.raptor.metadata.TableColumn; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.Page; @@ -40,6 +42,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import io.airlift.units.DataSize; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; @@ -64,7 +67,7 @@ import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder; import static com.facebook.presto.orc.OrcEncoding.ORC; import static com.facebook.presto.orc.metadata.CompressionKind.ZSTD; -import static com.facebook.presto.raptor.storage.FileStorageService.getFileSystemPath; +import static com.facebook.presto.raptor.filesystem.LocalFileStorageService.getFileSystemPath; import static com.facebook.presto.raptor.storage.OrcTestingUtil.createReader; import static com.facebook.presto.raptor.storage.OrcTestingUtil.fileOrcDataSource; import static com.facebook.presto.raptor.storage.TestOrcStorageManager.createOrcStorageManager; @@ -224,7 +227,8 @@ public void testRewrite() rowsToDelete.set(4); File newFile = new File(temporary, randomUUID().toString()); - OrcFileInfo info = createFileRewriter().rewrite(getColumnTypes(columnIds, columnTypes), path(file), path(newFile), rowsToDelete); + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); + OrcFileInfo info = createFileRewriter().rewrite(fileSystem, getColumnTypes(columnIds, columnTypes), path(file), path(newFile), rowsToDelete); 
assertEquals(info.getRowCount(), 2); assertBetweenInclusive(info.getUncompressedSize(), 94L, 118L * 2); @@ -339,7 +343,8 @@ public void testRewriteWithoutMetadata() rowsToDelete.set(1); File newFile = new File(temporary, randomUUID().toString()); - OrcFileInfo info = createFileRewriter().rewrite(getColumnTypes(columnIds, columnTypes), path(file), path(newFile), rowsToDelete); + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); + OrcFileInfo info = createFileRewriter().rewrite(fileSystem, getColumnTypes(columnIds, columnTypes), path(file), path(newFile), rowsToDelete); assertEquals(info.getRowCount(), 1); assertBetweenInclusive(info.getUncompressedSize(), 13L, 13L * 2); @@ -383,7 +388,8 @@ public void testRewriteAllRowsDeleted() rowsToDelete.set(1); File newFile = new File(temporary, randomUUID().toString()); - OrcFileInfo info = createFileRewriter().rewrite(getColumnTypes(columnIds, columnTypes), path(file), path(newFile), rowsToDelete); + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); + OrcFileInfo info = createFileRewriter().rewrite(fileSystem, getColumnTypes(columnIds, columnTypes), path(file), path(newFile), rowsToDelete); assertEquals(info.getRowCount(), 0); assertEquals(info.getUncompressedSize(), 0); @@ -405,7 +411,8 @@ public void testRewriteNoRowsDeleted() BitSet rowsToDelete = new BitSet(); File newFile = new File(temporary, randomUUID().toString()); - OrcFileInfo info = createFileRewriter().rewrite(getColumnTypes(columnIds, columnTypes), path(file), path(newFile), rowsToDelete); + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); + OrcFileInfo info = createFileRewriter().rewrite(fileSystem, getColumnTypes(columnIds, columnTypes), path(file), path(newFile), rowsToDelete); assertEquals(info.getRowCount(), 2); assertBetweenInclusive(info.getUncompressedSize(), 16L, 16L * 2); 
assertEquals(readAllBytes(newFile.toPath()), readAllBytes(file.toPath())); @@ -429,7 +436,8 @@ public void testUncompressedSize() } File newFile = new File(temporary, randomUUID().toString()); - OrcFileInfo info = createFileRewriter().rewrite(getColumnTypes(columnIds, columnTypes), path(file), path(newFile), new BitSet()); + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); + OrcFileInfo info = createFileRewriter().rewrite(fileSystem, getColumnTypes(columnIds, columnTypes), path(file), path(newFile), new BitSet()); assertEquals(info.getRowCount(), 3); assertBetweenInclusive(info.getUncompressedSize(), 55L, 55L * 2); } @@ -465,7 +473,9 @@ public void testRewriterDropThenAddDifferentColumns() // Add a column File newFile1 = new File(temporary, randomUUID().toString()); + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); OrcFileInfo info = createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(3L, 7L, 10L), ImmutableList.of(BIGINT, createVarcharType(20), DOUBLE)), path(file), path(newFile1), @@ -476,6 +486,7 @@ public void testRewriterDropThenAddDifferentColumns() // Drop a column File newFile2 = new File(temporary, randomUUID().toString()); info = createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(7L, 10L), ImmutableList.of(createVarcharType(20), DOUBLE)), path(newFile1), path(newFile2), @@ -489,6 +500,7 @@ public void testRewriterDropThenAddDifferentColumns() // Add a column with the different ID with different type File newFile3 = new File(temporary, randomUUID().toString()); info = createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(7L, 10L, 13L), ImmutableList.of(createVarcharType(20), DOUBLE, createVarcharType(5))), path(newFile2), path(newFile3), @@ -510,6 +522,7 @@ public void testRewriterDropThenAddDifferentColumns() rowsToDelete.set(1); rowsToDelete.set(3); info = 
createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(7L, 13L, 18L), ImmutableList.of(createVarcharType(20), createVarcharType(5), INTEGER)), path(newFile3), path(newFile4), @@ -517,6 +530,7 @@ public void testRewriterDropThenAddDifferentColumns() assertEquals(info.getRowCount(), 1); ConnectorPageSource source = storageManager.getPageSource( + FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, OptionalInt.empty(), ImmutableList.of(13L, 7L, 18L), @@ -545,6 +559,7 @@ public void testRewriterDropThenAddDifferentColumns() // Remove all the columns File newFile5 = new File(temporary, randomUUID().toString()); info = createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(13L, 18L), ImmutableList.of(createVarcharType(5), INTEGER)), path(newFile4), path(newFile5), @@ -587,7 +602,9 @@ public void testRewriterDropThenAddSameColumns() // Add a column File newFile1 = new File(temporary, randomUUID().toString()); + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); OrcFileInfo info = createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(3L, 7L, 10L), ImmutableList.of(BIGINT, createVarcharType(20), DOUBLE)), path(file), path(newFile1), @@ -597,6 +614,7 @@ public void testRewriterDropThenAddSameColumns() // Drop a column File newFile2 = new File(temporary, randomUUID().toString()); info = createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(7L, 10L), ImmutableList.of(createVarcharType(20), DOUBLE)), path(newFile1), path(newFile2), @@ -606,6 +624,7 @@ public void testRewriterDropThenAddSameColumns() // Add a column with the same ID but different type File newFile3 = new File(temporary, randomUUID().toString()); info = createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(7L, 10L, 3L), ImmutableList.of(createVarcharType(20), DOUBLE, createVarcharType(5))), path(newFile2), path(newFile3), @@ -622,6 +641,7 @@ public 
void testRewriterDropThenAddSameColumns() // Drop a column and add a column info = createFileRewriter().rewrite( + fileSystem, getColumnTypes(ImmutableList.of(7L, 3L, 8L), ImmutableList.of(createVarcharType(20), createVarcharType(5), INTEGER)), path(newFile3), path(newFile4), @@ -629,6 +649,7 @@ public void testRewriterDropThenAddSameColumns() assertEquals(info.getRowCount(), 1); ConnectorPageSource source = storageManager.getPageSource( + FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, OptionalInt.empty(), ImmutableList.of(3L, 7L, 8L), diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java index 071884cf19751..1b9cc5966049e 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java @@ -20,6 +20,9 @@ import com.facebook.presto.raptor.backup.BackupManager; import com.facebook.presto.raptor.backup.BackupStore; import com.facebook.presto.raptor.backup.FileBackupStore; +import com.facebook.presto.raptor.filesystem.FileSystemContext; +import com.facebook.presto.raptor.filesystem.LocalFileStorageService; +import com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment; import com.facebook.presto.raptor.filesystem.RaptorLocalFileSystem; import com.facebook.presto.raptor.metadata.ColumnStats; import com.facebook.presto.raptor.metadata.ShardDelta; @@ -48,6 +51,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.joda.time.DateTime; import org.joda.time.Days; @@ -61,6 +65,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.BitSet; import java.util.Collection; @@ -145,8 +150,8 @@ 
public class TestOrcStorageManager public void setup() { temporary = createTempDir(); - File directory = new File(temporary, "data"); - storageService = new FileStorageService(new LocalOrcDataEnvironment(), directory); + URI directory = new File(temporary, "data").toURI(); + storageService = new LocalFileStorageService(new LocalOrcDataEnvironment(), directory); storageService.start(); File backupDirectory = new File(temporary, "backup"); @@ -229,7 +234,8 @@ public void testWriter() recoveryManager.restoreFromBackup(shardUuid, shardInfo.getCompressedSize(), OptionalLong.of(shardInfo.getXxhash64())); - try (OrcDataSource dataSource = manager.openShard(shardUuid, READER_ATTRIBUTES)) { + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); + try (OrcDataSource dataSource = manager.openShard(fileSystem, shardUuid, READER_ATTRIBUTES)) { OrcBatchRecordReader reader = createReader(dataSource, columnIds, columnTypes); assertEquals(reader.nextBatch(), 2); @@ -327,6 +333,7 @@ public void testRewriter() throws Exception { OrcStorageManager manager = createOrcStorageManager(); + FileSystem fileSystem = new LocalOrcDataEnvironment().getFileSystem(FileSystemContext.DEFAULT_RAPTOR_CONTEXT); long transactionId = TRANSACTION_ID; List columnIds = ImmutableList.of(3L, 7L); @@ -347,6 +354,7 @@ public void testRewriter() BitSet rowsToDelete = new BitSet(); rowsToDelete.set(0); Collection fragments = manager.rewriteShard( + fileSystem, transactionId, OptionalInt.empty(), shards.get(0).getShardUuid(), @@ -567,13 +575,13 @@ private static ConnectorPageSource getPageSource( UUID uuid, TupleDomain tupleDomain) { - return manager.getPageSource(uuid, OptionalInt.empty(), columnIds, columnTypes, tupleDomain, READER_ATTRIBUTES); + return manager.getPageSource(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, OptionalInt.empty(), columnIds, columnTypes, tupleDomain, READER_ATTRIBUTES); } private static StoragePageSink 
createStoragePageSink(StorageManager manager, List columnIds, List columnTypes) { long transactionId = TRANSACTION_ID; - return manager.createStoragePageSink(transactionId, OptionalInt.empty(), columnIds, columnTypes, false); + return manager.createStoragePageSink(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, transactionId, OptionalInt.empty(), columnIds, columnTypes, false); } private OrcStorageManager createOrcStorageManager() @@ -593,8 +601,8 @@ public static OrcStorageManager createOrcStorageManager(IDBI dbi, File temporary public static OrcStorageManager createOrcStorageManager(IDBI dbi, File temporary, int maxShardRows) { - File directory = new File(temporary, "data"); - StorageService storageService = new FileStorageService(new LocalOrcDataEnvironment(), directory); + URI directory = new File(temporary, "data").toURI(); + StorageService storageService = new LocalFileStorageService(new LocalOrcDataEnvironment(), directory); storageService.start(); File backupDirectory = new File(temporary, "backup"); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestRaptorHdfsConfig.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestRaptorHdfsConfig.java new file mode 100644 index 0000000000000..22697cfc467ce --- /dev/null +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestRaptorHdfsConfig.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.raptor.storage; + +import com.facebook.airlift.configuration.testing.ConfigAssertions; +import com.facebook.presto.raptor.filesystem.RaptorHdfsConfig; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import io.airlift.units.DataSize; +import io.airlift.units.DataSize.Unit; +import io.airlift.units.Duration; +import org.testng.annotations.Test; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class TestRaptorHdfsConfig +{ + @Test + public void testDefaults() + { + ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(RaptorHdfsConfig.class) + .setSocksProxy(null) + .setDfsTimeout(new Duration(60, TimeUnit.SECONDS)) + .setIpcPingInterval(new Duration(10, TimeUnit.SECONDS)) + .setDfsConnectTimeout(new Duration(500, TimeUnit.MILLISECONDS)) + .setDfsConnectMaxRetries(5) + .setDomainSocketPath(null) + .setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE)) + .setResourceConfigFiles("") + .setFileSystemMaxCacheSize(1000) + .setHdfsWireEncryptionEnabled(false)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = new ImmutableMap.Builder() + .put("hive.thrift.client.socks-proxy", "localhost:1080") + .put("hive.dfs.ipc-ping-interval", "34s") + .put("hive.dfs-timeout", "33s") + .put("hive.dfs.connect.timeout", "20s") + .put("hive.dfs.connect.max-retries", "10") + .put("hive.dfs.domain-socket-path", "/foo") + .put("hive.text.max-line-length", "13MB") + .put("hive.fs.cache.max-size", "1010") + .put("hive.hdfs.wire-encryption.enabled", "true") + .put("hive.config.resources", "a,b,c") + .build(); + + RaptorHdfsConfig expected = new RaptorHdfsConfig() + .setSocksProxy(HostAndPort.fromParts("localhost", 1080)) + .setIpcPingInterval(new Duration(34, TimeUnit.SECONDS)) + .setDfsTimeout(new Duration(33, TimeUnit.SECONDS)) + .setDfsConnectTimeout(new Duration(20, TimeUnit.SECONDS)) + 
.setDfsConnectMaxRetries(10) + .setDomainSocketPath("/foo") + .setTextMaxLineLength(new DataSize(13, Unit.MEGABYTE)) + .setFileSystemMaxCacheSize(1010) + .setResourceConfigFiles(ImmutableList.of("a", "b", "c")) + .setHdfsWireEncryptionEnabled(true); + + ConfigAssertions.assertFullMapping(properties, expected); + } +} diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java index da846741399a5..24d2b068a7921 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java @@ -16,6 +16,8 @@ import com.facebook.presto.client.NodeVersion; import com.facebook.presto.metadata.InternalNode; import com.facebook.presto.raptor.backup.BackupStore; +import com.facebook.presto.raptor.filesystem.LocalFileStorageService; +import com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment; import com.facebook.presto.raptor.metadata.ColumnInfo; import com.facebook.presto.raptor.metadata.MetadataDao; import com.facebook.presto.raptor.metadata.ShardInfo; @@ -76,7 +78,7 @@ public void setup() shardManager = createShardManager(dbi); dataDir = createTempDir(); - storageService = new FileStorageService(new LocalOrcDataEnvironment(), dataDir); + storageService = new LocalFileStorageService(new LocalOrcDataEnvironment(), dataDir.toURI()); storageService.start(); } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardRecovery.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardRecovery.java index f0a6e0c0e4bc2..4fea67745ff00 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardRecovery.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardRecovery.java @@ -15,6 +15,8 @@ import com.facebook.presto.raptor.backup.BackupStore; 
import com.facebook.presto.raptor.backup.FileBackupStore; +import com.facebook.presto.raptor.filesystem.LocalFileStorageService; +import com.facebook.presto.raptor.filesystem.LocalOrcDataEnvironment; import com.facebook.presto.raptor.filesystem.RaptorLocalFileSystem; import com.facebook.presto.raptor.metadata.ShardManager; import com.facebook.presto.spi.PrestoException; @@ -72,7 +74,7 @@ public void setup() File backupDirectory = new File(temporary, "backup"); backupStore = new FileBackupStore(backupDirectory); backupStore.start(); - storageService = new FileStorageService(new LocalOrcDataEnvironment(), directory); + storageService = new LocalFileStorageService(new LocalOrcDataEnvironment(), directory.toURI()); storageService.start(); IDBI dbi = new DBI("jdbc:h2:mem:test" + System.nanoTime()); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestStorageManagerConfig.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestStorageManagerConfig.java index c9ad1efa1d7ce..97944fd8d92f0 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestStorageManagerConfig.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestStorageManagerConfig.java @@ -19,15 +19,12 @@ import io.airlift.units.Duration; import org.testng.annotations.Test; -import javax.validation.constraints.NotNull; - -import java.io.File; +import java.net.URI; import java.util.Map; import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; -import static com.facebook.airlift.testing.ValidationAssertions.assertFailsValidation; import static com.facebook.presto.orc.metadata.CompressionKind.SNAPPY; import static com.facebook.presto.orc.metadata.CompressionKind.ZSTD; import static 
com.facebook.presto.raptor.storage.StorageManagerConfig.OrcOptimizedWriterStage.ENABLED; @@ -50,6 +47,7 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(StorageManagerConfig.class) .setDataDirectory(null) + .setFileSystemProvider("file") .setMinAvailableSpace(new DataSize(0, BYTE)) .setOrcMaxMergeDistance(new DataSize(1, MEGABYTE)) .setOrcMaxReadSize(new DataSize(8, MEGABYTE)) @@ -79,9 +77,11 @@ public void testDefaults() @Test public void testExplicitPropertyMappings() + throws Exception { Map properties = new ImmutableMap.Builder() - .put("storage.data-directory", "/data") + .put("storage.data-directory", "file:///data") + .put("storage.file-system", "hdfs") .put("storage.min-available-space", "123GB") .put("storage.orc.max-merge-distance", "16kB") .put("storage.orc.max-read-size", "16kB") @@ -110,7 +110,8 @@ public void testExplicitPropertyMappings() .build(); StorageManagerConfig expected = new StorageManagerConfig() - .setDataDirectory(new File("/data")) + .setDataDirectory(new URI("file:///data")) + .setFileSystemProvider("hdfs") .setMinAvailableSpace(new DataSize(123, GIGABYTE)) .setOrcMaxMergeDistance(new DataSize(16, KILOBYTE)) .setOrcMaxReadSize(new DataSize(16, KILOBYTE)) @@ -139,10 +140,4 @@ public void testExplicitPropertyMappings() assertFullMapping(properties, expected); } - - @Test - public void testValidations() - { - assertFailsValidation(new StorageManagerConfig().setDataDirectory(null), "dataDirectory", "may not be null", NotNull.class); - } } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java index d80caafe23ffa..164bdeebabc17 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java @@ -16,6 +16,7 @@ 
import com.facebook.presto.PagesIndexPageSorter; import com.facebook.presto.SequencePageBuilder; import com.facebook.presto.operator.PagesIndex; +import com.facebook.presto.raptor.filesystem.FileSystemContext; import com.facebook.presto.raptor.metadata.ColumnInfo; import com.facebook.presto.raptor.metadata.ShardInfo; import com.facebook.presto.raptor.storage.ReaderAttributes; @@ -263,7 +264,7 @@ private MaterializedResult getMaterializedRows(StorageManager storageManager, Li private ConnectorPageSource getPageSource(StorageManager storageManager, List columnIds, List columnTypes, UUID uuid) { - return storageManager.getPageSource(uuid, OptionalInt.empty(), columnIds, columnTypes, TupleDomain.all(), READER_ATTRIBUTES); + return storageManager.getPageSource(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, OptionalInt.empty(), columnIds, columnTypes, TupleDomain.all(), READER_ATTRIBUTES); } private static List createSortedShards(StorageManager storageManager, List columnIds, List columnTypes, List sortChannels, List sortOrders, int shardCount) @@ -307,7 +308,7 @@ private static List createShards(StorageManager storageManager, List< private static StoragePageSink createStoragePageSink(StorageManager manager, List columnIds, List columnTypes) { long transactionId = 1; - return manager.createStoragePageSink(transactionId, OptionalInt.empty(), columnIds, columnTypes, false); + return manager.createStoragePageSink(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, transactionId, OptionalInt.empty(), columnIds, columnTypes, false); } private static List createPages(List columnTypes)