Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public Optional<Long> adlsWriteBlockSize() {
return Optional.ofNullable(adlsWriteBlockSize);
}

/**
* Applies configuration to the {@link DataLakeFileSystemClientBuilder} to provide the endpoint
* and credentials required to create an instance of the client.
*
* <p>The default endpoint is constructed in the form {@code
* https://{account}.dfs.core.windows.net} and default credentials are provided via the {@link
* com.azure.identity.DefaultAzureCredential}.
*
* @param account the service account name
* @param builder the builder instance
*/
public void applyClientConfiguration(String account, DataLakeFileSystemClientBuilder builder) {
String sasToken = adlsSasTokens.get(account);
if (sasToken != null && !sasToken.isEmpty()) {
Expand All @@ -93,7 +104,7 @@ public void applyClientConfiguration(String account, DataLakeFileSystemClientBui
if (connectionString != null && !connectionString.isEmpty()) {
builder.endpoint(connectionString);
} else {
builder.endpoint("https://" + account);
builder.endpoint("https://" + account + ".dfs.core.windows.net");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,26 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* This class represents a fully qualified location in Azure expressed as a URI.
* This class represents a fully qualified location of a file or directory in Azure Data Lake
* Storage Gen2.
*
* <p>Locations follow the conventions used by Hadoop's Azure support, i.e.
* <p>Locations follow a URI-like structure to identify resources:
*
* <pre>{@code abfs[s]://[<container>@]<storage account host>/<file path>}</pre>
* <pre>{@code abfs[s]://[<container>@]<storageAccount>.dfs.core.windows.net/<path>}</pre>
*
* <p>See <a href="https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html">Hadoop Azure
* Support</a>
* or
*
* <pre>{@code wasb[s]://<container>@<storageAccount>.blob.core.windows.net/<path>}</pre>
*
* <p>For compatibility, locations using the wasb scheme are also accepted but will use the Azure
* Data Lake Storage Gen2 REST APIs instead of the Blob Storage REST APIs.
*
* <p>See <a
* href="https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax">Azure
* Data Lake Storage URI</a>
*/
class ADLSLocation {
private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^/?#]+)(.*)?$");
private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://([^/?#]+)(.*)?$");

private final String storageAccount;
private final String container;
Expand All @@ -53,19 +62,19 @@ class ADLSLocation {

ValidationException.check(matcher.matches(), "Invalid ADLS URI: %s", location);

String authority = matcher.group(1);
String authority = matcher.group(2);
String[] parts = authority.split("@", -1);
if (parts.length > 1) {
this.container = parts[0];
this.storageAccount = parts[1];
String host = parts[1];
this.storageAccount = host.split("\\.", -1)[0];
} else {
this.container = null;
this.storageAccount = authority;
this.storageAccount = authority.split("\\.", -1)[0];
}

String uriPath = matcher.group(2);
uriPath = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath;
this.path = uriPath.split("\\?", -1)[0].split("#", -1)[0];
String uriPath = matcher.group(3);
this.path = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath;
}

/** Returns Azure storage account. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ public void testNoSasToken() {
@Test
public void testWithConnectionString() {
AzureProperties props =
new AzureProperties(ImmutableMap.of("adls.connection-string.account1", "http://endpoint"));
new AzureProperties(
ImmutableMap.of(
"adls.connection-string.account1", "https://account1.dfs.core.usgovcloudapi.net"));

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account1", clientBuilder);
verify(clientBuilder).endpoint("http://endpoint");
verify(clientBuilder).endpoint("https://account1.dfs.core.usgovcloudapi.net");
}

@Test
Expand All @@ -111,7 +113,7 @@ public void testNoMatchingConnectionString() {

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account1", clientBuilder);
verify(clientBuilder).endpoint("https://account1");
verify(clientBuilder).endpoint("https://account1.dfs.core.windows.net");
}

@Test
Expand All @@ -120,7 +122,7 @@ public void testNoConnectionString() {

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account", clientBuilder);
verify(clientBuilder).endpoint("https://account");
verify(clientBuilder).endpoint("https://account.dfs.core.windows.net");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ public void testLocationParsing(String scheme) {
String p1 = scheme + "://[email protected]/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

@ParameterizedTest
@ValueSource(strings = {"wasb", "wasbs"})
public void testWasbLocatonParsing(String scheme) {
String p1 = scheme + "://[email protected]/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}
Expand All @@ -43,7 +54,7 @@ public void testEncodedString() {
String p1 = "abfs://[email protected]/path%20to%20file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path%20to%20file");
}
Expand All @@ -67,7 +78,7 @@ public void testNoContainer() {
String p1 = "abfs://account.dfs.core.windows.net/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().isPresent()).isFalse();
assertThat(location.path()).isEqualTo("path/to/file");
}
Expand All @@ -77,28 +88,16 @@ public void testNoPath() {
String p1 = "abfs://[email protected]";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("");
}

@Test
public void testQueryAndFragment() {
String p1 = "abfs://[email protected]/path/to/file?query=foo#123";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

@Test
public void testQueryAndFragmentNoPath() {
String p1 = "abfs://[email protected]?query=foo#123";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("");
@ParameterizedTest
@ValueSource(strings = {"file?.txt", "file%3F.txt"})
public void testQuestionMarkInFileName(String path) {
String fullPath = String.format("abfs://[email protected]/%s", path);
ADLSLocation location = new ADLSLocation(fullPath);
assertThat(location.path()).contains(path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
"s3n", S3_FILE_IO_IMPL,
"gs", GCS_FILE_IO_IMPL,
"abfs", ADLS_FILE_IO_IMPL,
"abfss", ADLS_FILE_IO_IMPL);
"abfss", ADLS_FILE_IO_IMPL,
"wasb", ADLS_FILE_IO_IMPL,
"wasbs", ADLS_FILE_IO_IMPL);

private final Map<String, DelegateFileIO> ioInstances = Maps.newConcurrentMap();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ public List<Namespace> listNamespaces(SessionContext context, Namespace namespac

Map<String, String> queryParams = Maps.newHashMap();
if (!namespace.isEmpty()) {
queryParams.put("parent", RESTUtil.encodeNamespace(namespace));
queryParams.put("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
}

ImmutableList.Builder<Namespace> namespaces = ImmutableList.builder();
Expand Down
16 changes: 3 additions & 13 deletions core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,14 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class RESTUtil {
private static final char NAMESPACE_SEPARATOR = '\u001f';
public static final Joiner NAMESPACE_JOINER = Joiner.on(NAMESPACE_SEPARATOR);
public static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_SEPARATOR);
private static final String NAMESPACE_ESCAPED_SEPARATOR = "%1F";
private static final Joiner NAMESPACE_ESCAPED_JOINER = Joiner.on(NAMESPACE_ESCAPED_SEPARATOR);
private static final Splitter NAMESPACE_ESCAPED_SPLITTER =
Splitter.on(NAMESPACE_ESCAPED_SEPARATOR);

/**
* @deprecated since 1.7.0, will be made private in 1.8.0; use {@link
* RESTUtil#encodeNamespace(Namespace)} instead.
*/
@Deprecated public static final Joiner NAMESPACE_JOINER = Joiner.on(NAMESPACE_ESCAPED_SEPARATOR);

/**
* @deprecated since 1.7.0, will be made private in 1.8.0; use {@link
* RESTUtil#decodeNamespace(String)} instead.
*/
@Deprecated
public static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_ESCAPED_SEPARATOR);

private RESTUtil() {}

public static String stripTrailingSlash(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ public <T extends RESTResponse> T handleRequest(
if (asNamespaceCatalog != null) {
Namespace ns;
if (vars.containsKey("parent")) {
ns = RESTUtil.decodeNamespace(vars.get("parent"));
ns =
Namespace.of(
RESTUtil.NAMESPACE_SPLITTER
.splitToStream(vars.get("parent"))
.toArray(String[]::new));
} else {
ns = Namespace.empty();
}
Expand Down
9 changes: 9 additions & 0 deletions docs/docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ See [`migrate`](#migrate) to replace an existing table with an Iceberg table.
| `properties` | ️ | map<string, string> | Properties to add to the newly created table |
| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |

!!! warning
There's a [known issue with `parallelism > 1`](https://github.com/apache/iceberg/issues/11147) that is scheduled to be fixed in the next release.

#### Output

| Output Name | Type | Description |
Expand Down Expand Up @@ -629,6 +632,9 @@ By default, the original table is retained with the name `table_BACKUP_`.
| `backup_table_name` | | string | Name of the table that will be retained as backup (defaults to `table_BACKUP_`) |
| `parallelism` | | int | Number of threads to use for file reading (defaults to 1) |

!!! warning
There's a [known issue with `parallelism > 1`](https://github.com/apache/iceberg/issues/11147) that is scheduled to be fixed in the next release.

#### Output

| Output Name | Type | Description |
Expand Down Expand Up @@ -675,6 +681,9 @@ Warning : Schema is not validated, adding files with different schema to the Ice

Warning : Files added by this method can be physically deleted by Iceberg operations

!!! warning
There's a [known issue with `parallelism > 1`](https://github.com/apache/iceberg/issues/11147) that is scheduled to be fixed in the next release.

#### Output

| Output Name | Type | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,27 +217,27 @@ public void testPrimitiveColumns() throws Exception {

Row binaryCol =
Row.of(
55L,
52L,
4L,
2L,
null,
Base64.getDecoder().decode("1111"),
Base64.getDecoder().decode("2222"));
Row booleanCol = Row.of(36L, 4L, 0L, null, false, true);
Row decimalCol = Row.of(91L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00"));
Row doubleCol = Row.of(91L, 4L, 0L, 1L, 1.0D, 2.0D);
Row booleanCol = Row.of(32L, 4L, 0L, null, false, true);
Row decimalCol = Row.of(85L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00"));
Row doubleCol = Row.of(85L, 4L, 0L, 1L, 1.0D, 2.0D);
Row fixedCol =
Row.of(
47L,
44L,
4L,
2L,
null,
Base64.getDecoder().decode("1111"),
Base64.getDecoder().decode("2222"));
Row floatCol = Row.of(77L, 4L, 0L, 2L, 0f, 0f);
Row intCol = Row.of(77L, 4L, 0L, null, 1, 2);
Row longCol = Row.of(85L, 4L, 0L, null, 1L, 2L);
Row stringCol = Row.of(85L, 4L, 0L, null, "1", "2");
Row floatCol = Row.of(71L, 4L, 0L, 2L, 0f, 0f);
Row intCol = Row.of(71L, 4L, 0L, null, 1, 2);
Row longCol = Row.of(79L, 4L, 0L, null, 1L, 2L);
Row stringCol = Row.of(79L, 4L, 0L, null, "1", "2");

List<Row> expected =
Lists.newArrayList(
Expand Down Expand Up @@ -289,8 +289,8 @@ public void testSelectNestedValues() throws Exception {
public void testNestedValues() throws Exception {
createNestedTable();

Row leafDoubleCol = Row.of(50L, 3L, 1L, 1L, 0.0D, 0.0D);
Row leafLongCol = Row.of(57L, 3L, 1L, null, 0L, 1L);
Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D);
Row leafLongCol = Row.of(54L, 3L, 1L, null, 0L, 1L);
Row metrics = Row.of(Row.of(leafDoubleCol, leafLongCol));

TestHelpers.assertRows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,27 +217,27 @@ public void testPrimitiveColumns() throws Exception {

Row binaryCol =
Row.of(
55L,
52L,
4L,
2L,
null,
Base64.getDecoder().decode("1111"),
Base64.getDecoder().decode("2222"));
Row booleanCol = Row.of(36L, 4L, 0L, null, false, true);
Row decimalCol = Row.of(91L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00"));
Row doubleCol = Row.of(91L, 4L, 0L, 1L, 1.0D, 2.0D);
Row booleanCol = Row.of(32L, 4L, 0L, null, false, true);
Row decimalCol = Row.of(85L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00"));
Row doubleCol = Row.of(85L, 4L, 0L, 1L, 1.0D, 2.0D);
Row fixedCol =
Row.of(
47L,
44L,
4L,
2L,
null,
Base64.getDecoder().decode("1111"),
Base64.getDecoder().decode("2222"));
Row floatCol = Row.of(77L, 4L, 0L, 2L, 0f, 0f);
Row intCol = Row.of(77L, 4L, 0L, null, 1, 2);
Row longCol = Row.of(85L, 4L, 0L, null, 1L, 2L);
Row stringCol = Row.of(85L, 4L, 0L, null, "1", "2");
Row floatCol = Row.of(71L, 4L, 0L, 2L, 0f, 0f);
Row intCol = Row.of(71L, 4L, 0L, null, 1, 2);
Row longCol = Row.of(79L, 4L, 0L, null, 1L, 2L);
Row stringCol = Row.of(79L, 4L, 0L, null, "1", "2");

List<Row> expected =
Lists.newArrayList(
Expand Down Expand Up @@ -289,8 +289,8 @@ public void testSelectNestedValues() throws Exception {
public void testNestedValues() throws Exception {
createNestedTable();

Row leafDoubleCol = Row.of(50L, 3L, 1L, 1L, 0.0D, 0.0D);
Row leafLongCol = Row.of(57L, 3L, 1L, null, 0L, 1L);
Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D);
Row leafLongCol = Row.of(54L, 3L, 1L, null, 0L, 1L);
Row metrics = Row.of(Row.of(leafDoubleCol, leafLongCol));

TestHelpers.assertRows(
Expand Down
Loading