Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
* A non-serializable file system for testing only. See {@link TestHoodieSerializableFileStatus}
* Can't make this an inner class as the outer class would also be non-serializable and invalidate
* the purpose of testing
*/
public class NonSerializableFileSystem extends FileSystem {

  /** Number of synthetic statuses produced per {@link #listStatus(Path)} call. */
  private static final int STATUS_COUNT = 5;

  @Override
  public URI getUri() {
    try {
      return new URI("");
    } catch (URISyntaxException e) {
      // Unreachable for the empty string, but the checked exception must be handled.
      return null;
    }
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    return null;
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress)
      throws IOException {
    return null;
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
      throws IOException {
    return null;
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    return false;
  }

  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    return false;
  }

  @Override
  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
    final FileStatus[] statuses = new FileStatus[STATUS_COUNT];
    for (int idx = 0; idx < statuses.length; idx++) {
      // The anonymous subclass captures this enclosing (non-serializable) file system
      // instance through its `conf` field, which is what makes each returned status
      // non-serializable — the whole point of this test fixture.
      statuses[idx] = new FileStatus(100L, false, 1, 10000L,
          0L, 0, null, "owner", "group", f) {
        Configuration conf = getConf();

        @Override
        public long getLen() {
          return -1;
        }
      };
    }
    return statuses;
  }

  @Override
  public void setWorkingDirectory(Path newDir) {}

  @Override
  public Path getWorkingDirectory() {
    return null;
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return false;
  }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    return null;
  }

  public Configuration getConf() {
    return new Configuration();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.fs;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.testutils.HoodieClientTestHarness;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkException;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Test the if {@link HoodieSerializableFileStatus} is serializable
*/
@TestInstance(Lifecycle.PER_CLASS)
@TestInstance(Lifecycle.PER_CLASS)
public class TestHoodieSerializableFileStatus extends HoodieClientTestHarness {

  // Number of paths listed in parallel. NonSerializableFileSystem returns five
  // statuses per path, so a successful flatMap yields NUM_PATHS * 5 statuses.
  private static final int NUM_PATHS = 5;

  HoodieEngineContext engineContext;
  List<Path> testPaths;

  @BeforeAll
  public void setUp() throws IOException {
    initSparkContexts();
    testPaths = new ArrayList<>(NUM_PATHS);
    for (int i = 0; i < NUM_PATHS; i++) {
      testPaths.add(new Path("s3://table-bucket/"));
    }
    engineContext = new HoodieSparkEngineContext(jsc);
  }

  @AfterAll
  public void tearDown() {
    cleanupSparkContexts();
  }

  /**
   * Plain {@code FileStatus} instances from {@link NonSerializableFileSystem} must
   * fail Kryo serialization when shipped through a Spark job.
   */
  @Test
  public void testNonSerializableFileStatus() {
    Exception e = Assertions.assertThrows(SparkException.class,
        () -> {
          List<FileStatus> statuses = engineContext.flatMap(testPaths, path -> {
            FileSystem fileSystem = new NonSerializableFileSystem();
            return Arrays.stream(fileSystem.listStatus(path));
          }, NUM_PATHS);
        },
        "Serialization is supposed to fail!");
    Assertions.assertTrue(e.getMessage().contains("com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException"));
  }

  /**
   * Wrapping the same statuses in {@link HoodieSerializableFileStatus} must serialize
   * cleanly and round-trip every entry.
   */
  @Test
  public void testHoodieFileStatusSerialization() {
    List<HoodieSerializableFileStatus> statuses = engineContext.flatMap(testPaths, path -> {
      FileSystem fileSystem = new NonSerializableFileSystem();
      return Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
    }, NUM_PATHS);
    // NonSerializableFileSystem lists 5 statuses per path; all must survive the shuffle.
    Assertions.assertEquals(NUM_PATHS * 5, statuses.size());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hadoop.fs.permission.FsPermission;

import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;

/**
* Helper functions around FileStatus and HoodieFileStatus.
Expand Down Expand Up @@ -105,6 +107,13 @@ public static HoodieFileStatus fromFileStatus(FileStatus fileStatus) {
return fStatus;
}

/**
 * Converts an array of Hadoop {@link FileStatus} into {@code HoodieFileStatus}.
 *
 * @param statuses the statuses to convert; must not be null
 * @return a new array with one converted entry per input, preserving order
 */
public static HoodieFileStatus[] fromFileStatuses(FileStatus[] statuses) {
  // Collect directly into an array; avoids the intermediate List and the
  // pre-sized-array toArray call of the previous implementation.
  return Arrays.stream(statuses)
      .map(status -> fromFileStatus(status))
      .toArray(HoodieFileStatus[]::new);
}

/**
* Used to safely handle FileStatus calls which might fail on some FileSystem implementation.
* (DeprecatedLocalFileSystem)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.fs;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.Serializable;
import java.util.Arrays;
import java.util.stream.Collectors;

/**
* A serializable file status implementation
* <p>
* Use `HoodieFileStatus` generated by Avro instead this class if possible
* This class is needed because `hudi-hadoop-mr-bundle` relies on Avro 1.8.2,
* and won't work well with `HoodieFileStatus`
*/
public class HoodieSerializableFileStatus implements Serializable {

  // Explicit version id so the wire format is stable across recompiles.
  private static final long serialVersionUID = 1L;

  // NOTE(review): serializing this class relies on org.apache.hadoop.fs.Path
  // itself being serializable — confirm for the minimum supported Hadoop version.
  private final Path path;
  private final long length;
  private final Boolean isDir;
  private final short blockReplication;
  private final long blockSize;
  private final long modificationTime;
  private final long accessTime;

  /**
   * Copies the serializable subset of fields out of a Hadoop {@link FileStatus}.
   */
  HoodieSerializableFileStatus(FileStatus fileStatus) {
    this(fileStatus.getPath(),
        fileStatus.getLen(),
        fileStatus.isDirectory(),
        fileStatus.getReplication(),
        fileStatus.getBlockSize(),
        fileStatus.getModificationTime(),
        fileStatus.getAccessTime());
  }

  HoodieSerializableFileStatus(Path path, long length, boolean isDir, short blockReplication,
      long blockSize, long modificationTime, long accessTime) {
    this.path = path;
    this.length = length;
    this.isDir = isDir;
    this.blockReplication = blockReplication;
    this.blockSize = blockSize;
    this.modificationTime = modificationTime;
    this.accessTime = accessTime;
  }

  public Path getPath() {
    return path;
  }

  public long getLen() {
    return length;
  }

  // Boxed return type kept for backward compatibility with existing callers.
  public Boolean isDirectory() {
    return isDir;
  }

  public short getReplication() {
    return blockReplication;
  }

  public long getBlockSize() {
    return blockSize;
  }

  public long getModificationTime() {
    return modificationTime;
  }

  public long getAccessTime() {
    return accessTime;
  }

  /**
   * Wraps a single {@link FileStatus} in a serializable equivalent.
   */
  public static HoodieSerializableFileStatus fromFileStatus(FileStatus status) {
    return new HoodieSerializableFileStatus(status);
  }

  /**
   * Wraps an array of {@link FileStatus} in serializable equivalents, preserving order.
   *
   * @param statuses the statuses to wrap; must not be null
   * @return a new array with one wrapped entry per input
   */
  public static HoodieSerializableFileStatus[] fromFileStatuses(FileStatus[] statuses) {
    // Collect directly into an array instead of via an intermediate List.
    return Arrays.stream(statuses)
        .map(HoodieSerializableFileStatus::fromFileStatus)
        .toArray(HoodieSerializableFileStatus[]::new);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieSerializableFileStatus;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
Expand Down Expand Up @@ -169,9 +170,9 @@ private List<String> getPartitionPathWithPathPrefixUsingFilterExpression(String

// List all directories in parallel
engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all partitions with prefix " + relativePathPrefix);
List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CTTY looks like in the latest master, we no longer return FileStatus here (the Path instances are used instead). Is this PR still needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I don't think this is needed anymore

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I'll close this PR.

List<HoodieSerializableFileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
return Arrays.stream(fileSystem.listStatus(path));
return Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
}, listingParallelism);
pathsToList.clear();

Expand All @@ -183,15 +184,16 @@ private List<String> getPartitionPathWithPathPrefixUsingFilterExpression(String
// and second entry holds optionally a directory path to be processed further.
engineContext.setJobStatus(this.getClass().getSimpleName(), "Processing listed partitions");
List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
Path path = fileStatus.getPath();
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
if (fileStatus.isDirectory()) {
if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
return Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), fileStatus.getPath())), Option.empty());
} else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
return Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), path)), Option.empty());
} else if (!path.getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
return Pair.of(Option.empty(), Option.of(path));
}
} else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
String partitionName = FSUtils.getRelativePartitionPath(dataBasePath.get(), fileStatus.getPath().getParent());
} else if (path.getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
String partitionName = FSUtils.getRelativePartitionPath(dataBasePath.get(), path.getParent());
return Pair.of(Option.of(partitionName), Option.empty());
}
return Pair.of(Option.empty(), Option.empty());
Expand Down