diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java new file mode 100644 index 000000000000..1c327cf8d66d --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/AliyunOSSTestRule.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss; + +import com.aliyun.oss.OSS; +import java.util.UUID; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +/** + * API for test Aliyun Object Storage Service (OSS) which is either local mock http server or remote aliyun oss server + *

+ * This API includes start,stop OSS service, create OSS client, setup bucket and teardown bucket. + */ +public interface AliyunOSSTestRule extends TestRule { + UUID RANDOM_UUID = java.util.UUID.randomUUID(); + + /** + * Returns a specific bucket name for testing purpose. + */ + default String testBucketName() { + return String.format("oss-testing-bucket-%s", RANDOM_UUID); + } + + @Override + default Statement apply(Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + start(); + try { + base.evaluate(); + } finally { + stop(); + } + } + }; + } + + /** + * Start the Aliyun Object storage services application that the OSS client could connect to. + */ + void start(); + + /** + * Stop the Aliyun object storage services. + */ + void stop(); + + /** + * Returns an newly created {@link OSS} client. + */ + OSS createOSSClient(); + + /** + * Preparation work of bucket for the test case, for example we need to check the existence of specific bucket. + */ + void setUpBucket(String bucket); + + /** + * Clean all the objects that created from this test suite in the bucket. + */ + void tearDownBucket(String bucket); +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java new file mode 100644 index 000000000000..81e2e9115630 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.Banner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.convert.converter.Converter; +import org.springframework.http.MediaType; +import org.springframework.http.converter.xml.MappingJackson2XmlHttpMessageConverter; +import org.springframework.util.StringUtils; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +@SuppressWarnings("checkstyle:AnnotationUseStyle") +@Configuration +@EnableAutoConfiguration(exclude = {SecurityAutoConfiguration.class}, excludeName = { + "org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration" +}) +@ComponentScan +public class AliyunOSSMockApp { + + static final String PROP_ROOT_DIR = "root-dir"; + + static final String PROP_HTTP_PORT = "server.port"; + static final int PORT_HTTP_PORT_DEFAULT = 9393; + + static final String PROP_SILENT = "silent"; + + @Autowired + private ConfigurableApplicationContext context; + + public static AliyunOSSMockApp start(Map properties, String... args) { + Map defaults = Maps.newHashMap(); + defaults.put(PROP_HTTP_PORT, PORT_HTTP_PORT_DEFAULT); + + Banner.Mode bannerMode = Banner.Mode.CONSOLE; + + if (Boolean.parseBoolean(String.valueOf(properties.remove(PROP_SILENT)))) { + defaults.put("logging.level.root", "WARN"); + bannerMode = Banner.Mode.OFF; + } + + final ConfigurableApplicationContext ctx = + new SpringApplicationBuilder(AliyunOSSMockApp.class) + .properties(defaults) + .properties(properties) + .bannerMode(bannerMode) + .run(args); + + return ctx.getBean(AliyunOSSMockApp.class); + } + + public void stop() { + SpringApplication.exit(context, () -> 0); + } + + @Configuration + static class Config implements WebMvcConfigurer { + + @Bean + Converter rangeConverter() { + return new RangeConverter(); + } + + /** + * Creates an HttpMessageConverter for XML. + * + * @return The configured {@link MappingJackson2XmlHttpMessageConverter}. + */ + @Bean + public MappingJackson2XmlHttpMessageConverter getMessageConverter() { + List mediaTypes = Lists.newArrayList(); + mediaTypes.add(MediaType.APPLICATION_XML); + mediaTypes.add(MediaType.APPLICATION_FORM_URLENCODED); + mediaTypes.add(MediaType.APPLICATION_OCTET_STREAM); + + final MappingJackson2XmlHttpMessageConverter xmlConverter = new MappingJackson2XmlHttpMessageConverter(); + xmlConverter.setSupportedMediaTypes(mediaTypes); + + return xmlConverter; + } + } + + private static class RangeConverter implements Converter { + + private static final Pattern REQUESTED_RANGE_PATTERN = Pattern.compile("^bytes=((\\d*)-(\\d*))((,\\d*-\\d*)*)"); + + @Override + public Range convert(String rangeString) { + Preconditions.checkNotNull(rangeString, "Range value should not be null."); + + final Range range; + + // parsing a range specification of format: "bytes=start-end" - multiple ranges not supported + final Matcher matcher = REQUESTED_RANGE_PATTERN.matcher(rangeString.trim()); + if (matcher.matches()) { + final String rangeStart = matcher.group(2); + final String rangeEnd = matcher.group(3); + + long start = StringUtils.isEmpty(rangeStart) ? -1L : Long.parseLong(rangeStart); + long end = StringUtils.isEmpty(rangeEnd) ? Long.MAX_VALUE : Long.parseLong(rangeEnd); + range = new Range(start, end); + + if (matcher.groupCount() == 5 && !"".equals(matcher.group(4))) { + throw new IllegalArgumentException( + "Unsupported range specification. Only single range specifications allowed"); + } + if (range.start() != -1 && range.start() < 0) { + throw new IllegalArgumentException( + "Unsupported range specification. A start byte must be supplied"); + } + + if (range.end() != -1 && range.end() < range.start()) { + throw new IllegalArgumentException( + "Range header is malformed. End byte is smaller than start byte."); + } + } else { + // Per Aliyun OSS behavior, return whole object content for illegal header + range = new Range(0, Long.MAX_VALUE); + } + + return range; + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java new file mode 100644 index 000000000000..a9615f05ec78 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import com.aliyun.oss.OSSErrorCode; +import com.aliyun.oss.model.Bucket; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonRootName; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.OutputStream; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.BoundedInputStream; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler; + +import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR; +import static org.springframework.http.HttpStatus.OK; +import static org.springframework.http.HttpStatus.PARTIAL_CONTENT; +import static org.springframework.http.HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE; + +@RestController +public class AliyunOSSMockLocalController { + private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSMockLocalController.class); + + @Autowired + private AliyunOSSMockLocalStore localStore; + + private static String filenameFrom(@PathVariable String bucketName, HttpServletRequest request) { + String requestUri = request.getRequestURI(); + return requestUri.substring(requestUri.indexOf(bucketName) + bucketName.length() + 1); + } + + @RequestMapping(value = "/{bucketName}", method = RequestMethod.PUT, produces = "application/xml") + public void putBucket(@PathVariable String bucketName) throws IOException { + if (localStore.getBucket(bucketName) != null) { + throw new OssException(409, OSSErrorCode.BUCKET_ALREADY_EXISTS, bucketName + " already exists."); + } + + localStore.createBucket(bucketName); + } + + @RequestMapping(value = "/{bucketName}", method = RequestMethod.DELETE, produces = "application/xml") + public void deleteBucket(@PathVariable String bucketName) throws IOException { + verifyBucketExistence(bucketName); + + localStore.deleteBucket(bucketName); + } + + @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.PUT) + public ResponseEntity putObject(@PathVariable String bucketName, HttpServletRequest request) { + verifyBucketExistence(bucketName); + String filename = filenameFrom(bucketName, request); + try (ServletInputStream inputStream = request.getInputStream()) { + ObjectMetadata metadata = localStore.putObject( + bucketName, + filename, + inputStream, + request.getContentType(), + request.getHeader(HttpHeaders.CONTENT_ENCODING), + ImmutableMap.of()); + + HttpHeaders responseHeaders = new HttpHeaders(); + responseHeaders.setETag("\"" + metadata.getContentMD5() + "\""); + responseHeaders.setLastModified(metadata.getLastModificationDate()); + + return new ResponseEntity<>(responseHeaders, OK); + } catch (Exception e) { + LOG.error("Failed to put object - bucket: {} - object: {}", bucketName, filename, e); + return new ResponseEntity<>(e.getMessage(), INTERNAL_SERVER_ERROR); + } + } + + @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.DELETE) + public void deleteObject(@PathVariable String bucketName, HttpServletRequest request) { + verifyBucketExistence(bucketName); + + localStore.deleteObject(bucketName, filenameFrom(bucketName, request)); + } + + @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.HEAD) + public ResponseEntity getObjectMeta(@PathVariable String bucketName, HttpServletRequest request) { + verifyBucketExistence(bucketName); + ObjectMetadata metadata = verifyObjectExistence(bucketName, filenameFrom(bucketName, request)); + + HttpHeaders headers = new HttpHeaders(); + headers.setETag("\"" + metadata.getContentMD5() + "\""); + headers.setLastModified(metadata.getLastModificationDate()); + headers.setContentLength(metadata.getContentLength()); + + return new ResponseEntity<>(headers, OK); + } + + @SuppressWarnings("checkstyle:AnnotationUseStyle") + @RequestMapping( + value = "/{bucketName:.+}/**", + method = RequestMethod.GET, + produces = "application/xml") + public void getObject( + @PathVariable String bucketName, + @RequestHeader(value = "Range", required = false) Range range, + HttpServletRequest request, + HttpServletResponse response) throws IOException { + verifyBucketExistence(bucketName); + + String filename = filenameFrom(bucketName, request); + ObjectMetadata metadata = verifyObjectExistence(bucketName, filename); + + if (range != null) { + long fileSize = metadata.getContentLength(); + long bytesToRead = Math.min(fileSize - 1, range.end()) - range.start() + 1; + long skipSize = range.start(); + if (range.start() == -1) { + bytesToRead = Math.min(fileSize - 1, range.end()); + skipSize = fileSize - range.end(); + } + if (range.end() == -1) { + bytesToRead = fileSize - range.start(); + } + if (bytesToRead < 0 || fileSize < range.start()) { + response.setStatus(REQUESTED_RANGE_NOT_SATISFIABLE.value()); + response.flushBuffer(); + return; + } + + response.setStatus(PARTIAL_CONTENT.value()); + response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes"); + response.setHeader(HttpHeaders.CONTENT_RANGE, String.format("bytes %s-%s/%s", + range.start(), bytesToRead + range.start() + 1, metadata.getContentLength())); + response.setHeader(HttpHeaders.ETAG, "\"" + metadata.getContentMD5() + "\""); + response.setDateHeader(HttpHeaders.LAST_MODIFIED, metadata.getLastModificationDate()); + response.setContentType(metadata.getContentType()); + response.setContentLengthLong(bytesToRead); + + try (OutputStream outputStream = response.getOutputStream()) { + try (FileInputStream fis = new FileInputStream(metadata.getDataFile())) { + fis.skip(skipSize); + IOUtils.copy(new BoundedInputStream(fis, bytesToRead), outputStream); + } + } + } else { + response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes"); + response.setHeader(HttpHeaders.ETAG, "\"" + metadata.getContentMD5() + "\""); + response.setDateHeader(HttpHeaders.LAST_MODIFIED, metadata.getLastModificationDate()); + response.setContentType(metadata.getContentType()); + response.setContentLengthLong(metadata.getContentLength()); + + try (OutputStream outputStream = response.getOutputStream()) { + try (FileInputStream fis = new FileInputStream(metadata.getDataFile())) { + IOUtils.copy(fis, outputStream); + } + } + } + } + + private void verifyBucketExistence(String bucketName) { + Bucket bucket = localStore.getBucket(bucketName); + if (bucket == null) { + throw new OssException(404, OSSErrorCode.NO_SUCH_BUCKET, "The specified bucket does not exist. "); + } + } + + private ObjectMetadata verifyObjectExistence(String bucketName, String filename) { + ObjectMetadata objectMetadata = null; + try { + objectMetadata = localStore.getObjectMetadata(bucketName, filename); + } catch (IOException e) { + LOG.error("Failed to get the object metadata, bucket: {}, object: {}.", bucketName, filename, e); + } + + if (objectMetadata == null) { + throw new OssException(404, OSSErrorCode.NO_SUCH_KEY, "The specify oss key does not exists."); + } + + return objectMetadata; + } + + @ControllerAdvice + public static class OSSMockExceptionHandler extends ResponseEntityExceptionHandler { + + @ExceptionHandler + public ResponseEntity handleOSSException(OssException ex) { + LOG.info("Responding with status {} - {}, {}", ex.status, ex.code, ex.message); + + ErrorResponse errorResponse = new ErrorResponse(); + errorResponse.setCode(ex.getCode()); + errorResponse.setMessage(ex.getMessage()); + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_XML); + + return ResponseEntity.status(ex.status) + .headers(headers) + .body(errorResponse); + } + } + + public static class OssException extends RuntimeException { + + private final int status; + private final String code; + private final String message; + + public OssException(final int status, final String code, final String message) { + super(message); + this.status = status; + this.code = code; + this.message = message; + } + + public String getCode() { + return code; + } + + @Override + public String getMessage() { + return message; + } + } + + @JsonRootName("Error") + public static class ErrorResponse { + @JsonProperty("Code") + private String code; + + @JsonProperty("Message") + private String message; + + public void setCode(String code) { + this.code = code; + } + + public void setMessage(String message) { + this.message = message; + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java new file mode 100644 index 000000000000..8427be915e89 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import com.aliyun.oss.OSSErrorCode; +import com.aliyun.oss.model.Bucket; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.directory.api.util.Hex; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; + +@Component +public class AliyunOSSMockLocalStore { + private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSMockLocalStore.class); + + private static final String DATA_FILE = ".DATA"; + private static final String META_FILE = ".META"; + + private final File root; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + public AliyunOSSMockLocalStore(@Value("${" + AliyunOSSMockApp.PROP_ROOT_DIR + ":}") String rootDir) { + Preconditions.checkNotNull(rootDir, "Root directory cannot be null"); + this.root = new File(rootDir); + + root.deleteOnExit(); + root.mkdirs(); + + LOG.info("Root directory of local OSS store is {}", root); + } + + static String md5sum(String filepath) throws IOException { + try (InputStream is = new FileInputStream(filepath)) { + return md5sum(is); + } + } + + static String md5sum(InputStream is) throws IOException { + MessageDigest md; + try { + md = MessageDigest.getInstance("MD5"); + md.reset(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + byte[] bytes = new byte[1024]; + int numBytes; + + while ((numBytes = is.read(bytes)) != -1) { + md.update(bytes, 0, numBytes); + } + return new String(Hex.encodeHex(md.digest())); + } + + private static void inputStreamToFile(InputStream inputStream, File targetFile) throws IOException { + try (OutputStream outputStream = new FileOutputStream(targetFile)) { + IOUtils.copy(inputStream, outputStream); + } + } + + void createBucket(String bucketName) throws IOException { + File newBucket = new File(root, bucketName); + FileUtils.forceMkdir(newBucket); + } + + Bucket getBucket(String bucketName) { + List buckets = findBucketsByFilter(file -> + Files.isDirectory(file) && file.getFileName().endsWith(bucketName)); + + return buckets.size() > 0 ? buckets.get(0) : null; + } + + void deleteBucket(String bucketName) throws IOException { + Bucket bucket = getBucket(bucketName); + Preconditions.checkNotNull(bucket, "Bucket %s shouldn't be null.", bucketName); + + File dir = new File(root, bucket.getName()); + if (Files.walk(dir.toPath()).anyMatch(p -> p.toFile().isFile())) { + throw new AliyunOSSMockLocalController.OssException(409, OSSErrorCode.BUCKET_NOT_EMPTY, + "The bucket you tried to delete is not empty. "); + } + + FileUtils.deleteDirectory(dir); + } + + ObjectMetadata putObject( + String bucketName, + String fileName, + InputStream dataStream, + String contentType, + String contentEncoding, + Map userMetaData) throws IOException { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists() || bucketDir.mkdirs(); + + File dataFile = new File(bucketDir, fileName + DATA_FILE); + File metaFile = new File(bucketDir, fileName + META_FILE); + if (!dataFile.exists()) { + dataFile.getParentFile().mkdirs(); + dataFile.createNewFile(); + } + + inputStreamToFile(dataStream, dataFile); + + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(dataFile.length()); + metadata.setContentMD5(md5sum(dataFile.getAbsolutePath())); + metadata.setContentType(contentType != null ? contentType : MediaType.APPLICATION_OCTET_STREAM_VALUE); + metadata.setContentEncoding(contentEncoding); + metadata.setDataFile(dataFile.getAbsolutePath()); + metadata.setMetaFile(metaFile.getAbsolutePath()); + + BasicFileAttributes attributes = Files.readAttributes(dataFile.toPath(), BasicFileAttributes.class); + metadata.setLastModificationDate(attributes.lastModifiedTime().toMillis()); + + metadata.setUserMetaData(userMetaData); + + objectMapper.writeValue(metaFile, metadata); + + return metadata; + } + + void deleteObject(String bucketName, String filename) { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists(); + + File dataFile = new File(bucketDir, filename + DATA_FILE); + File metaFile = new File(bucketDir, filename + META_FILE); + assert !dataFile.exists() || dataFile.delete(); + assert !metaFile.exists() || metaFile.delete(); + } + + ObjectMetadata getObjectMetadata(String bucketName, String filename) throws IOException { + File bucketDir = new File(root, bucketName); + assert bucketDir.exists(); + + File dataFile = new File(bucketDir, filename + DATA_FILE); + if (!dataFile.exists()) { + return null; + } + + File metaFile = new File(bucketDir, filename + META_FILE); + return objectMapper.readValue(metaFile, ObjectMetadata.class); + } + + private List findBucketsByFilter(final DirectoryStream.Filter filter) { + List buckets = Lists.newArrayList(); + + try (DirectoryStream stream = Files.newDirectoryStream(root.toPath(), filter)) { + for (final Path path : stream) { + buckets.add(new Bucket(path.getFileName().toString())); + } + } catch (final IOException e) { + LOG.error("Could not iterate over Bucket-Folders", e); + } + + return buckets; + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java new file mode 100644 index 000000000000..80f25c09b760 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockRule.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.iceberg.aliyun.oss.AliyunOSSTestRule; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class AliyunOSSMockRule implements AliyunOSSTestRule { + + private final Map properties; + + private AliyunOSSMockApp ossMockApp; + + private AliyunOSSMockRule(Map properties) { + this.properties = properties; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public void start() { + ossMockApp = AliyunOSSMockApp.start(properties); + } + + @Override + public void stop() { + ossMockApp.stop(); + } + + @Override + public OSS createOSSClient() { + String endpoint = String.format("http://localhost:%s", properties.getOrDefault( + AliyunOSSMockApp.PROP_HTTP_PORT, + AliyunOSSMockApp.PORT_HTTP_PORT_DEFAULT)); + return new OSSClientBuilder().build(endpoint, "foo", "bar"); + } + + private File rootDir() { + Object rootDir = properties.get(AliyunOSSMockApp.PROP_ROOT_DIR); + Preconditions.checkNotNull(rootDir, "Root directory cannot be null"); + return new File(rootDir.toString()); + } + + @Override + public void setUpBucket(String bucket) { + createOSSClient().createBucket(bucket); + } + + @Override + public void tearDownBucket(String bucket) { + try { + Files.walk(rootDir().toPath()) + .filter(p -> p.toFile().isFile()) + .forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + // delete this file quietly. + } + }); + + createOSSClient().deleteBucket(bucket); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static class Builder { + private Map props = Maps.newHashMap(); + + public Builder silent() { + props.put(AliyunOSSMockApp.PROP_SILENT, true); + return this; + } + + public AliyunOSSTestRule build() { + String rootDir = (String) props.get(AliyunOSSMockApp.PROP_ROOT_DIR); + if (Strings.isNullOrEmpty(rootDir)) { + File dir = new File(FileUtils.getTempDirectory(), "oss-mock-file-store-" + System.currentTimeMillis()); + rootDir = dir.getAbsolutePath(); + props.put(AliyunOSSMockApp.PROP_ROOT_DIR, rootDir); + } + File root = new File(rootDir); + root.deleteOnExit(); + root.mkdir(); + return new AliyunOSSMockRule(props); + } + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/ObjectMetadata.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/ObjectMetadata.java new file mode 100644 index 000000000000..95fbd0198824 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/ObjectMetadata.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import java.util.Map; + +public class ObjectMetadata { + + private long contentLength; + + // In millis + private long lastModificationDate; + + private String contentMD5; + + private String contentType; + + private String contentEncoding; + + private Map userMetaData; + + private String dataFile; + + private String metaFile; + + // The following getters and setters are required for Jackson ObjectMapper serialization and deserialization. + + public long getContentLength() { + return contentLength; + } + + public void setContentLength(long contentLength) { + this.contentLength = contentLength; + } + + public long getLastModificationDate() { + return lastModificationDate; + } + + public void setLastModificationDate(long lastModificationDate) { + this.lastModificationDate = lastModificationDate; + } + + public String getContentMD5() { + return contentMD5; + } + + public void setContentMD5(String contentMD5) { + this.contentMD5 = contentMD5; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public String getContentEncoding() { + return contentEncoding; + } + + public void setContentEncoding(String contentEncoding) { + this.contentEncoding = contentEncoding; + } + + public Map getUserMetaData() { + return userMetaData; + } + + public void setUserMetaData(Map userMetaData) { + this.userMetaData = userMetaData; + } + + public String getDataFile() { + return dataFile; + } + + public void setDataFile(String dataFile) { + this.dataFile = dataFile; + } + + public String getMetaFile() { + return metaFile; + } + + public void setMetaFile(String metaFile) { + this.metaFile = metaFile; + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java new file mode 100644 index 000000000000..dcf1291b95f7 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/Range.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +public class Range { + + private final long start; + private final long end; + + public Range(long start, long end) { + this.start = start; + this.end = end; + } + + public long start() { + return start; + } + + public long end() { + return end; + } + + @Override + public String toString() { + return String.format("%d-%d", start, end); + } +} diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/TestLocalAliyunOSS.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/TestLocalAliyunOSS.java new file mode 100644 index 000000000000..faa13b25a953 --- /dev/null +++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/TestLocalAliyunOSS.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aliyun.oss.mock; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSErrorCode; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.model.GetObjectRequest; +import com.aliyun.oss.model.PutObjectResult; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; +import org.apache.commons.io.IOUtils; +import org.apache.iceberg.aliyun.oss.AliyunOSSTestRule; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +public class TestLocalAliyunOSS { + + @ClassRule + public static final AliyunOSSTestRule OSS_TEST_RULE = AliyunOSSMockRule.builder().silent().build(); + + private final OSS oss = OSS_TEST_RULE.createOSSClient(); + private final String bucketName = OSS_TEST_RULE.testBucketName(); + private final Random random = new Random(1); + + private static void assertThrows(Runnable runnable, String expectedErrorCode) { + try { + runnable.run(); + Assert.fail("No exception was thrown, expected errorCode: " + expectedErrorCode); + } catch (OSSException e) { + Assert.assertEquals(expectedErrorCode, e.getErrorCode()); + } + } + + @Before + public void before() { + OSS_TEST_RULE.setUpBucket(bucketName); + } + + @After + public void after() { + OSS_TEST_RULE.tearDownBucket(bucketName); + } + + @Test + public void testBuckets() { + Assert.assertTrue(doesBucketExist(bucketName)); + assertThrows(() -> oss.createBucket(bucketName), OSSErrorCode.BUCKET_ALREADY_EXISTS); + + oss.deleteBucket(bucketName); + Assert.assertFalse(doesBucketExist(bucketName)); + + oss.createBucket(bucketName); + Assert.assertTrue(doesBucketExist(bucketName)); + } + + @Test + public void testDeleteBucket() { + String bucketNotExist = String.format("bucket-not-existing-%s", UUID.randomUUID()); + assertThrows(() -> oss.deleteBucket(bucketNotExist), OSSErrorCode.NO_SUCH_BUCKET); + + byte[] bytes = new byte[2000]; + random.nextBytes(bytes); + + oss.putObject(bucketName, "object1", wrap(bytes)); + + oss.putObject(bucketName, "object2", wrap(bytes)); + + assertThrows(() -> oss.deleteBucket(bucketName), OSSErrorCode.BUCKET_NOT_EMPTY); + + oss.deleteObject(bucketName, "object1"); + assertThrows(() -> oss.deleteBucket(bucketName), OSSErrorCode.BUCKET_NOT_EMPTY); + + oss.deleteObject(bucketName, "object2"); + oss.deleteBucket(bucketName); + Assert.assertFalse(doesBucketExist(bucketName)); + + oss.createBucket(bucketName); + } + + @Test + public void testPutObject() throws IOException { + byte[] bytes = new byte[4 * 1024]; + random.nextBytes(bytes); + + String bucketNotExist = String.format("bucket-not-existing-%s", UUID.randomUUID()); + assertThrows(() -> oss.putObject(bucketNotExist, "object", wrap(bytes)), OSSErrorCode.NO_SUCH_BUCKET); + + PutObjectResult result = oss.putObject(bucketName, "object", wrap(bytes)); + Assert.assertEquals(AliyunOSSMockLocalStore.md5sum(wrap(bytes)), result.getETag()); + } + + @Test + public void testDoesObjectExist() { + Assert.assertFalse(oss.doesObjectExist(bucketName, "key")); + + byte[] bytes = new byte[4 * 1024]; + random.nextBytes(bytes); + oss.putObject(bucketName, "key", wrap(bytes)); + + Assert.assertTrue(oss.doesObjectExist(bucketName, "key")); + oss.deleteObject(bucketName, "key"); + } + + @Test + public void testGetObject() throws IOException { + String bucketNotExist = String.format("bucket-not-existing-%s", UUID.randomUUID()); + assertThrows(() -> oss.getObject(bucketNotExist, "key"), OSSErrorCode.NO_SUCH_BUCKET); + + assertThrows(() -> oss.getObject(bucketName, "key"), OSSErrorCode.NO_SUCH_KEY); + + byte[] bytes = new byte[2000]; + random.nextBytes(bytes); + + oss.putObject(bucketName, "key", new ByteArrayInputStream(bytes)); + + byte[] actual = new byte[2000]; + IOUtils.readFully(oss.getObject(bucketName, "key").getObjectContent(), actual); + + Assert.assertArrayEquals(bytes, actual); + oss.deleteObject(bucketName, "key"); + } + + @Test + public void testGetObjectWithRange() throws IOException { + + byte[] bytes = new byte[100]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) i; + } + oss.putObject(bucketName, "key", new ByteArrayInputStream(bytes)); + + int start = 0; + int end = 0; + testRange(bytes, start, end); + + start = 0; + end = 1; + testRange(bytes, start, end); + + start = 1; + end = 9; + testRange(bytes, start, end); + + start = 0; + end = 99; + testRange(bytes, start, end); + + start = -1; + end = 2; + testRange(bytes, start, end); + + start = 98; + end = -1; + testRange(bytes, start, end); + + start = -1; + end = -1; + testRange(bytes, start, end); + + oss.deleteObject(bucketName, "key"); + } + + private void testRange(byte[] bytes, int start, int end) throws IOException { + byte[] testBytes; + byte[] actual; + int len; + if (start == -1 && end == -1) { + len = bytes.length; + actual = new byte[len]; + testBytes = new byte[len]; + System.arraycopy(bytes, 0, testBytes, 0, len); + } else if (start == -1) { + len = end; + actual = new byte[len]; + testBytes = new byte[len]; + System.arraycopy(bytes, bytes.length - end, testBytes, 0, len); + } else if (end == -1) { + len = bytes.length - start; + actual = new byte[len]; + testBytes = new byte[len]; + System.arraycopy(bytes, start, testBytes, 0, len); + } else { + len = end - start + 1; + actual = new byte[len]; + testBytes = new byte[len]; + System.arraycopy(bytes, start, testBytes, 0, len); + } + + GetObjectRequest getObjectRequest; + getObjectRequest = new GetObjectRequest(bucketName, "key"); + getObjectRequest.setRange(start, end); + IOUtils.readFully(oss.getObject(getObjectRequest).getObjectContent(), actual); + Assert.assertArrayEquals(testBytes, actual); + } + + private InputStream wrap(byte[] data) { + return new ByteArrayInputStream(data); + } + + private boolean doesBucketExist(String bucket) { + try { + oss.createBucket(bucket); + oss.deleteBucket(bucket); + return false; + } catch (OSSException e) { + if (Objects.equals(e.getErrorCode(), OSSErrorCode.BUCKET_ALREADY_EXISTS)) { + return true; + } + throw e; + } + } +} diff --git a/build.gradle b/build.gradle index cecf8b7e0bed..6f7041e873ba 100644 --- a/build.gradle +++ b/build.gradle @@ -282,6 +282,37 @@ project(':iceberg-data') { } } +project(':iceberg-aliyun') { + dependencies { + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + api project(':iceberg-api') + implementation project(':iceberg-common') + + compileOnly 'com.aliyun.oss:aliyun-sdk-oss' + compileOnly 'javax.xml.bind:jaxb-api' + compileOnly 'javax.activation:activation' + compileOnly 'org.glassfish.jaxb:jaxb-runtime' + compileOnly("org.apache.hadoop:hadoop-common") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'javax.servlet', module: 'servlet-api' + exclude group: 'com.google.code.gson', module: 'gson' + } + + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' + testImplementation 'org.springframework:spring-web' + testImplementation('org.springframework.boot:spring-boot-starter-jetty') { + exclude module: 'logback-classic' + exclude group: 'org.eclipse.jetty.websocket', module: 'javax-websocket-server-impl' + exclude group: 'org.eclipse.jetty.websocket', module: 'websocket-server' + } + testImplementation('org.springframework.boot:spring-boot-starter-web') { + exclude module: 'logback-classic' + exclude module: 'spring-boot-starter-logging' + } + } +} + project(':iceberg-aws') { dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') diff --git a/settings.gradle b/settings.gradle index ba00916e8239..7f052475d008 100644 --- a/settings.gradle +++ b/settings.gradle @@ -22,6 +22,7 @@ include 'api' include 'common' include 'core' include 'data' +include 'aliyun' include 'aws' include 'flink' include 'flink-runtime' @@ -43,6 +44,7 @@ project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' project(':core').name = 'iceberg-core' project(':data').name = 'iceberg-data' +project(':aliyun').name = 'iceberg-aliyun' project(':aws').name = 'iceberg-aws' project(':flink').name = 'iceberg-flink' project(':flink-runtime').name = 'iceberg-flink-runtime' @@ -71,3 +73,4 @@ if (JavaVersion.current() == JavaVersion.VERSION_1_8) { project(':hive3').name = 'iceberg-hive3' project(':hive3-orc-bundle').name = 'iceberg-hive3-orc-bundle' } + diff --git a/versions.props b/versions.props index 6bf81cd1d6b8..fae3e7773604 100644 --- a/versions.props +++ b/versions.props @@ -17,6 +17,10 @@ com.github.ben-manes.caffeine:caffeine = 2.8.4 org.apache.arrow:arrow-vector = 2.0.0 org.apache.arrow:arrow-memory-netty = 2.0.0 com.github.stephenc.findbugs:findbugs-annotations = 1.3.9-1 +com.aliyun.oss:aliyun-sdk-oss = 3.10.2 +javax.xml.bind:jaxb-api = 2.3.1 +javax.activation:activation = 1.1.1 +org.glassfish.jaxb:jaxb-runtime = 2.3.3 software.amazon.awssdk:* = 2.15.7 org.scala-lang:scala-library = 2.12.10 org.projectnessie:* = 0.9.2 @@ -32,3 +36,6 @@ org.apache.tez:tez-mapreduce = 0.8.4 com.adobe.testing:s3mock-junit4 = 2.1.28 org.assertj:assertj-core = 3.19.0 org.xerial:sqlite-jdbc = 3.34.0 +com.fasterxml.jackson.dataformat:jackson-dataformat-xml = 2.9.9 +org.springframework:* = 5.3.9 +org.springframework.boot:* = 2.5.4