From 5da8c7f6613bf435c2a183fe3a72ee6718bc23ba Mon Sep 17 00:00:00 2001 From: Marc Cenac Date: Wed, 9 Oct 2024 23:04:47 -0500 Subject: [PATCH 1/3] Azure: Accept wasb[s] paths in ADLSLocation --- .../iceberg/azure/adlsv2/ADLSLocation.java | 42 ++++++----- .../azure/adlsv2/ADLSLocationTest.java | 69 ++++++++++++++----- 2 files changed, 75 insertions(+), 36 deletions(-) diff --git a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java index e73093512b82..d72ecc71a772 100644 --- a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java +++ b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.azure.adlsv2; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -25,17 +27,25 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** - * This class represents a fully qualified location in Azure expressed as a URI. + * This class represents a fully qualified location in Azure Data Lake Storage, expressed as a URI. * *

Locations follow the conventions used by Hadoop's Azure support, i.e. * - *

{@code abfs[s]://[@]/}
+ *
{@code abfs[s]://[@].dfs.core.windows.net/}
* - *

See Hadoop Azure - * Support + * or + * + *

{@code wasb[s]://@.blob.core.windows.net/}
+ * + * For compatibility, paths using the wasb scheme are also accepted but will be processed via the + * Azure Data Lake Storage Gen2 APIs and not the Blob Storage APIs. + * + *

See Hadoop + * Azure Support */ class ADLSLocation { - private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^/?#]+)(.*)?$"); + private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://([^/?#]+)(.*)?$"); private final String storageAccount; private final String container; @@ -53,19 +63,17 @@ class ADLSLocation { ValidationException.check(matcher.matches(), "Invalid ADLS URI: %s", location); - String authority = matcher.group(1); - String[] parts = authority.split("@", -1); - if (parts.length > 1) { - this.container = parts[0]; - this.storageAccount = parts[1]; - } else { - this.container = null; - this.storageAccount = authority; + try { + URI uri = new URI(location); + this.container = uri.getUserInfo(); + // storage account name is the first part of the host + int accountSplit = uri.getHost().indexOf('.'); + String storageAccountName = uri.getHost().substring(0, accountSplit); + this.storageAccount = String.format("%s.dfs.core.windows.net", storageAccountName); + this.path = uri.getPath().length() > 1 ? uri.getRawPath().substring(1) : ""; + } catch (URISyntaxException e) { + throw new ValidationException("Invalid URI: %s", location); } - - String uriPath = matcher.group(2); - uriPath = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath; - this.path = uriPath.split("\\?", -1)[0].split("#", -1)[0]; } /** Returns Azure storage account. */ diff --git a/azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java b/azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java index 867b54b4c7e3..a97245c33fb1 100644 --- a/azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java +++ b/azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java @@ -38,11 +38,26 @@ public void testLocationParsing(String scheme) { assertThat(location.path()).isEqualTo("path/to/file"); } - @Test - public void testEncodedString() { - String p1 = "abfs://container@account.dfs.core.windows.net/path%20to%20file"; + @ParameterizedTest + @ValueSource(strings = {"wasb", "wasbs"}) + public void testWasbLocationParsing(String scheme) { + String p1 = scheme + "://container@account.blob.core.windows.net/path/to/file"; ADLSLocation location = new ADLSLocation(p1); + assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); + assertThat(location.container().get()).isEqualTo("container"); + assertThat(location.path()).isEqualTo("path/to/file"); + } + + @ParameterizedTest + @ValueSource( + strings = { + "abfs://container@account.dfs.core.windows.net/path%20to%20file", + "wasb://container@account.blob.core.windows.net/path%20to%20file" + }) + public void testEncodedString(String path) { + ADLSLocation location = new ADLSLocation(path); + assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo("path%20to%20file"); @@ -62,40 +77,56 @@ public void testInvalidScheme() { .hasMessage("Invalid ADLS URI: s3://bucket/path/to/file"); } - @Test - public void testNoContainer() { - String p1 = "abfs://account.dfs.core.windows.net/path/to/file"; - ADLSLocation location = new ADLSLocation(p1); + @ParameterizedTest + @ValueSource( + strings = { + "abfs://account.dfs.core.windows.net/path/to/file", + "wasb://account.blob.core.windows.net/path/to/file" + }) + public void testNoContainer(String path) { + ADLSLocation location = new ADLSLocation(path); assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); assertThat(location.container().isPresent()).isFalse(); assertThat(location.path()).isEqualTo("path/to/file"); } - @Test - public void testNoPath() { - String p1 = "abfs://container@account.dfs.core.windows.net"; - ADLSLocation location = new ADLSLocation(p1); + @ParameterizedTest + @ValueSource( + strings = { + "abfs://container@account.dfs.core.windows.net", + "wasb://container@account.blob.core.windows.net" + }) + public void testNoPath(String path) { + ADLSLocation location = new ADLSLocation(path); assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo(""); } - @Test - public void testQueryAndFragment() { - String p1 = "abfs://container@account.dfs.core.windows.net/path/to/file?query=foo#123"; - ADLSLocation location = new ADLSLocation(p1); + @ParameterizedTest + @ValueSource( + strings = { + "abfs://container@account.dfs.core.windows.net/path/to/file?query=foo#123", + "wasb://container@account.blob.core.windows.net/path/to/file?query=foo#123" + }) + public void testQueryAndFragment(String path) { + ADLSLocation location = new ADLSLocation(path); assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo("path/to/file"); } - @Test - public void testQueryAndFragmentNoPath() { - String p1 = "abfs://container@account.dfs.core.windows.net?query=foo#123"; - ADLSLocation location = new ADLSLocation(p1); + @ParameterizedTest + @ValueSource( + strings = { + "abfs://container@account.dfs.core.windows.net?query=foo#123", + "wasb://container@account.blob.core.windows.net?query=foo#123" + }) + public void testQueryAndFragmentNoPath(String path) { + ADLSLocation location = new ADLSLocation(path); assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); assertThat(location.container().get()).isEqualTo("container"); From 3095ff689ae960cc52549e8b884b163caf7d9885 Mon Sep 17 00:00:00 2001 From: Marc Cenac Date: Thu, 10 Oct 2024 07:53:22 -0500 Subject: [PATCH 2/3] Core: Resolve wasb[s] scheme to ADLSFileIO via ResolvingFileIO --- core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java index a858045aab8b..a8adf979f85a 100644 --- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java +++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java @@ -62,7 +62,9 @@ public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO { "s3n", S3_FILE_IO_IMPL, "gs", GCS_FILE_IO_IMPL, "abfs", ADLS_FILE_IO_IMPL, - "abfss", ADLS_FILE_IO_IMPL); + "abfss", ADLS_FILE_IO_IMPL, + "wasb", ADLS_FILE_IO_IMPL, + "wasbs", ADLS_FILE_IO_IMPL); private final Map ioInstances = Maps.newConcurrentMap(); private final AtomicBoolean isClosed = new AtomicBoolean(false); From 7f4ad691cbaf946ee7511b6f2dc33f6e885384a9 Mon Sep 17 00:00:00 2001 From: Marc Cenac Date: Fri, 11 Oct 2024 15:28:03 -0500 Subject: [PATCH 3/3] Addressing PR comments --- .../iceberg/azure/adlsv2/ADLSFileIO.java | 2 +- .../iceberg/azure/adlsv2/ADLSLocation.java | 33 ++++++++------ .../azure/adlsv2/ADLSLocationTest.java | 44 ++++++++++++------- 3 files changed, 49 insertions(+), 30 deletions(-) diff --git a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSFileIO.java b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSFileIO.java index 0bfce9d6055b..555b395e0d0e 100644 --- a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSFileIO.java +++ b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSFileIO.java @@ -111,7 +111,7 @@ DataLakeFileSystemClient client(ADLSLocation location) { new DataLakeFileSystemClientBuilder().httpClient(HTTP); location.container().ifPresent(clientBuilder::fileSystemName); - azureProperties.applyClientConfiguration(location.storageAccount(), clientBuilder); + azureProperties.applyClientConfiguration(location.storageEndpoint(), clientBuilder); return clientBuilder.buildClient(); } diff --git a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java index d72ecc71a772..e024a5149343 100644 --- a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java +++ b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java @@ -23,7 +23,6 @@ import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -45,9 +44,9 @@ * Azure Support */ class ADLSLocation { - private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://([^/?#]+)(.*)?$"); + private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://[^/?#]+.*$"); - private final String storageAccount; + private final String storageEndpoint; private final String container; private final String path; @@ -60,25 +59,23 @@ class ADLSLocation { Preconditions.checkArgument(location != null, "Invalid location: null"); Matcher matcher = URI_PATTERN.matcher(location); - - ValidationException.check(matcher.matches(), "Invalid ADLS URI: %s", location); + if (!matcher.matches()) { + throw new IllegalArgumentException(String.format("Invalid ADLS URI: %s", location)); + } try { URI uri = new URI(location); this.container = uri.getUserInfo(); - // storage account name is the first part of the host - int accountSplit = uri.getHost().indexOf('.'); - String storageAccountName = uri.getHost().substring(0, accountSplit); - this.storageAccount = String.format("%s.dfs.core.windows.net", storageAccountName); - this.path = uri.getPath().length() > 1 ? uri.getRawPath().substring(1) : ""; + this.storageEndpoint = uri.getHost(); + this.path = stripLeadingSlash(uri.getRawPath()); } catch (URISyntaxException e) { - throw new ValidationException("Invalid URI: %s", location); + throw new IllegalArgumentException(String.format("Invalid ADLS URI: %s", location), e); } } - /** Returns Azure storage account. */ - public String storageAccount() { - return storageAccount; + /** Returns Azure storage service endpoint. */ + public String storageEndpoint() { + return storageEndpoint; } /** Returns Azure container name. */ @@ -90,4 +87,12 @@ public Optional container() { public String path() { return path; } + + private static String stripLeadingSlash(String path) { + if (path.startsWith("/")) { + return path.substring(1); + } else { + return path; + } + } } diff --git a/azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java b/azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java index a97245c33fb1..6edede187153 100644 --- a/azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java +++ b/azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java @@ -21,7 +21,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import org.apache.iceberg.exceptions.ValidationException; +import java.net.URI; +import java.net.URISyntaxException; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -33,7 +34,7 @@ public void testLocationParsing(String scheme) { String p1 = scheme + "://container@account.dfs.core.windows.net/path/to/file"; ADLSLocation location = new ADLSLocation(p1); - assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); + assertThat(location.storageEndpoint()).isEqualTo("account.dfs.core.windows.net"); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo("path/to/file"); } @@ -44,7 +45,7 @@ public void testWasbLocationParsing(String scheme) { String p1 = scheme + "://container@account.blob.core.windows.net/path/to/file"; ADLSLocation location = new ADLSLocation(p1); - assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); + assertThat(location.storageEndpoint()).isEqualTo("account.blob.core.windows.net"); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo("path/to/file"); } @@ -55,10 +56,11 @@ public void testWasbLocationParsing(String scheme) { "abfs://container@account.dfs.core.windows.net/path%20to%20file", "wasb://container@account.blob.core.windows.net/path%20to%20file" }) - public void testEncodedString(String path) { + public void testEncodedString(String path) throws URISyntaxException { ADLSLocation location = new ADLSLocation(path); + String expectedEndpoint = new URI(path).getHost(); - assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); + assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo("path%20to%20file"); } @@ -66,27 +68,36 @@ public void testEncodedString(String path) { @Test public void testMissingScheme() { assertThatThrownBy(() -> new ADLSLocation("/path/to/file")) - .isInstanceOf(ValidationException.class) + .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid ADLS URI: /path/to/file"); } @Test public void testInvalidScheme() { assertThatThrownBy(() -> new ADLSLocation("s3://bucket/path/to/file")) - .isInstanceOf(ValidationException.class) + .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid ADLS URI: s3://bucket/path/to/file"); } + @Test + public void testInvalidURI() { + String invalidUri = "abfs://container@account.dfs.core.windows.net/#invalidPath#"; + assertThatThrownBy(() -> new ADLSLocation(invalidUri)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(String.format("Invalid ADLS URI: %s", invalidUri)); + } + @ParameterizedTest @ValueSource( strings = { "abfs://account.dfs.core.windows.net/path/to/file", "wasb://account.blob.core.windows.net/path/to/file" }) - public void testNoContainer(String path) { + public void testNoContainer(String path) throws URISyntaxException { ADLSLocation location = new ADLSLocation(path); + String expectedEndpoint = new URI(path).getHost(); - assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); + assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint); assertThat(location.container().isPresent()).isFalse(); assertThat(location.path()).isEqualTo("path/to/file"); } @@ -97,10 +108,11 @@ public void testNoContainer(String path) { "abfs://container@account.dfs.core.windows.net", "wasb://container@account.blob.core.windows.net" }) - public void testNoPath(String path) { + public void testNoPath(String path) throws URISyntaxException { ADLSLocation location = new ADLSLocation(path); + String expectedEndpoint = new URI(path).getHost(); - assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); + assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo(""); } @@ -111,10 +123,11 @@ public void testNoPath(String path) { "abfs://container@account.dfs.core.windows.net/path/to/file?query=foo#123", "wasb://container@account.blob.core.windows.net/path/to/file?query=foo#123" }) - public void testQueryAndFragment(String path) { + public void testQueryAndFragment(String path) throws URISyntaxException { ADLSLocation location = new ADLSLocation(path); + String expectedEndpoint = new URI(path).getHost(); - assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); + assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo("path/to/file"); } @@ -125,10 +138,11 @@ public void testQueryAndFragment(String path) { "abfs://container@account.dfs.core.windows.net?query=foo#123", "wasb://container@account.blob.core.windows.net?query=foo#123" }) - public void testQueryAndFragmentNoPath(String path) { + public void testQueryAndFragmentNoPath(String path) throws URISyntaxException { ADLSLocation location = new ADLSLocation(path); + String expectedEndpoint = new URI(path).getHost(); - assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net"); + assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint); assertThat(location.container().get()).isEqualTo("container"); assertThat(location.path()).isEqualTo(""); }