Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/ParquetObjectRange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
 * Mutable holder that groups a {@link CompletableFuture} of a {@link ByteBuffer} with an offset
 * and a length.
 *
 * <p>NOTE(review): presumably {@code offset} is the starting byte position within an object and
 * {@code length} the number of bytes in the range, with the future completing once those bytes
 * are available -- confirm against the callers that populate it. No validation is performed on
 * any of the values.
 */
public class ParquetObjectRange {
  private CompletableFuture<ByteBuffer> byteBuffer;
  private long offset;
  private int length;

  /**
   * Creates a range holder from the three given values, stored as-is.
   *
   * @param byteBuffer future associated with this range
   * @param offset offset of the range
   * @param length length of the range
   */
  public ParquetObjectRange(CompletableFuture<ByteBuffer> byteBuffer, long offset, int length) {
    this.byteBuffer = byteBuffer;
    this.offset = offset;
    this.length = length;
  }

  /**
   * Returns the future currently stored in this holder.
   *
   * @return the stored future (whatever was last passed to the constructor or setter)
   */
  public CompletableFuture<ByteBuffer> getByteBuffer() {
    return this.byteBuffer;
  }

  /**
   * Replaces the stored future.
   *
   * @param byteBuffer the new future to store
   */
  public void setByteBuffer(CompletableFuture<ByteBuffer> byteBuffer) {
    this.byteBuffer = byteBuffer;
  }

  /**
   * Returns the stored offset.
   *
   * @return the offset of the range
   */
  public long getOffset() {
    return this.offset;
  }

  /**
   * Replaces the stored offset.
   *
   * @param offset the new offset
   */
  public void setOffset(long offset) {
    this.offset = offset;
  }

  /**
   * Returns the stored length.
   *
   * @return the length of the range
   */
  public int getLength() {
    return this.length;
  }

  /**
   * Replaces the stored length.
   *
   * @param length the new length
   */
  public void setLength(int length) {
    this.length = length;
  }
}
13 changes: 13 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/SeekableInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;

/**
* {@code SeekableInputStream} is an interface with the methods needed to read data from a file or
Expand All @@ -43,4 +46,14 @@ public abstract class SeekableInputStream extends InputStream {
* @throws IOException If the underlying stream throws IOException
*/
public abstract void seek(long newPos) throws IOException;

/**
 * Default vectored-read entry point. This base implementation performs no I/O and always throws
 * {@link UnsupportedOperationException}.
 *
 * <p>NOTE(review): presumably subclasses that can serve several ranges in one call override this
 * method and complete each range's future -- confirm against the implementing streams.
 *
 * @param ranges the ranges requested; ignored by this implementation
 * @param allocate buffer allocator keyed by size; ignored by this implementation
 * @throws IOException declared for overriding implementations; never thrown here
 * @throws UnsupportedOperationException always, in this base implementation
 */
public void readVectored(List<ParquetObjectRange> ranges, IntFunction<ByteBuffer> allocate)
    throws IOException {
  String unsupportedMessage = "Default iceberg stream doesn't support read vector io";
  throw new UnsupportedOperationException(unsupportedMessage);
}

/**
 * Base implementation: returns {@code false} unconditionally and does not use {@code allocate}.
 *
 * <p>NOTE(review): presumably callers probe this before invoking {@code readVectored}, and
 * streams that implement vectored reads override it -- confirm against the call sites.
 *
 * @param allocate buffer allocator; ignored by this implementation
 * @return {@code false}
 */
public boolean readVectoredAvailable(IntFunction<ByteBuffer> allocate) {
return false;
}
}
6 changes: 1 addition & 5 deletions aws-bundle/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ project(":iceberg-aws-bundle") {
implementation "software.amazon.awssdk:dynamodb"
implementation "software.amazon.awssdk:lakeformation"

implementation libs.analyticsaccelerator.s3
implementation(libs.analyticsaccelerator.s3)
}

