ADLS integration #120

Merged

25 commits
- `2d59981` code for ADLS integration; untested (Jun 6, 2024)
- `61a679b` added code for remote connection to adls, URI not fully functional (kmcclenn, Jun 14, 2024)
- `d895da8` Renamed files for compatibility (kmcclenn, Jun 14, 2024)
- `16f99ce` Successful integration with Azure Storage (kmcclenn, Jun 19, 2024)
- `cf80ec1` More formatting fixes, cleaned up code (kmcclenn, Jun 20, 2024)
- `bb71ee3` formatting fixes (kmcclenn, Jun 20, 2024)
- `0c27c50` removed s3 references so tests/build passes (kmcclenn, Jun 20, 2024)
- `bd9d60e` changes to fix client side, still running into slight errors (kmcclenn, Jun 22, 2024)
- `ae2092b` changes to fix client side (kmcclenn, Jun 22, 2024)
- `a04e81f` working integration for client and server (kmcclenn, Jun 24, 2024)
- `399aaef` added dependency to pass tests (kmcclenn, Jun 24, 2024)
- `a900689` minor changes for PR (kmcclenn, Jun 27, 2024)
- `240102b` PR fixes (kmcclenn, Jun 28, 2024)
- `692149c` Minor formatting/style fixes (kmcclenn, Jun 28, 2024)
- `c9ba0cc` table uri uses allocateTableLocation now (kmcclenn, Jul 15, 2024)
- `b7e62b1` minor changes (kmcclenn, Jul 15, 2024)
- `ed3fbed` removed unnecessary config, cleaned code (kmcclenn, Jul 25, 2024)
- `347384b` Fixed error message formatting for storage properties (kmcclenn, Jul 25, 2024)
- `913a37c` fixed deleted s3 issue (kmcclenn, Jul 25, 2024)
- `3027a1f` changed cluster name (kmcclenn, Jul 30, 2024)
- `321c335` moved fileIO creation to fileIOConfig (kmcclenn, Jul 30, 2024)
- `169135a` removed jar package for iceberg (kmcclenn, Jul 30, 2024)
- `d42ce94` more small fixes (kmcclenn, Jul 30, 2024)
- `5861448` modified spark shell commands (kmcclenn, Jul 30, 2024)
- `8a521f4` small settings fix (kmcclenn, Jul 30, 2024)
SETUP.md: 20 changes (19 additions & 1 deletion)
@@ -232,8 +232,9 @@
docker exec -it local.spark-master /bin/bash
```

Start `spark-shell` with the following command. Available users are `openhouse` and `u_tableowner`.

```
bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.0 \
--jars openhouse-spark-runtime_2.12-*-all.jar \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions \
--conf spark.sql.catalog.openhouse=org.apache.iceberg.spark.SparkCatalog \
...
--conf spark.sql.catalog.openhouse.cluster=LocalHadoopCluster
```

If you are integrating with ADLS, use this `spark-shell` command instead:

```
bin/spark-shell --packages org.apache.iceberg:iceberg-azure:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.0 \
--jars openhouse-spark-apps_2.12-*-all.jar,openhouse-spark-runtime_2.12-latest-all.jar \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions \
--conf spark.sql.catalog.openhouse=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.openhouse.catalog-impl=com.linkedin.openhouse.spark.OpenHouseCatalog \
--conf spark.sql.catalog.openhouse.metrics-reporter-impl=com.linkedin.openhouse.javaclient.OpenHouseMetricsReporter \
--conf spark.sql.catalog.openhouse.uri=http://openhouse-tables:8080 \
--conf spark.sql.catalog.openhouse.auth-token=$(cat /var/config/$(whoami).token) \
--conf spark.sql.catalog.openhouse.cluster=LocalABSCluster \
--conf spark.sql.catalog.openhouse.io-impl=org.apache.iceberg.azure.adlsv2.ADLSFileIO \
--conf spark.sql.catalog.openhouse.adls.auth.shared-key.account.name=<account name> \
--conf spark.sql.catalog.openhouse.adls.auth.shared-key.account.key=<account key>
```

Review thread on the `io-impl` line:

Collaborator: This is needed only for the ADLS storage. My recommendation would be to provide different spark-shell commands here for different recipes.

Member: +1 to the above recommendation.

Collaborator Author: Sounds good, fixed.

Collaborator: I am actually not sure why io-impl needs to be passed as part of the Spark conf. An instance of this class should be returned by OpenHouseTableOperations. CC: @HotSushi

Collaborator Author: I am not really sure why, but the Spark cluster only successfully completes the requests when it is initiated with it.

Collaborator: Currently the code to map storageType -> client FileIO on tableOperations does not exist, but will be added later when supporting multi-storage per cluster. Until then, this is a cleaner way of supporting a FileIO per cluster.

Review thread on lines +261 to +262 (the shared-key confs):

Collaborator: Again, is this something we need the user to pass, or something we were letting the catalog retrieve? How were we thinking of this for s3/minio? CC: @HotSushi

Collaborator Author: It seemed like in the s3 PR variables were passed in a similar fashion.

Collaborator: Configuring a FileIO on the client side needs such details to be provided at the client side. It's similar to HDFS: the OH server doesn't communicate all confs necessary to instantiate the HDFS client completely. It's assumed that the client has the right HDFS communication already established.
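For context, the storageType -> client FileIO mapping described in the thread above might look like the following sketch. It is hypothetical (per the discussion, OpenHouse does not ship such a resolver yet), and it assumes Iceberg's aws and azure modules (>= 1.4) are on the classpath; the class and method names are illustrative:

```java
import java.util.Map;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.azure.adlsv2.ADLSFileIO;
import org.apache.iceberg.io.FileIO;

// Hypothetical resolver sketch; not part of this PR or the OpenHouse API.
public final class FileIOResolver {
  private FileIOResolver() {}

  public static FileIO resolve(String storageType, Map<String, String> properties) {
    FileIO io;
    switch (storageType) {
      case "adls":
        io = new ADLSFileIO();
        break;
      case "s3":
        io = new S3FileIO();
        break;
      default:
        throw new IllegalArgumentException("Unknown storage type: " + storageType);
    }
    // FileIO implementations read their credentials (e.g. adls.auth.shared-key.*)
    // from the supplied properties map during initialize.
    io.initialize(properties);
    return io;
  }
}
```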

#### Create a table

```
...
```
@@ -0,0 +1,21 @@ (new file)
ext {
icebergAzureVersion = '1.5.2'
}

dependencies {
// Ideally, we have these, but they are only supported for iceberg version >= 1.4.0, which is not compatible
// with the current Openhouse implementation.
// implementation('org.apache.iceberg:iceberg-azure:' + icebergAzureVersion)
// implementation('org.apache.iceberg:iceberg-azure-bundle:' + icebergAzureVersion)
Review thread on lines +6 to +9:

Collaborator (suggested change, keeping the comment but uncommenting the dependencies):

// Ideally, we have these, but they are only supported for iceberg version >= 1.4.0, which is not compatible
// with the current Openhouse implementation.
implementation('org.apache.iceberg:iceberg-azure:' + icebergAzureVersion)
implementation('org.apache.iceberg:iceberg-azure-bundle:' + icebergAzureVersion)

If we do this, then do we need to copy code? It might be ok to pull in the library for iceberg-azure at version 1.5.2 for use in the openhouse repo. Thoughts @HotSushi?

Collaborator Author: I tried this - it gives more dependency issues.

Collaborator: Yes, we gave this a shot. Compatibility mismatch because the FileIO interface has evolved from 1.2 -> 1.4.

implementation("com.azure:azure-storage-file-datalake:12.19.1") {
exclude group: 'io.netty'
exclude group: 'io.projectreactor.netty'
}

implementation("com.azure:azure-identity:1.12.1") {
exclude group: 'io.netty'
exclude group: 'io.projectreactor.netty'
}

}
buildSrc/src/main/groovy/openhouse.iceberg-conventions.gradle: 4 changes (2 additions & 2 deletions)
@@ -6,6 +6,6 @@
dependencies {
implementation('org.apache.iceberg:iceberg-bundled-guava:' + icebergVersion)
implementation('org.apache.iceberg:iceberg-data:' + icebergVersion)
implementation('org.apache.iceberg:iceberg-core:' + icebergVersion)

testImplementation('org.apache.iceberg:iceberg-common:' + icebergVersion) // removed
implementation('org.apache.iceberg:iceberg-common:' + icebergVersion) // added
Review thread:

Member: So iceberg-common is needed as implementation with the ADLS integration?

Collaborator Author: As of right now, yes. If I do the submodules later I may reevaluate.

implementation('org.testcontainers:testcontainers:1.19.8')
}
cluster/storage/build.gradle: 2 changes (2 additions & 0 deletions)
@@ -2,11 +2,13 @@
plugins {
id 'openhouse.java-conventions'
id 'openhouse.hadoop-conventions'
id 'openhouse.iceberg-aws-conventions'
id 'openhouse.iceberg-azure-conventions'
id 'openhouse.iceberg-conventions'
id 'openhouse.maven-publish'
}

dependencies {
implementation project(':cluster:configs')
implementation project(':iceberg:azure')
implementation 'org.springframework.boot:spring-boot-autoconfigure:' + spring_web_version
}
@@ -17,6 +17,7 @@
public class StorageType {
public static final Type HDFS = new Type("hdfs");
public static final Type LOCAL = new Type("local");
public static final Type S3 = new Type("s3");
public static final Type ADLS = new Type("adls");

@AllArgsConstructor
@EqualsAndHashCode
@@ -33,6 +34,8 @@
public Type fromString(String type) {
...
return LOCAL;
} else if (S3.getValue().equals(type)) {
return S3;
} else if (ADLS.getValue().equals(type)) {
return ADLS;
} else {
throw new IllegalArgumentException("Unknown storage type: " + type);
}
...
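As a quick illustration of the new mapping, resolving a configured storage type string could look like this; a minimal sketch, assuming `StorageType` can be instantiated directly (in the codebase it is typically wired as a managed bean):

```java
import com.linkedin.openhouse.cluster.storage.StorageType;

// Illustrative usage only; not part of this PR.
public class StorageTypeDemo {
  public static void main(String[] args) {
    StorageType storageType = new StorageType();
    StorageType.Type type = storageType.fromString("adls");
    System.out.println(type.equals(StorageType.ADLS)); // prints true
    // Unknown values fail fast, e.g. storageType.fromString("gcs") throws
    // IllegalArgumentException: Unknown storage type: gcs
  }
}
```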
@@ -0,0 +1,39 @@ (new file)
package com.linkedin.openhouse.cluster.storage.adls;

import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
* The ADLS Storage class is an implementation of the Storage interface for cloud storage with Azure
* Data Lake Storage.
*/
@Component
public class AdlsStorage extends BaseStorage {

// Declare client class to interact with ADLS filesystems/storage
@Autowired @Lazy private AdlsStorageClient adlsStorageClient;

/**
* Get the type of the ADLS storage.
*
* @return the type of the ADLS storage
*/
@Override
public StorageType.Type getType() {
return StorageType.ADLS;
}

/**
* Get the ADLS storage client.
*
* @return the ADLS storage client
*/
@Override
public StorageClient<?> getClient() {
return adlsStorageClient;
}
}
@@ -0,0 +1,67 @@ (new file)
package com.linkedin.openhouse.cluster.storage.adls;

import com.azure.storage.file.datalake.DataLakeFileClient;
import com.linkedin.openhouse.cluster.storage.BaseStorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
* AdlsStorageClient is an implementation of the StorageClient interface for Azure Data Lake
* Storage. It uses the {@link DataLakeFileClient} to interact with Data Lake Storage.
*/
@Slf4j
@Lazy
@Component
public class AdlsStorageClient extends BaseStorageClient<DataLakeFileClient> {

private static final StorageType.Type ADLS_TYPE = StorageType.ADLS;

@Autowired private StorageProperties storageProperties;

private Map properties;

// TODO: Instantiate DLFC: https://github.com/linkedin/openhouse/issues/148
private DataLakeFileClient dataLakeClient = null;

/** Initialize the ADLS Client when the bean is accessed the first time. */
@PostConstruct
public synchronized void init() {

validateProperties();

// Gets the parameters from the ADLS storage type
properties =
new HashMap(storageProperties.getTypes().get(ADLS_TYPE.getValue()).getParameters());

// Try to create a URI with the endpoint and rootpath
URI uri;
try {
uri = new URI(getEndpoint() + getRootPrefix());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
String.format(
"Bad storage properties provided: [endpoint: %s, rootPrefix: %s] for type [%s]",
getEndpoint(), getRootPrefix(), ADLS_TYPE.getValue()),
e);
}
}

@Override
public DataLakeFileClient getNativeClient() {
return dataLakeClient;
}

@Override
public StorageType.Type getStorageType() {
return ADLS_TYPE;
}
}
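The TODO above (issue #148) leaves `dataLakeClient` uninstantiated. A minimal sketch of how a `DataLakeFileClient` could be built from the endpoint and shared-key credentials with the Azure SDK; the parameter values are placeholders and this is an assumption about the eventual wiring, not the code this PR ships:

```java
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;

public class AdlsClientSketch {
  // Build a file client for one ADLS path using shared-key auth.
  static DataLakeFileClient buildFileClient(
      String endpoint, String accountName, String accountKey, String fileSystem, String path) {
    DataLakeServiceClient serviceClient =
        new DataLakeServiceClientBuilder()
            .endpoint(endpoint) // e.g. https://<account>.dfs.core.windows.net
            .credential(new StorageSharedKeyCredential(accountName, accountKey))
            .buildClient();
    DataLakeFileSystemClient fileSystemClient = serviceClient.getFileSystemClient(fileSystem);
    return fileSystemClient.getFileClient(path);
  }
}
```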