Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/src/main/java/com/netflix/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public interface Table {
*/
UpdateProperties updateProperties();

/**
* Create a new {@link UpdateLocation} to update table location and commit the changes.
* <p>
* The new location is supplied through {@link UpdateLocation#setLocation(String)} on the
* returned object.
*
* @return a new {@link UpdateLocation}
*/
UpdateLocation updateLocation();

/**
* Create a new {@link AppendFiles append API} to add files to this table and commit.
*
Expand Down
7 changes: 7 additions & 0 deletions api/src/main/java/com/netflix/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ public interface Transaction {
*/
UpdateProperties updateProperties();

/**
* Create a new {@link UpdateLocation} to update table location.
* <p>
* The new location is supplied through {@link UpdateLocation#setLocation(String)} on the
* returned object.
*
* @return a new {@link UpdateLocation}
*/
UpdateLocation updateLocation();

/**
* Create a new {@link AppendFiles append API} to add files to this table.
*
Expand Down
33 changes: 33 additions & 0 deletions api/src/main/java/com/netflix/iceberg/UpdateLocation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.netflix.iceberg;

/**
* API for setting a table's base location.
* <p>
* This is a {@link PendingUpdate} whose result type is the location String. The new location is
* provided with {@link #setLocation(String)}, which returns this object so calls can be chained.
*/
public interface UpdateLocation extends PendingUpdate<String> {
/**
* Set the table's location.
*
* @param location a String location
* @return this for method chaining
*/
UpdateLocation setLocation(String location);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {

private static final String METADATA_FOLDER_NAME = "metadata";
private static final String DATA_FOLDER_NAME = "data";
private static final String HIVE_LOCATION_FOLDER_NAME = "empty";

private final Configuration conf;
private final FileIO fileIo;

private TableMetadata currentMetadata = null;
private String currentMetadataLocation = null;
private boolean shouldRefresh = true;
private String baseLocation = null;
private int version = -1;

protected BaseMetastoreTableOperations(Configuration conf) {
Expand Down Expand Up @@ -85,16 +83,8 @@ protected void requestRefresh() {
this.shouldRefresh = true;
}

public String hiveTableLocation() {
return String.format("%s/%s", baseLocation, HIVE_LOCATION_FOLDER_NAME);
}

protected String writeNewMetadata(TableMetadata metadata, int version) {
if (baseLocation == null) {
baseLocation = metadata.location();
}

String newTableMetadataFilePath = newTableMetadataFilePath(baseLocation, version);
String newTableMetadataFilePath = newTableMetadataFilePath(metadata, version);
OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);

// write the new metadata
Expand All @@ -115,33 +105,39 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
Tasks.foreach(newLocation)
.retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */ )
.suppressFailureWhenFinished()
.run(location -> {
this.currentMetadata = read(this, fromLocation(location, conf));
this.currentMetadataLocation = location;
this.baseLocation = currentMetadata.location();
this.version = parseVersion(location);
.run(metadataLocation -> {
this.currentMetadata = read(this, fromLocation(metadataLocation, conf));
this.currentMetadataLocation = metadataLocation;
this.version = parseVersion(metadataLocation);
});
}
this.shouldRefresh = false;
}

/**
 * Resolves the full location of a metadata file for the given table metadata.
 *
 * <p>When the table property {@link TableProperties#WRITE_METADATA_LOCATION} is present, the file
 * is placed directly under that location. Otherwise it is placed in the
 * {@code METADATA_FOLDER_NAME} folder under {@code metadata.location()}.
 *
 * @param metadata table metadata providing the properties and the table location
 * @param filename name of the metadata file
 * @return the location string for the metadata file
 */
private String metadataFileLocation(TableMetadata metadata, String filename) {
  String configuredMetadataDir =
      metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);

  // no override configured: fall back to the metadata folder under the table location
  if (configuredMetadataDir == null) {
    return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
  }

  return String.format("%s/%s", configuredMetadataDir, filename);
}

@Override
public String metadataFileLocation(String fileName) {
return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, fileName);
public String metadataFileLocation(String filename) {
return metadataFileLocation(current(), filename);
}

@Override
public FileIO io() {
return fileIo;
}

private String newTableMetadataFilePath(String baseLocation, int newVersion) {
return String.format("%s/%s/%05d-%s%s",
baseLocation,
METADATA_FOLDER_NAME,
newVersion,
UUID.randomUUID(),
getFileExtension(this.conf));
private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
return metadataFileLocation(meta,
String.format("%05d-%s%s", newVersion, UUID.randomUUID(), getFileExtension(this.conf)));
}

private static int parseVersion(String metadataLocation) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/com/netflix/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public UpdateProperties updateProperties() {
return new PropertiesUpdate(ops);
}

/**
 * Creates a {@link SetLocation} update that works against {@code ops}.
 *
 * @return a new location update
 */
@Override
public UpdateLocation updateLocation() {
  UpdateLocation locationUpdate = new SetLocation(ops);
  return locationUpdate;
}

@Override
public AppendFiles newAppend() {
return new MergeAppend(ops);
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/com/netflix/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ public UpdateProperties updateProperties() {
return props;
}

/**
 * Starts a location update inside this transaction.
 *
 * <p>{@code checkLastOperationCommitted} is invoked before the update is created; the new
 * {@link SetLocation} is built on {@code transactionOps} and recorded in {@code updates}.
 *
 * @return the newly registered location update
 */
@Override
public UpdateLocation updateLocation() {
  checkLastOperationCommitted("UpdateLocation");

  final UpdateLocation locationUpdate = new SetLocation(transactionOps);
  updates.add(locationUpdate);

  return locationUpdate;
}

@Override
public AppendFiles newAppend() {
checkLastOperationCommitted("AppendFiles");
Expand Down Expand Up @@ -327,6 +335,11 @@ public UpdateProperties updateProperties() {
return BaseTransaction.this.updateProperties();
}

/**
 * Delegates to the enclosing transaction's {@code updateLocation()}.
 *
 * @return the location update created by the enclosing {@link BaseTransaction}
 */
@Override
public UpdateLocation updateLocation() {
  BaseTransaction enclosingTransaction = BaseTransaction.this;
  return enclosingTransaction.updateLocation();
}

@Override
public AppendFiles newAppend() {
return BaseTransaction.this.newAppend();
Expand Down
67 changes: 67 additions & 0 deletions core/src/main/java/com/netflix/iceberg/SetLocation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.netflix.iceberg;

import com.netflix.iceberg.exceptions.CommitFailedException;
import com.netflix.iceberg.util.Tasks;

import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
import static com.netflix.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
import static com.netflix.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;

/**
 * {@link UpdateLocation} implementation that records a new table location and commits it through
 * the supplied {@link TableOperations}.
 */
public class SetLocation implements UpdateLocation {
  private final TableOperations ops;
  private String newLocation;

  /**
   * Creates a location update with no location set yet.
   *
   * @param ops table operations used to refresh and commit table metadata
   */
  public SetLocation(TableOperations ops) {
    this.ops = ops;
    this.newLocation = null;
  }

  /**
   * Records the location to commit; a later call replaces the previously recorded value.
   *
   * @param location a String location
   * @return this for method chaining
   */
  @Override
  public UpdateLocation setLocation(String location) {
    this.newLocation = location;
    return this;
  }

  /**
   * Returns the location recorded by {@link #setLocation(String)}, or null if none was set.
   *
   * @return the pending location
   */
  @Override
  public String apply() {
    return newLocation;
  }

  /**
   * Commits the recorded location, retrying on {@link CommitFailedException}.
   *
   * <p>Retry count and backoff are read from the table's commit retry properties. Each attempt
   * reloads the table metadata before committing, so a retry is based on the latest metadata
   * rather than on the metadata that already failed to commit.
   */
  @Override
  public void commit() {
    // this snapshot of the metadata is only used to read the retry configuration
    TableMetadata retryConfig = ops.refresh();

    Tasks.foreach(ops)
        .retry(retryConfig.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
        .exponentialBackoff(
            retryConfig.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
            retryConfig.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
            retryConfig.propertyAsInt(
                COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
            2.0 /* exponential */ )
        .onlyRetryOn(CommitFailedException.class)
        .run(taskOps -> {
          // refresh per attempt: re-submitting the same base after a failed commit would
          // presumably be rejected again, making the retries pointless
          TableMetadata base = taskOps.refresh();
          taskOps.commit(base, base.updateLocation(newLocation));
        });
  }
}
6 changes: 6 additions & 0 deletions core/src/main/java/com/netflix/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,12 @@ public TableMetadata buildReplacement(Schema schema, PartitionSpec partitionSpec
-1, snapshots, ImmutableList.of());
}

/**
 * Builds a new {@link TableMetadata} that differs from this one in its location.
 *
 * <p>The column id, schema, partition specs, properties and snapshot state are passed through
 * unchanged. The second constructor argument is passed as null and the fourth is the current
 * wall-clock time.
 * NOTE(review): presumably these are the metadata file and the last-updated timestamp; confirm
 * against the constructor.
 *
 * @param newLocation the location to store in the new metadata
 * @return a new metadata instance using {@code newLocation}
 */
public TableMetadata updateLocation(String newLocation) {
  long nowMillis = System.currentTimeMillis();
  return new TableMetadata(
      ops,
      null,
      newLocation,
      nowMillis,
      lastColumnId,
      schema,
      defaultSpecId,
      specs,
      properties,
      currentSnapshotId,
      snapshots,
      snapshotLog);
}

private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
.withSpecId(specId);
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/java/com/netflix/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ public class TableProperties {

public static final String OBJECT_STORE_PATH = "write.object-storage.path";

// This only applies to files written after this property is set. Files previously written aren't relocated to
// reflect this parameter.
// This only applies to files written after this property is set. Files previously written aren't
// relocated to reflect this parameter.
// If not set, defaults to a "data" folder underneath the root path of the table.
public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";

// This only applies to files written after this property is set. Files previously written aren't
// relocated to reflect this parameter.
// If not set, defaults to a "metadata" folder underneath the root path of the table.
public static final String WRITE_METADATA_LOCATION = "write.metadata.path";

public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package com.netflix.iceberg.hadoop;

import com.google.common.base.Preconditions;
import com.netflix.iceberg.io.FileIO;
import com.netflix.iceberg.TableMetadata;
import com.netflix.iceberg.TableMetadataParser;
import com.netflix.iceberg.TableOperations;
import com.netflix.iceberg.TableProperties;
import com.netflix.iceberg.exceptions.CommitFailedException;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -107,6 +109,12 @@ public void commit(TableMetadata base, TableMetadata metadata) {
return;
}

Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
"Hadoop path-based tables cannot be relocated");
Preconditions.checkArgument(
!metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
"Hadoop path-based tables cannot relocate metadata");

Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf));
TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ public void commit(TableMetadata base, TableMetadata metadata) {
(int) currentTimeMillis / 1000,
(int) currentTimeMillis / 1000,
Integer.MAX_VALUE,
storageDescriptor(metadata.schema()),
storageDescriptor(metadata),
Collections.emptyList(),
new HashMap<>(),
null,
null,
ICEBERG_TABLE_TYPE_VALUE);
}

tbl.setSd(storageDescriptor(metadata.schema())); // set to pickup any schema changes
tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
final String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
if (!Objects.equals(currentMetadataLocation(), metadataLocation)) {
throw new CommitFailedException(format("metadataLocation = %s is not same as table metadataLocation %s for %s.%s",
Expand Down Expand Up @@ -189,11 +189,11 @@ private void setParameters(String newMetadataLocation, Table tbl) {
tbl.setParameters(parameters);
}

private StorageDescriptor storageDescriptor(Schema schema) {
private StorageDescriptor storageDescriptor(TableMetadata metadata) {

final StorageDescriptor storageDescriptor = new StorageDescriptor();
storageDescriptor.setCols(columns(schema));
storageDescriptor.setLocation(hiveTableLocation());
storageDescriptor.setCols(columns(metadata.schema()));
storageDescriptor.setLocation(metadata.location());
storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileInputFormat");
storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileOutputFormat");
SerDeInfo serDeInfo = new SerDeInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ String getTableBasePath(String tableName) {
}

String getTableLocation(String tableName) {
return new Path("file", null, Paths.get(getTableBasePath(tableName), "empty").toString()).toString();
return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString()).toString();
}

String metadataLocation(String tableName) {
Expand Down