Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) {
}
}

static HadoopInputFile fromFsPath(FileSystem fs, Path path, Configuration conf) {
return new HadoopInputFile(fs, path, conf);
}

private HadoopInputFile(FileSystem fs, Path path, Configuration conf) {
this.fs = fs;
this.path = path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,25 @@
*/
public class HadoopOutputFile implements OutputFile {
public static OutputFile fromPath(Path path, Configuration conf) {
return new HadoopOutputFile(path, conf);
return new HadoopOutputFile(Util.getFS(path, conf), path, conf);
}

static OutputFile fromFsPath(FileSystem fs, Path path, Configuration conf) {
return new HadoopOutputFile(fs, path, conf);
}

private final Path path;
private final Configuration conf;
private final FileSystem fs;

private HadoopOutputFile(Path path, Configuration conf) {
private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) {
this.path = path;
this.conf = conf;
this.fs = fs;
}

@Override
public PositionOutputStream create() {
FileSystem fs = Util.getFS(path, conf);
try {
return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */));
} catch (FileAlreadyExistsException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ class HadoopTableOperations implements TableOperations {

private final Configuration conf;
private final Path location;
private final FileSystem metadataFs;
private TableMetadata currentMetadata = null;
private Integer version = null;
private boolean shouldRefresh = true;

HadoopTableOperations(Path location, Configuration conf) {
this.conf = conf;
this.location = location;
this.metadataFs = Util.getFS(location, conf);
}

public TableMetadata current() {
Expand All @@ -67,18 +69,17 @@ public TableMetadata current() {
public TableMetadata refresh() {
int ver = version != null ? version : readVersionHint();
Path metadataFile = metadataFile(ver);
FileSystem fs = Util.getFS(metadataFile, conf);
try {
// don't check if the file exists if version is non-null because it was already checked
if (version == null && !fs.exists(metadataFile)) {
if (version == null && !metadataFs.exists(metadataFile)) {
if (ver == 0) {
// no v0 metadata means the table doesn't exist yet
return null;
}
throw new ValidationException("Metadata file is missing: %s", metadataFile);
}

while (fs.exists(metadataFile(ver + 1))) {
while (metadataFs.exists(metadataFile(ver + 1))) {
ver += 1;
metadataFile = metadataFile(ver);
}
Expand All @@ -88,7 +89,7 @@ public TableMetadata refresh() {
}
this.version = ver;
this.currentMetadata = TableMetadataParser.read(this,
HadoopInputFile.fromPath(metadataFile, conf));
HadoopInputFile.fromFsPath(metadataFs, metadataFile, conf));
this.shouldRefresh = false;
return currentMetadata;
}
Expand All @@ -105,14 +106,13 @@ public void commit(TableMetadata base, TableMetadata metadata) {
}

Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf));
TableMetadataParser.write(metadata, HadoopOutputFile.fromPath(tempMetadataFile, conf));
TableMetadataParser.write(metadata, HadoopOutputFile.fromFsPath(metadataFs, tempMetadataFile, conf));

int nextVersion = (version != null ? version : 0) + 1;
Path finalMetadataFile = metadataFile(nextVersion);
FileSystem fs = Util.getFS(tempMetadataFile, conf);

try {
if (fs.exists(finalMetadataFile)) {
if (metadataFs.exists(finalMetadataFile)) {
throw new CommitFailedException(
"Version %d already exists: %s", nextVersion, finalMetadataFile);
}
Expand All @@ -123,7 +123,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {

try {
// this rename operation is the atomic commit operation
if (!fs.rename(tempMetadataFile, finalMetadataFile)) {
if (!metadataFs.rename(tempMetadataFile, finalMetadataFile)) {
throw new CommitFailedException(
"Failed to commit changes using rename: %s", finalMetadataFile);
}
Expand All @@ -140,20 +140,19 @@ public void commit(TableMetadata base, TableMetadata metadata) {

@Override
public InputFile newInputFile(String path) {
return HadoopInputFile.fromPath(new Path(path), conf);
return HadoopInputFile.fromFsPath(metadataFs, new Path(path), conf);
}

@Override
public OutputFile newMetadataFile(String filename) {
return HadoopOutputFile.fromPath(metadataPath(filename), conf);
return HadoopOutputFile.fromFsPath(metadataFs, metadataPath(filename), conf);
}

@Override
public void deleteFile(String path) {
Path toDelete = new Path(path);
FileSystem fs = Util.getFS(toDelete, conf);
try {
fs.delete(toDelete, false /* not recursive */ );
metadataFs.delete(toDelete, false /* not recursive */ );
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to delete file: %s", path);
}
Expand All @@ -178,9 +177,7 @@ private Path versionHintFile() {

private void writeVersionHint(int version) {
Path versionHintFile = versionHintFile();
FileSystem fs = Util.getFS(versionHintFile, conf);

try (FSDataOutputStream out = fs.create(versionHintFile, true /* overwrite */ )) {
try (FSDataOutputStream out = metadataFs.create(versionHintFile, true /* overwrite */ )) {
out.write(String.valueOf(version).getBytes("UTF-8"));

} catch (IOException e) {
Expand Down