ADLS integration #120
base: kai/azure-sandbox
Conversation
Neat PR, all the interfaces are nicely implemented! Left some comments. Please use BaseStorage and BaseStorageClient so that code replication can be removed.
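For context, a minimal sketch of what reusing the shared base classes could look like (the package locations, the StorageType.ADLS constant, and the getType()/getClient() overrides are assumptions, not taken from this diff):

```java
import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Component
public class AdlsStorage extends BaseStorage {

  // Client used to interact with ADLS storage; assumed to extend BaseStorageClient
  @Autowired @Lazy private AdlsStorageClient adlsStorageClient;

  @Override
  public StorageType.Type getType() {
    return StorageType.ADLS;
  }

  @Override
  public AdlsStorageClient getClient() {
    return adlsStorageClient;
  }
}
```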
cluster/storage/build.gradle
Outdated
@@ -9,4 +9,6 @@ plugins {
dependencies {
    implementation project(':cluster:configs')
    implementation 'org.springframework.boot:spring-boot-autoconfigure:' + spring_web_version
    implementation 'org.apache.iceberg:iceberg-azure:1.5.2'
It would be good to move these to a conventions file, like aws-conventions.
private DataLakeFileClient dataLake;

private static final String DEFAULT_ENDPOINT = "abfs:/";
The endpoint needs to be "abfs://" and the default rootpath/bucket can be "tmp".
Reason: we want all object stores to have this pattern so that provisioning a bucket becomes easier.
Why do you need to configure these defaults here?
if (storageProperties.getTypes() != null && !storageProperties.getTypes().isEmpty()) {

  // fail if properties are invalid
  Preconditions.checkArgument(
You should be able to call BaseStorageClient#validateProperties() instead of re-defining the logic.
+1
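A small sketch of the suggested delegation (the @PostConstruct init pattern and the exact behavior of validateProperties() are assumptions based on the surrounding discussion):

```java
// Assumed shape: BaseStorageClient#validateProperties() asserts that the ADLS
// storage type, endpoint, and rootpath are present in storageProperties,
// so the null/empty checks and Preconditions calls above are not repeated here.
@PostConstruct
public synchronized void init() {
  validateProperties();
  // ... build the DataLake client from the already-validated properties ...
}
```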
@@ -14,6 +14,10 @@ dependencies {
    implementation project(':cluster:storage')
    implementation project(':cluster:metrics')

    // for ADLS interfacing
    implementation 'org.apache.iceberg:iceberg-azure:1.5.2'
same conventions comment as before
@@ -33,9 +33,15 @@ public class FileIOManager {
  @Qualifier("LocalFileIO")
  FileIO localFileIO;

  @Autowired(required = false) // doesn't inject if null
  @Qualifier("ADLSFileIO") // avoid ambiguity with multiple beans of the same type
  FileIO adlsFileIO;
Let it autowire ADLSFileIO directly instead of qualifier + FileIO.
Reason why we used the qualifier before: HDFS and Local use the same HdfsFileIO.
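In other words, the FileIOManager field could look roughly like this (a sketch of the suggestion, not the final code):

```java
// Autowiring the concrete ADLSFileIO type removes the need for @Qualifier,
// since only one bean of this type exists.
@Autowired(required = false) // not injected when ADLS storage is not configured
ADLSFileIO adlsFileIO;
```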
 *
 * @return ADLSFileIO bean for ADLS storage type, or null if ADLS storage type is not configured
 */
@Bean("ADLSFileIO")
Let's take out the qualifier; I explained the reasoning in the other comment.
@Autowired @Lazy
private ADLSStorageClient
    adlsStorageClient; // declare client class to interact with ADLS filesystem
Can you please be consistent with the comment placement? The rest of the code places the comment on the line before the declaration.
private ADLSStorageClient
    adlsStorageClient; // declare client class to interact with ADLS filesystem

// do we need an isConfigured method? to check if ADLS is configured. Leaving out for now
You can leave it out.
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
The comment does not match the implementation below: the implementation uses DataLakeFileClient, but the comment mentions BlobClient.
import org.springframework.stereotype.Component;

/**
 * ABSStorageClient is an implementation of the StorageClient interface for Azure Blob Storage. It
nit: typo. ADLSStorageClient. Also, should we name it AdlsStorageClient to be consistent with Hdfs?
private DataLakeFileClient dataLake;

private static final String DEFAULT_ENDPOINT = "abfs:/";
Why do you need to configure these defaults here?
private static final String DEFAULT_ROOTPATH = "/tmp/";

private String endpoint;
Again, why do you need these? Endpoint and rootPath should come from the configuration.
if (storageProperties.getTypes() != null && !storageProperties.getTypes().isEmpty()) {

  // fail if properties are invalid
  Preconditions.checkArgument(
+1
Map properties =
    new HashMap(storageProperties.getTypes().get(ADLS_TYPE.getValue()).getParameters);

endpoint = storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getEndpoint();
Validation ensures that endpoint and rootPath are configured. You shouldn't need to override it.
@Autowired private StorageProperties storageProperties;

private DataLakeFileClient dataLake;
Please name this dataLakeClient.
@Override
public String getEndpoint() {
  return endpoint;
I would recommend getting rid of these class-local variables.
Also, there is no need to override getEndpoint() and getRootPrefix(). You can use the ones defined in BaseStorageClient.
Left some comments on code layout and copied code
@@ -0,0 +1,337 @@
Why is this file needed?
@@ -0,0 +1,25 @@

Apache Iceberg
Ditto.
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.azure;
Why is the azure code in the org.apache.iceberg package?
+1
Because it is the azure integration with iceberg https://github.com/apache/iceberg/tree/main/azure
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.azure.adlsv2;
Ditto. @jiang95-dev / @abhisheknath2011 can you help with the packaging?
I think we can have package as com.linkedin.openhouse.azure.adlsv2
Nvm. I got the context from Kai why we are copying code. Either way is ok.
I asked Kai to explore git submodules instead of copying code.
Sure, sounds good. Yes, copying is required due to the Iceberg version compatibility issue.
hmm..I am interested in this as well.
For an update, it seems like you can only submodule an entire repository, not a subfolder. Is that still worth it? There were some sketchy solutions that I tried to get just a subfolder, but none of them worked for me.
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
Why is all this code copied? Is it also available in Iceberg library artifacts to use as a library dependency?
Submodules are difficult because I need to change one small thing in the Iceberg Azure code in order for it to work. I am not going to include them for now; instead, I will create a new PR that fixes the root issues with the Tables URI, which will then allow me to add the submodules as well.
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;

public class AzuriteContainer extends GenericContainer<AzuriteContainer> {
This file is found in iceberg/azure/src/test not iceberg/azure/src/main
# minioS3:
#   image: minio/minio
#   environment:
#     - MINIO_ROOT_USER=admin
#     - MINIO_ROOT_PASSWORD=password
#   ports:
#     - 9871:9001
#     - 9870:9000
#   command: [ "server", "/data", "--console-address", ":9001" ]
# mc:
#   depends_on:
#     - minioS3
#   image: minio/mc
#   environment:
#     - AWS_ACCESS_KEY_ID=admin
#     - AWS_SECRET_ACCESS_KEY=password
#     - AWS_REGION=us-east-1
#   entrypoint: >
#     /bin/sh -c "
#     until (/usr/bin/mc config host add minio http://minioS3:9000 admin password) do echo '...waiting...' && sleep 1; done;
#     /usr/bin/mc rm -r --force minio/openhouse-bucket;
#     /usr/bin/mc mb minio/openhouse-bucket;
#     /usr/bin/mc policy set public minio/openhouse-bucket;
#     tail -f /dev/null
#     "
Suggested change: remove this entire commented-out minioS3 / mc block.
 * Support</a>
 */
class ADLSLocation {
  private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?://)?([^/?#]+)(.*)?$");
I added the optional 'abfs' to fix the bug where the table location was dropping the prefix. Feels a bit hacky though -- any other ideas for solutions?
I will fix this in a new PR dealing with the constructTablePath function, changing its return type to a URI.
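For reference, a small standalone illustration of how the optional scheme group in the pattern above behaves (the locations used are hypothetical examples):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AdlsLocationRegexDemo {
  public static void main(String[] args) {
    // Same pattern as quoted above: the scheme group is optional, so locations
    // with and without the abfs(s):// prefix both match.
    Pattern uriPattern = Pattern.compile("^(abfss?://)?([^/?#]+)(.*)?$");

    String[] locations = {
      "abfss://container@account.dfs.core.windows.net/tmp/db/table", // hypothetical
      "container@account.dfs.core.windows.net/tmp/db/table"          // hypothetical
    };
    for (String location : locations) {
      Matcher m = uriPattern.matcher(location);
      if (m.matches()) {
        // group(2) is the authority (container@account host), group(3) the path
        System.out.println(m.group(2) + " -> " + m.group(3));
      }
    }
  }
}
```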
@@ -181,6 +181,11 @@ private static long safeParseLong(String keyName, Map<String, String> megaProps)
   * in HTS and client-visible table location.
   */
  static String getSchemeLessPath(String rawPath) {
    return URI.create(rawPath).getPath();
    URI uri = URI.create(rawPath);
Also added this to allow for locations with and without the prefix to be equal. However, also feels a bit hacky -- is this the best way to go about it or does anyone see a better solution?
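As a quick standalone check of the normalization being discussed (hypothetical paths), URI#getPath drops the scheme and authority, so both forms compare equal:

```java
import java.net.URI;

public class SchemeLessPathDemo {
  public static void main(String[] args) {
    // Hypothetical table locations, with and without the abfs scheme/authority.
    String withScheme = "abfs://container@account.dfs.core.windows.net/tmp/db/table";
    String withoutScheme = "/tmp/db/table";

    // URI#getPath strips scheme and authority, so both normalize to the same path.
    System.out.println(URI.create(withScheme).getPath());    // /tmp/db/table
    System.out.println(URI.create(withoutScheme).getPath()); // /tmp/db/table
  }
}
```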
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions \
--conf spark.sql.catalog.openhouse=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.openhouse.catalog-impl=com.linkedin.openhouse.spark.OpenHouseCatalog \
--conf spark.sql.catalog.openhouse.metrics-reporter-impl=com.linkedin.openhouse.javaclient.OpenHouseMetricsReporter \
--conf spark.sql.catalog.openhouse.uri=http://openhouse-tables:8080 \
--conf spark.sql.catalog.openhouse.auth-token=$(cat /var/config/$(whoami).token) \
--conf spark.sql.catalog.openhouse.cluster=LocalHadoopCluster
--conf spark.sql.catalog.openhouse.cluster=LocalFSCluster \
--conf spark.sql.catalog.openhouse.io-impl=org.apache.iceberg.azure.adlsv2.ADLSFileIO \
This is needed only for the ADLS storage.
My recommendation would be to provide different spark-shell commands here for different recipes.
+1 to the above recommendation.
Sounds good, fixed
I am actually not sure why io-impl needs to be passed as part of the spark conf. An instance of this class should be returned by OpenHouseTableOperations. CC: @HotSushi
  id 'openhouse.iceberg-conventions'
  id 'openhouse.maven-publish'
}

dependencies {
  implementation project(':cluster:configs')
  implementation project(':iceberg:azure')
Are you sure this is needed?
Yes, because we need to import the copied files. I will revisit this if I get the submodules to work.
@Component
public class AdlsStorage extends BaseStorage {

  // Declare client class to interact with ADLS filesystem
s/filesystem/storage
// Declare client class to interact with ADLS filesystem
@Autowired @Lazy private AdlsStorageClient adlsStorageClient;
Please add javadoc comments.
private DataLakeFileClient dataLakeClient;

@Getter private ADLSFileIO fileIO;
Why do you need FileIO in the storage client?
In order to return the fileIO object in provideADLSFileIO() in the FileIOConfig.java file, I had to initialize it in the storage client. Unlike other storage methods like S3, the DataLakeClient didn't natively contain the properties to initialize the ADLSFileIO, so I had to do it beforehand and then pass the object. Let me know if I should try to find a different solution.
+1 to @jainlavina's question
I don't see FileIO in the S3 storage client https://github.com/linkedin/openhouse/blob/main/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/s3/S3StorageClient.java
Can we have consistency in layering of objects? @HotSushi can you help @kmcclenn here.
validateProperties();

endpoint = storageProperties.getTypes().get(ADLS_TYPE.getValue()).getEndpoint();
Use getEndpoint() from BaseStorageClient (line 26 in 397e483):
public String getEndpoint() {
URI uri;
Map properties;
String endpoint;
No need to declare these and initialize them separately.
If you remove the if block because the validation is done in validateProperties(), then you can have code that looks like:
String endpoint = getEndpoint();
or, alternatively, just use getEndpoint() directly.
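Putting the pieces together, the init method could look roughly like the fragment below. This is only a sketch: the inherited getEndpoint()/getRootPrefix() accessors, the @PostConstruct pattern, and how the file-system client is turned into a DataLakeFileClient are assumptions here, not confirmed by the diff.

```java
@PostConstruct
public synchronized void init() {
  validateProperties(); // fails fast if the ADLS endpoint/rootpath config is missing

  String endpoint = getEndpoint();   // assumed inherited from BaseStorageClient
  String rootPath = getRootPrefix(); // assumed inherited from BaseStorageClient
  URI uri = URI.create(endpoint + rootPath);

  Map<String, String> properties =
      new HashMap<>(storageProperties.getTypes().get(ADLS_TYPE.getValue()).getParameters());

  if (dataLakeClient == null) {
    this.fileIO = new ADLSFileIO();
    fileIO.initialize(properties);
    DataLakeFileSystemClient client = fileIO.client(uri.toString());
    // ... derive the DataLakeFileClient from `client`, as in the PR ...
  }
}
```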
if (dataLakeClient == null) {
  this.fileIO = new ADLSFileIO();
  fileIO.initialize(properties);
  DataLakeFileSystemClient client = fileIO.client(uri.toString());
Is FileIO the only way to create an ADLS client?
I don't think so, but it provides a nice abstraction that lets us do it; plus, we likely need to create the ADLSFileIO anyway to use in FileIOConfig. Let me know if I should find another way though.
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.azure;
+1
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.azure.adlsv2;
hmm..I am interested in this as well.
Great job @kmcclenn! Thanks for debugging and fixing the issues. Left some comments. It would be good to capture the details below on the PR:
- The Azure-specific libs that are introduced here, the version incompatibility with Iceberg, and the approach that was followed to integrate.
- Details of the lib exclusions that are needed to test the integration on the client/server side.
- An Azure integration doc. Need not be in this PR.
bin/spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.0 \
--jars openhouse-spark-runtime_2.12-*-all.jar \
bin/spark-shell --packages org.apache.iceberg:iceberg-azure:1.5.0,org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:1.2.0 \
So we still need to pass iceberg-azure as part of --packages instead of as a dependency of spark-runtime, since OH currently supports Iceberg v1.2?
I think so because version 1.2 doesn't have the iceberg-azure implementation
@@ -6,6 +6,6 @@ dependencies {
  implementation('org.apache.iceberg:iceberg-bundled-guava:' + icebergVersion)
  implementation('org.apache.iceberg:iceberg-data:' + icebergVersion)
  implementation('org.apache.iceberg:iceberg-core:' + icebergVersion)

  testImplementation('org.apache.iceberg:iceberg-common:' + icebergVersion)
  implementation('org.apache.iceberg:iceberg-common:' + icebergVersion)
So iceberg-common is needed as implementation with the ADLS integration?
As of right now, yes. If I do the submodules later I may reevaluate
@@ -43,3 +43,6 @@ MAPRED_CONF_yarn_app_mapreduce_am_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/
MAPRED_CONF_mapreduce_map_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/
MAPRED_CONF_mapreduce_reduce_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/

AZURE_CLIENT_ID=<client-id>
So these Azure specific details need to be manually populated, right?
Yes, correct
Why are these needed?
They are not actually. Good catch
@@ -37,6 +38,11 @@ dependencies {
  runtimeOnly "org.antlr:antlr4-runtime:4.7.1"
  antlr "org.antlr:antlr4:4.7.1"

  fatJarRuntimeDependencies("com.azure:azure-storage-file-datalake:12.19.1") {
    exclude group: 'io.netty'
Please add some details in the PR description regarding the netty version brought in by these changes and the libs that are excluded. It would also be good to add some details in the PR on how the iceberg-azure libs are integrated.
Overall looking good. Left some minor comments. There are some open items related to handling the URI, the Azure code that is copied, etc. Please add a comment in the PR description noting how you plan to handle these in the next PR, and also update the PR description based on my last review.
} else {
  throw new IllegalArgumentException("Unknown storage type: " + type);
  throw new IllegalArgumentException("Unknown storage type " + type);
Suggested change:
- throw new IllegalArgumentException("Unknown storage type " + type);
+ throw new IllegalArgumentException("Unknown storage type: " + type);
iceberg/azure/build.gradle
Outdated
  id 'openhouse.iceberg-azure-conventions'
}

dependencies {
Is this section empty? If so, we can remove it.
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;

public class AzureProperties implements Serializable {
Shall we put a class-level comment here? Although this is copied, it would be helpful.
@@ -33,9 +36,15 @@ public class FileIOManager {
  @Qualifier("LocalFileIO")
  FileIO localFileIO;

  @Autowired(required = false)
Suggested change:
- @Autowired(required = false)
- @Qualifier("ADLSFileIO")
+ @Autowired(required = false)

Do we need the qualifier here?
Sushant said we don't, because we only have one storage type using ADLSFileIO, so it autowires directly. See this comment: #120 (comment)
--conf spark.sql.catalog.openhouse.adls.auth.shared-key.account.name= <account name> \
--conf spark.sql.catalog.openhouse.adls.auth.shared-key.account.key= <account key>
Again, is this something we need the user to pass, or something we were letting the catalog retrieve? How were we thinking of this for s3/minio? CC: @HotSushi
It seemed like in the s3 PR variables were passed in a similar fashion.
// Ideally, we have these, but they are only supported for iceberg version >= 1.4.0, which is not compatible
// with the current Openhouse implementation.
// implementation('org.apache.iceberg:iceberg-azure:' + icebergAzureVersion)
// implementation('org.apache.iceberg:iceberg-azure-bundle:' + icebergAzureVersion)
Suggested change:
- // implementation('org.apache.iceberg:iceberg-azure:' + icebergAzureVersion)
- // implementation('org.apache.iceberg:iceberg-azure-bundle:' + icebergAzureVersion)
+ implementation('org.apache.iceberg:iceberg-azure:' + icebergAzureVersion)
+ implementation('org.apache.iceberg:iceberg-azure-bundle:' + icebergAzureVersion)

If we do this, then do we need to copy code? It might be ok to pull in the iceberg-azure library at version 1.5.2 for use in the openhouse repo. Thoughts @HotSushi?
I tried this - it gives more dependency issues.
ADLSFileIO fileIO =
    ((AdlsStorageClient) storageManager.getStorage(StorageType.ADLS).getClient()).getFileIO();
Can we create the ADLSFileIO object here, like we do S3FileIO? (Line 78 in 36aba11):
return new S3FileIO(() -> s3);
Unfortunately, ADLSFileIO doesn't have the same constructor that allows us to pass in a created client. Instead, we have to initialize it with the inputted properties. https://github.com/kmcclenn/openhouse/blob/ed0d2293904b995178c2614679b7c6aae3eb0d33/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/adls/AdlsStorageClient.java#L65
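To make the contrast concrete, a short fragment of the two initialization styles (the `s3` and `properties` variables are assumed to already be in scope, as in FileIOConfig):

```java
// S3FileIO accepts a ready-made client through a supplier:
S3FileIO s3FileIO = new S3FileIO(() -> s3);

// ADLSFileIO has no such constructor; it builds its own Azure clients
// from the configured properties instead:
ADLSFileIO adlsFileIO = new ADLSFileIO();
adlsFileIO.initialize(properties);
```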
Summary
Adding integration for ADLS with OpenHouse deployed locally in a Docker container. Using an ABS image for Docker.
Changes
Packages introduced/removed
OpenHouse is currently using Iceberg version 1.2.0. However, this version does not have the Iceberg/Azure implementation code that we need for this PR (the code was introduced in version 1.4.0). Thus, these files were copied into the iceberg/azure folder so they can be utilized for ADLS integration. There are two future changes that will allow us to remove these copied files: pulling the Iceberg Azure code in as a git submodule, and, once OpenHouse upgrades to an Iceberg version >= 1.4.0, simply using the org.apache.iceberg:iceberg-azure package.
We also use the packages com.azure:azure-storage-file-datalake:12.19.1 and com.azure:azure-identity:1.12.1 to get the objects associated with the Data Lake client and the client identity verification. However, these packages use netty version 4.1.108.Final, yet the OpenHouse repo requires a netty version of 4.1.75.Final. Thus, we need to make sure to exclude the groups io.netty and io.projectreactor.netty when these Azure SDKs are imported.
Testing Done
Was able to run the openhouse-tables container locally on Docker by using the docker compose up -d command in the infra/recipes/docker-compose/oh-abs-spark/ folder. Was then able to make requests using Postman that created and modified a table that was reflected in my Azure account. Was also able to make Spark SQL calls in the spark-shell that successfully updated the Azure storage account. See video below.
Screen.Recording.2024-06-21.at.7.44.45.PM.mov
Additional Information
To replicate this PR, make sure to create your own storage account and blob container. Make sure to allow for anonymous read access for containers and blobs (as in this link).
Future PRs
Fix the constructTablePath function and change its response type to URI so the scheme is included in the Table location. This will allow me to add submodules for the iceberg/azure code because I won't need the temporary modification of the ADLSLocation.java file anymore.