Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ DataLakeFileSystemClient client(ADLSLocation location) {
new DataLakeFileSystemClientBuilder().httpClient(HTTP);

location.container().ifPresent(clientBuilder::fileSystemName);
azureProperties.applyClientConfiguration(location.storageEndpoint(), clientBuilder);
azureProperties.applyClientConfiguration(location.storageAccount(), clientBuilder);

return clientBuilder.buildClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,26 @@
*/
package org.apache.iceberg.azure.adlsv2;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* This class represents a fully qualified location in Azure Data Lake Storage, expressed as a URI.
* This class represents a fully qualified location in Azure expressed as a URI.
*
* <p>Locations follow the conventions used by Hadoop's Azure support, i.e.
*
* <pre>{@code abfs[s]://[<container>@]<storageAccount>.dfs.core.windows.net/<path>}</pre>
* <pre>{@code abfs[s]://[<container>@]<storage account host>/<file path>}</pre>
*
* or
*
* <pre>{@code wasb[s]://<container>@<storageAccount>.blob.core.windows.net/<path>}</pre>
*
* For compatibility, paths using the wasb scheme are also accepted but will be processed via the
* Azure Data Lake Storage Gen2 APIs and not the Blob Storage APIs.
*
* <p>See <a
* href="https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax">Hadoop
* Azure Support</a>
* <p>See <a href="https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html">Hadoop Azure
* Support</a>
*/
class ADLSLocation {
private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://[^/?#]+.*$");
private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^/?#]+)(.*)?$");

private final String storageEndpoint;
private final String storageAccount;
private final String container;
private final String path;

Expand All @@ -59,23 +50,27 @@ class ADLSLocation {
Preconditions.checkArgument(location != null, "Invalid location: null");

Matcher matcher = URI_PATTERN.matcher(location);
if (!matcher.matches()) {
throw new IllegalArgumentException(String.format("Invalid ADLS URI: %s", location));
}

try {
URI uri = new URI(location);
this.container = uri.getUserInfo();
this.storageEndpoint = uri.getHost();
this.path = stripLeadingSlash(uri.getRawPath());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(String.format("Invalid ADLS URI: %s", location), e);
ValidationException.check(matcher.matches(), "Invalid ADLS URI: %s", location);

String authority = matcher.group(1);
String[] parts = authority.split("@", -1);
if (parts.length > 1) {
this.container = parts[0];
this.storageAccount = parts[1];
} else {
this.container = null;
this.storageAccount = authority;
}

String uriPath = matcher.group(2);
uriPath = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath;
this.path = uriPath.split("\\?", -1)[0].split("#", -1)[0];
}

/** Returns Azure storage service endpoint. */
public String storageEndpoint() {
return storageEndpoint;
/** Returns Azure storage account. */
public String storageAccount() {
return storageAccount;
}

/** Returns Azure container name. */
Expand All @@ -87,12 +82,4 @@ public Optional<String> container() {
public String path() {
return path;
}

private static String stripLeadingSlash(String path) {
if (path.startsWith("/")) {
return path.substring(1);
} else {
return path;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.net.URI;
import java.net.URISyntaxException;
import org.apache.iceberg.exceptions.ValidationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -34,115 +33,71 @@ public void testLocationParsing(String scheme) {
String p1 = scheme + "://[email protected]/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageEndpoint()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

@ParameterizedTest
@ValueSource(strings = {"wasb", "wasbs"})
public void testWasbLocationParsing(String scheme) {
String p1 = scheme + "://[email protected]/path/to/file";
@Test
public void testEncodedString() {
String p1 = "abfs://[email protected]/path%20to%20file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageEndpoint()).isEqualTo("account.blob.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

@ParameterizedTest
@ValueSource(
strings = {
"abfs://[email protected]/path%20to%20file",
"wasb://[email protected]/path%20to%20file"
})
public void testEncodedString(String path) throws URISyntaxException {
ADLSLocation location = new ADLSLocation(path);
String expectedEndpoint = new URI(path).getHost();

assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint);
assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path%20to%20file");
}

@Test
public void testMissingScheme() {
assertThatThrownBy(() -> new ADLSLocation("/path/to/file"))
.isInstanceOf(IllegalArgumentException.class)
.isInstanceOf(ValidationException.class)
.hasMessage("Invalid ADLS URI: /path/to/file");
}

@Test
public void testInvalidScheme() {
assertThatThrownBy(() -> new ADLSLocation("s3://bucket/path/to/file"))
.isInstanceOf(IllegalArgumentException.class)
.isInstanceOf(ValidationException.class)
.hasMessage("Invalid ADLS URI: s3://bucket/path/to/file");
}

@Test
public void testInvalidURI() {
String invalidUri = "abfs://[email protected]/#invalidPath#";
assertThatThrownBy(() -> new ADLSLocation(invalidUri))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(String.format("Invalid ADLS URI: %s", invalidUri));
}
public void testNoContainer() {
String p1 = "abfs://account.dfs.core.windows.net/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

@ParameterizedTest
@ValueSource(
strings = {
"abfs://account.dfs.core.windows.net/path/to/file",
"wasb://account.blob.core.windows.net/path/to/file"
})
public void testNoContainer(String path) throws URISyntaxException {
ADLSLocation location = new ADLSLocation(path);
String expectedEndpoint = new URI(path).getHost();

assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint);
assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().isPresent()).isFalse();
assertThat(location.path()).isEqualTo("path/to/file");
}

@ParameterizedTest
@ValueSource(
strings = {
"abfs://[email protected]",
"wasb://[email protected]"
})
public void testNoPath(String path) throws URISyntaxException {
ADLSLocation location = new ADLSLocation(path);
String expectedEndpoint = new URI(path).getHost();

assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint);
@Test
public void testNoPath() {
String p1 = "abfs://[email protected]";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("");
}

@ParameterizedTest
@ValueSource(
strings = {
"abfs://[email protected]/path/to/file?query=foo#123",
"wasb://[email protected]/path/to/file?query=foo#123"
})
public void testQueryAndFragment(String path) throws URISyntaxException {
ADLSLocation location = new ADLSLocation(path);
String expectedEndpoint = new URI(path).getHost();

assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint);
@Test
public void testQueryAndFragment() {
String p1 = "abfs://[email protected]/path/to/file?query=foo#123";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

@ParameterizedTest
@ValueSource(
strings = {
"abfs://[email protected]?query=foo#123",
"wasb://[email protected]?query=foo#123"
})
public void testQueryAndFragmentNoPath(String path) throws URISyntaxException {
ADLSLocation location = new ADLSLocation(path);
String expectedEndpoint = new URI(path).getHost();

assertThat(location.storageEndpoint()).isEqualTo(expectedEndpoint);
@Test
public void testQueryAndFragmentNoPath() {
String p1 = "abfs://[email protected]?query=foo#123";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
"s3n", S3_FILE_IO_IMPL,
"gs", GCS_FILE_IO_IMPL,
"abfs", ADLS_FILE_IO_IMPL,
"abfss", ADLS_FILE_IO_IMPL,
"wasb", ADLS_FILE_IO_IMPL,
"wasbs", ADLS_FILE_IO_IMPL);
"abfss", ADLS_FILE_IO_IMPL);

private final Map<String, DelegateFileIO> ioInstances = Maps.newConcurrentMap();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
Expand Down