diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 249aa6ee67..968febf696 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -641,6 +641,16 @@ object CometConf extends ShimCometConf {
     .longConf
     .createWithDefault(3000L)

+  val COMET_LIBHDFS_SCHEMES_KEY = "fs.comet.libhdfs.schemes"
+
+  val COMET_LIBHDFS_SCHEMES: OptionalConfigEntry[String] =
+    conf(s"spark.hadoop.$COMET_LIBHDFS_SCHEMES_KEY")
+      .doc(
+        "Defines filesystem schemes (e.g., hdfs, webhdfs) that the native side accesses " +
+          "via libhdfs, separated by commas. Valid only when built with the hdfs feature enabled.")
+      .stringConf
+      .createOptional
+
   /** Create a config to enable a specific operator */
   private def createExecEnabledConfig(
       exec: String,
diff --git a/common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala b/common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala
index aebe31ed15..b930aea17a 100644
--- a/common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala
+++ b/common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala
@@ -22,8 +22,11 @@ package org.apache.comet.objectstore
 import java.net.URI
 import java.util.Locale

+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration

+import org.apache.comet.CometConf.COMET_LIBHDFS_SCHEMES_KEY
+
 object NativeConfig {

   private val objectStoreConfigPrefixes = Map(
@@ -55,16 +58,22 @@
   def extractObjectStoreOptions(hadoopConf: Configuration, uri: URI): Map[String, String] = {
     val scheme = uri.getScheme.toLowerCase(Locale.ROOT)

+    import scala.collection.JavaConverters._
+    val options = scala.collection.mutable.Map[String, String]()
+
+    // The schemes that will use libhdfs
+    val libhdfsSchemes = hadoopConf.get(COMET_LIBHDFS_SCHEMES_KEY)
+    if (StringUtils.isNotBlank(libhdfsSchemes)) {
+      options(COMET_LIBHDFS_SCHEMES_KEY) = libhdfsSchemes
+    }
+
     // Get prefixes for this scheme, return early if none found
     val prefixes = objectStoreConfigPrefixes.get(scheme)
     if (prefixes.isEmpty) {
-      return Map.empty[String, String]
+      return options.toMap
     }

-    import scala.collection.JavaConverters._
-
     // Extract all configurations that match the object store prefixes
-    val options = scala.collection.mutable.Map[String, String]()
     hadoopConf.iterator().asScala.foreach { entry =>
       val key = entry.getKey
       val value = entry.getValue
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index bbfdf29844..0582c2902f 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -90,4 +90,5 @@ Comet provides the following configuration settings.
 | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
 | spark.comet.shuffle.sizeInBytesMultiplier | Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format and this can result in Spark choosing a different join strategy due to the estimated size of the exchange being smaller. Comet will multiple sizeInBytes by this amount to avoid regressions in join strategy. | 1.0 |
 | spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Arrow columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
+| spark.hadoop.fs.comet.libhdfs.schemes | Defines filesystem schemes (e.g., hdfs, webhdfs) that the native side accesses via libhdfs, separated by commas. Valid only when built with the hdfs feature enabled. | |
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index e8118a5550..00208e3161 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -336,6 +336,17 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
     }
 }

+fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
+    const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
+    let scheme = url.scheme();
+    if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
+        use itertools::Itertools;
+        libhdfs_schemes.split(",").contains(scheme)
+    } else {
+        scheme == "hdfs"
+    }
+}
+
 // Mirrors object_store::parse::parse_url for the hdfs object store
 #[cfg(feature = "hdfs")]
 fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
@@ -406,8 +417,9 @@ pub(crate) fn prepare_object_store_with_configs(
 ) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
     let mut url = Url::parse(url.as_str())
         .map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
+    let is_hdfs_scheme = is_hdfs_scheme(&url, object_store_configs);
     let mut scheme = url.scheme();
-    if scheme == "s3a" {
+    if !is_hdfs_scheme && scheme == "s3a" {
         scheme = "s3";
         url.set_scheme("s3").map_err(|_| {
             ExecutionError::GeneralError("Could not convert scheme from s3a to s3".to_string())
@@ -419,7 +431,7 @@ pub(crate) fn prepare_object_store_with_configs(
         &url[url::Position::BeforeHost..url::Position::AfterPort],
     );

-    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
+    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
         parse_hdfs_url(&url)
     } else if scheme == "s3" {
         objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
diff --git a/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHDFSFileSystem.java b/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHDFSFileSystem.java
new file mode 100644
index 0000000000..d109840430
--- /dev/null
+++ b/spark/src/test/java/org/apache/comet/hadoop/fs/FakeHDFSFileSystem.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.hadoop.fs;
+
+import java.net.URI;
+
+import org.apache.hadoop.fs.RawLocalFileSystem;
+
+public class FakeHDFSFileSystem extends RawLocalFileSystem {
+
+  public static final String PREFIX = "fake://fake-bucket";
+
+  public FakeHDFSFileSystem() {
+    // Avoid `URI scheme is not "file"` error on
+    // RawLocalFileSystem$DeprecatedRawLocalFileStatus.getOwner
+    RawLocalFileSystem.useStatIfAvailable();
+  }
+
+  @Override
+  public String getScheme() {
+    return "fake";
+  }
+
+  @Override
+  public URI getUri() {
+    return URI.create(PREFIX);
+  }
+}
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
new file mode 100644
index 0000000000..cec7345689
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet
+
+import java.io.File
+import java.nio.file.Files
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode}
+import org.apache.spark.sql.comet.CometNativeScanExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.{col, sum}
+
+import org.apache.comet.CometConf
+import org.apache.comet.hadoop.fs.FakeHDFSFileSystem
+
+class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
+
+  private var fake_root_dir: File = _
+
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem")
+    conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX)
+    conf.set(CometConf.COMET_LIBHDFS_SCHEMES.key, "fake,hdfs")
+  }
+
+  override def beforeAll(): Unit = {
+    // Initialize fake root dir
+    fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile
+    // Initialize Spark session
+    super.beforeAll()
+  }
+
+  protected override def afterAll(): Unit = {
+    if (fake_root_dir != null) FileUtils.deleteDirectory(fake_root_dir)
+    super.afterAll()
+  }
+
+  private def writeTestParquetFile(filePath: String): Unit = {
+    val df = spark.range(0, 1000)
+    df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
+  }
+
+  private def assertCometNativeScanOnFakeFs(df: DataFrame): Unit = {
+    val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec =>
+      p
+    }
+    assert(scans.size == 1)
+    assert(
+      scans.head.nativeOp.getNativeScan
+        .getFilePartitions(0)
+        .getPartitionedFile(0)
+        .getFilePath
+        .startsWith(FakeHDFSFileSystem.PREFIX))
+  }
+
+  test("test native_datafusion scan on fake fs") {
+    val testFilePath =
+      s"${FakeHDFSFileSystem.PREFIX}${fake_root_dir.getAbsolutePath}/data/test-file.parquet"
+    writeTestParquetFile(testFilePath)
+    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
+      val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+      assertCometNativeScanOnFakeFs(df)
+      assert(df.first().getLong(0) == 499500)
+    }
+  }
+}
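
For reference, a minimal sketch (not part of the patch) of how the new `spark.hadoop.fs.comet.libhdfs.schemes` option might be set from the Spark side. It assumes Comet is built with the hdfs feature and enabled via the standard `spark.plugins` setup; the application name, NameNode host/port, and Parquet path are placeholders.

import org.apache.spark.sql.SparkSession

// Illustrative only: route native Parquet reads for both hdfs:// and webhdfs://
// URIs through libhdfs. Requires a Comet build with the hdfs feature enabled.
val spark = SparkSession
  .builder()
  .appName("comet-libhdfs-example") // placeholder name
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.exec.enabled", "true")
  .config("spark.hadoop.fs.comet.libhdfs.schemes", "hdfs,webhdfs")
  .getOrCreate()

// Placeholder path; any Parquet file reachable via one of the schemes above works.
val df = spark.read.parquet("webhdfs://namenode:9870/data/example.parquet")
df.show()

When the option is not set, the native side falls back to treating only the hdfs scheme as a libhdfs scheme, per the else branch of is_hdfs_scheme above.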