Make GC work with EMR 7.0.0 #9013
base: master
Changes from 87 commits
@@ -99,16 +99,28 @@ class EntryRecordReader[Proto <: GeneratedMessage with scalapb.Message[Proto]](
     val gravelerSplit = split.asInstanceOf[GravelerSplit]

     // Log the path before processing
     logger.info(s"Processing file: ${gravelerSplit.path}")

     val fs = gravelerSplit.path.getFileSystem(context.getConfiguration)
     fs.copyToLocalFile(false, gravelerSplit.path, new Path(localFile.getAbsolutePath), true)
     // TODO(johnnyaug) should we cache this?
     sstableReader = new SSTableReader(localFile.getAbsolutePath, companion, true)
     if (!gravelerSplit.isValidated) {
       // this file may not be a valid range file, validate it
-      val props = sstableReader.getProperties
-      logger.debug(s"Props: $props")
-      if (new String(props("type")) != "ranges" || props.contains("entity")) {
-        return
+      try {
+        val props = sstableReader.getProperties
+        logger.debug(s"Props: $props")
+        if (new String(props("type")) != "ranges" || props.contains("entity")) {
+          return
+        }
+      } catch {
+        case e: io.treeverse.jpebble.BadFileFormatException =>
+          logger.error(s"File format validation failed for: ${gravelerSplit.path}", e)
+          throw new io.treeverse.jpebble.BadFileFormatException(
+            s"Bad file format in ${gravelerSplit.path}: ${e.getMessage}",
+            e
+          )
+      }
     }
     rangeID = gravelerSplit.rangeID
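In isolation, the change above wraps the range-file property check so that a low-level BadFileFormatException is re-thrown with the offending object's path attached. A minimal sketch of that pattern, assuming getProperties returns a Map[String, Array[Byte]]; the helper name and the getProps parameter are illustrative only and not part of the PR:

```scala
import io.treeverse.jpebble.BadFileFormatException

// Sketch only: decide whether an SSTable is a lakeFS range file; if the file cannot be
// read at all, re-throw with the path so executor logs point at the bad object.
def validateRangeFile(path: String, getProps: () => Map[String, Array[Byte]]): Boolean =
  try {
    val props = getProps()
    // A valid range file declares type "ranges" and carries no "entity" property.
    new String(props("type")) == "ranges" && !props.contains("entity")
  } catch {
    case e: BadFileFormatException =>
      throw new BadFileFormatException(s"Bad file format in $path: ${e.getMessage}", e)
  }
```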
@@ -1,6 +1,10 @@
 package io.treeverse.clients

-import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.{
+  AWSCredentialsProvider,
+  DefaultAWSCredentialsProviderChain,
+  STSAssumeRoleSessionCredentialsProvider
+}
 import com.amazonaws.client.builder.AwsClientBuilder
 import com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition
 import com.amazonaws.retry.RetryUtils

@@ -11,6 +15,7 @@ import org.slf4j.{Logger, LoggerFactory}

 import java.net.URI
 import java.util.concurrent.TimeUnit
+import java.util.UUID

 object StorageUtils {
   val StorageTypeS3 = "s3"

@@ -91,7 +96,7 @@
   def createAndValidateS3Client(
       configuration: ClientConfiguration,
-      credentialsProvider: Option[AWSCredentialsProvider],
+      credentialsProvider: Option[Any],
       awsS3ClientBuilder: AmazonS3ClientBuilder,
       endpoint: String,
       region: String,

@@ -127,7 +132,7 @@
   private def initializeS3Client(
       configuration: ClientConfiguration,
-      credentialsProvider: Option[AWSCredentialsProvider],
+      credentialsProvider: Option[Any],
       awsS3ClientBuilder: AmazonS3ClientBuilder,
       endpoint: String,
       region: String = null

@@ -143,10 +148,42 @@
       builder.withRegion(region)
     else
       builder
-    val builderWithCredentials = credentialsProvider match {
-      case Some(cp) => builderWithEndpoint.withCredentials(cp)
-      case None     => builderWithEndpoint
-    }
+
+    // Check for Hadoop's assumed role configuration
+    val roleArn = System.getProperty("spark.hadoop.fs.s3a.assumed.role.arn")
Contributor review comment (on the roleArn property line): please move to consts file we have
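A possible shape for that suggestion, sketched under assumptions: the Hadoop property key lives in a shared constants object rather than as a string literal at the call site. The object and member names below are illustrative, not the repository's actual consts file:

```scala
// Sketch only: names are assumptions; the project's existing constants object should be used.
object S3Consts {
  val AssumedRoleArnProperty = "spark.hadoop.fs.s3a.assumed.role.arn"
}

// The call site in the diff would then read:
// val roleArn = System.getProperty(S3Consts.AssumedRoleArnProperty)
```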
+
+    // Apply credentials based on configuration
+    val builderWithCredentials =
+      if (roleArn != null && !roleArn.isEmpty) {
+        // If we have a role ARN configured, assume that role
+        logger.info(s"Assuming role: $roleArn for S3 client")
+        try {
+          val sessionName = "lakefs-gc-" + UUID.randomUUID().toString
+          val stsProvider =
+            new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
+              .withLongLivedCredentialsProvider(new DefaultAWSCredentialsProviderChain())
+              .build()
+
+          builderWithEndpoint.withCredentials(stsProvider)
+        } catch {
+          case e: Exception =>
+            logger.warn(s"Failed to assume role $roleArn: ${e.getMessage}", e)
+            logger.info("Falling back to DefaultAWSCredentialsProviderChain")
+            builderWithEndpoint.withCredentials(new DefaultAWSCredentialsProviderChain())
+        }
+      } else if (
+        credentialsProvider.isDefined && credentialsProvider.get
+          .isInstanceOf[AWSCredentialsProvider]
+      ) {
+        // Use standard AWSCredentialsProvider if available
+        builderWithEndpoint.withCredentials(
+          credentialsProvider.get.asInstanceOf[AWSCredentialsProvider]
+        )
+      } else {
+        // Use default credential chain
+        logger.info("Using DefaultAWSCredentialsProviderChain for S3 client")
+        builderWithEndpoint.withCredentials(new DefaultAWSCredentialsProviderChain())
+      }
     builderWithCredentials.build
   }

(Isan-Rivkin marked a review conversation on the STSAssumeRoleSessionCredentialsProvider.Builder lines as resolved.)
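Read on its own, the new builder logic is a three-way precedence: an assumed-role ARN from the system property wins, then an explicitly supplied AWSCredentialsProvider, then the default chain. A standalone sketch of just that selection, using only the classes the diff imports; the selectCredentials helper is illustrative and, unlike the PR's version, omits the try/catch fallback when assuming the role fails:

```scala
import com.amazonaws.auth.{
  AWSCredentialsProvider,
  DefaultAWSCredentialsProviderChain,
  STSAssumeRoleSessionCredentialsProvider
}
import java.util.UUID

// Sketch only: mirrors the credential precedence implemented in the diff above.
def selectCredentials(explicit: Option[Any]): AWSCredentialsProvider = {
  val roleArn = System.getProperty("spark.hadoop.fs.s3a.assumed.role.arn")
  if (roleArn != null && roleArn.nonEmpty) {
    // Assume the configured role via STS; base credentials come from the default chain.
    val sessionName = "lakefs-gc-" + UUID.randomUUID().toString
    new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
      .withLongLivedCredentialsProvider(new DefaultAWSCredentialsProviderChain())
      .build()
  } else {
    explicit match {
      case Some(cp: AWSCredentialsProvider) => cp // caller-supplied provider
      case _                                => new DefaultAWSCredentialsProviderChain()
    }
  }
}
```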
Review comment: is this bump (hadoop + aws-java-sdk-bundle) backward compatible? Was it tested, i.e. with Hadoop 3.2.1 and aws-sdk 1.12.194? Backward compatibility is a hard requirement, right? We mentioned breaking compatibility as long-term work, with progress over time.
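One way to approach the testing half of that question, sketched under assumptions: parameterize the Hadoop and AWS SDK versions in the build so the same sources can be compiled and tested against both the older stack named in the comment (Hadoop 3.2.1, aws-sdk 1.12.194) and the EMR 7.0.0 stack. The property names and structure below are illustrative, not the project's actual build definition:

```scala
// build.sbt fragment, sketch only: defaults other than 3.2.1 / 1.12.194 are assumptions.
val hadoopVersion = sys.props.getOrElse("hadoop.version", "3.2.1")
val awsSdkVersion = sys.props.getOrElse("aws.sdk.version", "1.12.194")

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws"          % hadoopVersion % "provided",
  "com.amazonaws"     % "aws-java-sdk-bundle" % awsSdkVersion % "provided"
)

// Run the suite once per stack, e.g.:
//   sbt -Dhadoop.version=3.2.1 -Daws.sdk.version=1.12.194 test
//   sbt -Dhadoop.version=<EMR 7.0.0 Hadoop version> -Daws.sdk.version=<matching SDK> test
```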