shadowJar {
Expand All @@ -52,10 +52,6 @@ project(":iceberg-aws-bundle") {
include 'NOTICE'
}

dependencies {
exclude(dependency('org.slf4j:slf4j-api'))
}

// relocate AWS-specific versions
relocate 'org.apache.http', 'org.apache.iceberg.aws.shaded.org.apache.http'
relocate 'io.netty', 'org.apache.iceberg.aws.shaded.io.netty'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.metrics.MetricCollector;
Expand Down Expand Up @@ -156,8 +158,9 @@ public void testCheckCommitStatusAfterRetries() {
.isEqualTo(2);
}

@Test
public void testNoRetryAwarenessCorruptsTable() {
@ParameterizedTest
@MethodSource("org.apache.iceberg.aws.s3.S3TestUtil#analyticsAcceleratorLibraryProperties")
public void testNoRetryAwarenessCorruptsTable(Map<String, String> aalProperties) {
// This test exists to replicate the issue the prior test validates the fix for
// See https://github.com/apache/iceberg/issues/7151
String namespace = createNamespace();
Expand Down
14 changes: 14 additions & 0 deletions aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

Expand Down Expand Up @@ -73,4 +75,16 @@ public static S3Client createS3Client(MinIOContainer container, boolean legacyMd
builder.forcePathStyle(true); // OSX won't resolve subdomains
return builder.build();
}

/**
 * Builds an {@link S3AsyncClient} pointed at the given MinIO container.
 *
 * <p>The client uses the container's user name and password as static credentials, overrides the
 * endpoint with the container's S3 URL, uses the {@code US_EAST_1} region, and enables path-style
 * access.
 *
 * @param container the MinIO container supplying the endpoint URL and credentials
 * @return a newly built asynchronous S3 client
 */
public static S3AsyncClient createS3AsyncClient(MinIOContainer container) {
  URI endpoint = URI.create(container.getS3URL());
  AwsBasicCredentials credentials =
      AwsBasicCredentials.create(container.getUserName(), container.getPassword());

  return S3AsyncClient.builder()
      .credentialsProvider(StaticCredentialsProvider.create(credentials))
      .applyMutation(clientBuilder -> clientBuilder.endpointOverride(endpoint))
      .region(Region.US_EAST_1)
      .forcePathStyle(true) // OSX won't resolve subdomains
      .build();
}
}
46 changes: 46 additions & 0 deletions aws/src/integration/java/org/apache/iceberg/aws/s3/S3TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,20 @@
*/
package org.apache.iceberg.aws.s3;

import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3TestUtil {

private static final Logger LOG = LoggerFactory.getLogger(S3TestUtil.class);

// Private no-op constructor: prevents instantiation from outside this class.
private S3TestUtil() {}

public static String getBucketFromUri(String s3Uri) {
Expand All @@ -29,4 +41,38 @@ public static String getBucketFromUri(String s3Uri) {
/**
 * Parses the given string as an {@code S3URI} and returns the value of its {@code key()}.
 *
 * @param s3Uri the S3 URI string to parse
 * @return the key reported by the parsed {@code S3URI}
 */
public static String getKeyFromUri(String s3Uri) {
  S3URI parsedUri = new S3URI(s3Uri);
  return parsedUri.key();
}

/**
 * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
 *
 * <p>When {@code properties} reports the accelerator as enabled, {@code message} is logged at
 * WARN level and the AssertJ assumption below is not met; otherwise the call has no effect.
 *
 * @param properties properties to probe
 * @param message text used both for the warning log and as the assumption description
 */
public static void skipIfAnalyticsAcceleratorEnabled(
    S3FileIOProperties properties, String message) {
  boolean acceleratorDisabled = !properties.isS3AnalyticsAcceleratorEnabled();
  if (!acceleratorDisabled) {
    LOG.warn(message);
  }

  // An unmet assumption is what causes the calling test to be skipped.
  assumeThat(acceleratorDisabled).describedAs(message).isTrue();
}

/**
 * Wraps every map returned by {@link #listAnalyticsAcceleratorLibraryProperties()} in an {@link
 * Arguments} instance, one map per {@code Arguments}.
 *
 * @return a stream with one {@code Arguments} per property map
 */
public static Stream<Arguments> analyticsAcceleratorLibraryProperties() {
  List<Map<String, String>> propertyVariants = listAnalyticsAcceleratorLibraryProperties();
  return propertyVariants.stream().map(Arguments::of);
}

/**
 * Returns two single-entry property maps, in this order: one that sets {@code
 * S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED} to {@code "true"} and one that sets it to
 * {@code "false"}.
 *
 * @return an immutable two-element list of property maps
 */
public static List<Map<String, String>> listAnalyticsAcceleratorLibraryProperties() {
  String enabledKey = S3FileIOProperties.S3_ANALYTICS_ACCELERATOR_ENABLED;
  Map<String, String> acceleratorOn = ImmutableMap.of(enabledKey, Boolean.toString(true));
  Map<String, String> acceleratorOff = ImmutableMap.of(enabledKey, Boolean.toString(false));
  return List.of(acceleratorOn, acceleratorOff);
}

/**
 * Combines two property maps into a single immutable map, adding the entries of {@code
 * aalProperties} first and those of {@code testProperties} second.
 *
 * <p>NOTE(review): Guava's {@code ImmutableMap.Builder#build()} rejects duplicate keys, so a key
 * present in both maps would presumably fail here rather than let {@code testProperties}
 * override -- confirm that callers never pass overlapping keys.
 *
 * @param aalProperties first set of properties to include
 * @param testProperties second set of properties to include
 * @return an immutable map holding the entries of both inputs
 */
public static Map<String, String> mergeProperties(
    Map<String, String> aalProperties, Map<String, String> testProperties) {
  ImmutableMap.Builder<String, String> combined = ImmutableMap.builder();
  combined.putAll(aalProperties);
  combined.putAll(testProperties);
  return combined.build();
}
}
Loading
Loading