From 680a43fc9becd4ec872a3f7ea75389e5bf864978 Mon Sep 17 00:00:00 2001 From: Jinyang Li Date: Wed, 29 Jul 2020 12:57:09 -0700 Subject: [PATCH] Handle create page source for Druid segment on s3 --- .../presto/druid/DruidPageSourceProvider.java | 11 +- .../druid/metadata/DruidSegmentInfo.java | 60 ++++++++- .../presto/druid/TestDruidSegmentInfo.java | 114 ++++++++++++++++++ 3 files changed, 174 insertions(+), 11 deletions(-) create mode 100644 presto-druid/src/test/java/com/facebook/presto/druid/TestDruidSegmentInfo.java diff --git a/presto-druid/src/main/java/com/facebook/presto/druid/DruidPageSourceProvider.java b/presto-druid/src/main/java/com/facebook/presto/druid/DruidPageSourceProvider.java index 8dff942928124..dc787a3e7580f 100644 --- a/presto-druid/src/main/java/com/facebook/presto/druid/DruidPageSourceProvider.java +++ b/presto-druid/src/main/java/com/facebook/presto/druid/DruidPageSourceProvider.java @@ -73,13 +73,14 @@ public ConnectorPageSource createPageSource( columns, druidClient); } + DruidSegmentInfo segmentInfo = druidSplit.getSegmentInfo().get(); try { - Path hdfsPath = new Path(segmentInfo.getDeepStoragePath()); - FileSystem fileSystem = hdfsPath.getFileSystem(hadoopConfiguration); - long fileSize = fileSystem.getFileStatus(hdfsPath).getLen(); - FSDataInputStream inputStream = fileSystem.open(hdfsPath); - DataInputSourceId dataInputSourceId = new DataInputSourceId(hdfsPath.toString()); + Path segmentPath = new Path(segmentInfo.getDeepStoragePath()); + FileSystem fileSystem = segmentPath.getFileSystem(hadoopConfiguration); + long fileSize = fileSystem.getFileStatus(segmentPath).getLen(); + FSDataInputStream inputStream = fileSystem.open(segmentPath); + DataInputSourceId dataInputSourceId = new DataInputSourceId(segmentPath.toString()); HdfsDataInputSource dataInputSource = new HdfsDataInputSource(dataInputSourceId, inputStream, fileSize); IndexFileSource indexFileSource = new ZipIndexFileSource(dataInputSource); SegmentColumnSource 
segmentColumnSource = new SmooshedColumnSource(indexFileSource); diff --git a/presto-druid/src/main/java/com/facebook/presto/druid/metadata/DruidSegmentInfo.java b/presto-druid/src/main/java/com/facebook/presto/druid/metadata/DruidSegmentInfo.java index e18ab7f8d80d0..ca38860637b1c 100644 --- a/presto-druid/src/main/java/com/facebook/presto/druid/metadata/DruidSegmentInfo.java +++ b/presto-druid/src/main/java/com/facebook/presto/druid/metadata/DruidSegmentInfo.java @@ -19,6 +19,10 @@ import javax.annotation.Nullable; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -31,7 +35,11 @@ public class DruidSegmentInfo { private static final String DEEP_STORAGE_TYPE_KEY = "type"; - private static final String SEGMENT_PATH_KEY = "path"; + private static final String DEEP_STORAGE_S3_SCHEMA_KEY = "S3Schema"; + private static final String DEEP_STORAGE_BUCKET_KEY = "bucket"; + private static final String DEEP_STORAGE_PATH_KEY = "path"; + private static final String S3A_SCHEMA = "s3a"; + private static final String S3N_SCHEMA = "s3n"; private final String dataSource; private final String version; @@ -42,7 +50,10 @@ public class DruidSegmentInfo public enum DeepStorageType { - HDFS("hdfs"); + HDFS("hdfs"), + S3("s3_zip"), + GCS("google"), + LOCAL("local"); private final String type; @@ -50,6 +61,16 @@ public enum DeepStorageType { this.type = type; } + + static DeepStorageType fromType(String type) + { + for (DeepStorageType deepStorageType : DeepStorageType.values()) { + if (deepStorageType.type.equalsIgnoreCase(type)) { + return deepStorageType; + } + } + throw new IllegalArgumentException("Unknown deep storage type: " + type); + } } @JsonCreator @@ -109,14 +130,41 @@ public DeepStorageType getDeepStorageType() { Map loadSpecification = getLoadSpecification() .orElseThrow(() -> new PrestoException(DRUID_METADATA_ERROR, 
format("Malformed segment loadSpecification: %s", getLoadSpecification()))); - return DeepStorageType.valueOf(loadSpecification.get(DEEP_STORAGE_TYPE_KEY).toUpperCase()); + return DeepStorageType.fromType(loadSpecification.get(DEEP_STORAGE_TYPE_KEY)); } - public String getDeepStoragePath() + public URI getDeepStoragePath() { - Map loadSpecification = getLoadSpecification() + final Map loadSpec = getLoadSpecification() .orElseThrow(() -> new PrestoException(DRUID_METADATA_ERROR, format("Malformed segment loadSpecification: %s", getLoadSpecification()))); - return loadSpecification.get(SEGMENT_PATH_KEY); + final String type = loadSpec.get(DEEP_STORAGE_TYPE_KEY); + final URI segmentLocURI; + try { + switch (DeepStorageType.fromType(type)) { + case S3: + final String s3schema = S3A_SCHEMA.equals(loadSpec.get(DEEP_STORAGE_S3_SCHEMA_KEY)) ? S3A_SCHEMA : S3N_SCHEMA; + segmentLocURI = URI.create(format("%s://%s/%s", s3schema, loadSpec.get(DEEP_STORAGE_BUCKET_KEY), loadSpec.get("key"))); + break; + case HDFS: + segmentLocURI = URI.create(loadSpec.get(DEEP_STORAGE_PATH_KEY)); + break; + case GCS: + segmentLocURI = URI.create( + format("gs://%s/%s", + loadSpec.get(DEEP_STORAGE_BUCKET_KEY), + URLEncoder.encode(loadSpec.get(DEEP_STORAGE_PATH_KEY), "UTF-8"))); + break; + case LOCAL: + segmentLocURI = new URI("file", null, loadSpec.get(DEEP_STORAGE_PATH_KEY), null, null); + break; + default: + throw new PrestoException(DRUID_METADATA_ERROR, format("Unsupported segment filesystem: %s", type)); + } + } + catch (URISyntaxException | UnsupportedEncodingException e) { + throw new PrestoException(DRUID_METADATA_ERROR, e); + } + return segmentLocURI; } @Override diff --git a/presto-druid/src/test/java/com/facebook/presto/druid/TestDruidSegmentInfo.java b/presto-druid/src/test/java/com/facebook/presto/druid/TestDruidSegmentInfo.java new file mode 100644 index 0000000000000..b9419807d8a54 --- /dev/null +++ b/presto-druid/src/test/java/com/facebook/presto/druid/TestDruidSegmentInfo.java @@ 
-0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.druid; + +import com.facebook.presto.druid.metadata.DruidSegmentInfo; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Optional; + +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; + +public class TestDruidSegmentInfo +{ + @Test + public void testDeepStoragePathOnS3() throws URISyntaxException + { + final String bucket = "newBucket"; + final String segmentPath = "foo/bar/index.zip"; + + for (String s3Schema : Arrays.asList("s3a", "s3n")) { + DruidSegmentInfo druidSegmentInfo = new DruidSegmentInfo( + "testDatasource", + "v1", + Optional.of(ImmutableMap.of( + "type", "s3_zip", + "bucket", bucket, + "key", segmentPath, + "S3Schema", s3Schema)), + Optional.empty(), + 0, + 0); + + assertEquals(druidSegmentInfo.getDeepStorageType(), DruidSegmentInfo.DeepStorageType.S3); + assertEquals(druidSegmentInfo.getDeepStoragePath(), new URI(format("%s://%s/%s", s3Schema, bucket, segmentPath))); + } + } + + @Test + public void testDeepStoragePathOnHdfs() throws URISyntaxException + { + final String segmentPath = "hdfs://foo/bar/index.zip"; + + DruidSegmentInfo druidSegmentInfo = new DruidSegmentInfo( + "testDatasource", + "v1", + Optional.of(ImmutableMap.of( + "type", "hdfs", + 
"path", segmentPath)), + Optional.empty(), + 0, + 0); + + assertEquals(druidSegmentInfo.getDeepStorageType(), DruidSegmentInfo.DeepStorageType.HDFS); + assertEquals(druidSegmentInfo.getDeepStoragePath(), new URI(segmentPath)); + } + + @Test + public void testDeepStoragePathOnGS() throws URISyntaxException + { + final String bucket = "newBucket"; + final String segmentPath = "foo:bar"; + + DruidSegmentInfo druidSegmentInfo = new DruidSegmentInfo( + "testDatasource", + "v1", + Optional.of(ImmutableMap.of( + "type", "google", + "bucket", bucket, + "path", segmentPath)), + Optional.empty(), + 0, + 0); + + assertEquals(druidSegmentInfo.getDeepStorageType(), DruidSegmentInfo.DeepStorageType.GCS); + // Verify ":" gets escaped, as Google Cloud Storage supports ":" but Hadoop does not. + assertEquals(druidSegmentInfo.getDeepStoragePath(), + new URI(format("gs://%s/%s", bucket, segmentPath.replace(":", "%3A")))); + } + + @Test + public void testDeepStoragePathOnLocal() + { + final String path = "/foo/bar"; + DruidSegmentInfo druidSegmentInfo = new DruidSegmentInfo( + "testDatasource", + "v1", + Optional.of(ImmutableMap.of( + "type", "local", + "path", path)), + Optional.empty(), + 0, + 0); + + assertEquals(druidSegmentInfo.getDeepStorageType(), DruidSegmentInfo.DeepStorageType.LOCAL); + assertEquals(druidSegmentInfo.getDeepStoragePath(), new File(path).toURI()); + } +}