Skip to content

Commit 4b639f6

Browse files
authored
[GOBBLIN-2159] Support partition level copy in Iceberg-Distcp (#4058)
1 parent 916d570 commit 4b639f6

16 files changed

+1475
-34
lines changed

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121

2222
import org.apache.hadoop.conf.Configuration;
2323
import org.apache.iceberg.CatalogUtil;
24+
import org.apache.iceberg.Table;
2425
import org.apache.iceberg.TableOperations;
2526
import org.apache.iceberg.catalog.Catalog;
2627
import org.apache.iceberg.catalog.TableIdentifier;
2728
import org.apache.gobblin.dataset.DatasetConstants;
29+
import org.apache.iceberg.exceptions.NoSuchTableException;
2830

2931
/**
3032
* Base implementation of {@link IcebergCatalog} to access {@link IcebergTable} and the
@@ -41,9 +43,15 @@ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> compan
4143
}
4244

4345
@Override
44-
public IcebergTable openTable(String dbName, String tableName) {
46+
public IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException {
4547
TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
46-
return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(), createTableOperations(tableId), this.getCatalogUri());
48+
try {
49+
return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(),
50+
createTableOperations(tableId), this.getCatalogUri(), loadTableInstance(tableId));
51+
} catch (NoSuchTableException ex) {
52+
// defend against `org.apache.iceberg.catalog.Catalog::loadTable` throwing inside some `@Override` of `loadTableInstance`
53+
throw new IcebergTable.TableNotFoundException(tableId);
54+
}
4755
}
4856

4957
protected Catalog createCompanionCatalog(Map<String, String> properties, Configuration configuration) {
@@ -67,4 +75,6 @@ protected String getDatasetDescriptorPlatform() {
6775
}
6876

6977
protected abstract TableOperations createTableOperations(TableIdentifier tableId);
78+
79+
protected abstract Table loadTableInstance(TableIdentifier tableId);
7080
}

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
public interface IcebergCatalog {
3030

3131
/** @return table identified by `dbName` and `tableName` */
32-
IcebergTable openTable(String dbName, String tableName);
32+
IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException;
3333

3434
/** @return table identified by `tableId` */
35-
default IcebergTable openTable(TableIdentifier tableId) {
35+
default IcebergTable openTable(TableIdentifier tableId) throws IcebergTable.TableNotFoundException {
3636
// CHALLENGE: clearly better to implement in the reverse direction - `openTable(String, String)` in terms of `openTable(TableIdentifier)` -
3737
// but challenging to do at this point, with multiple derived classes already "in the wild" that implement `openTable(String, String)`
3838
return openTable(tableId.namespace().toString(), tableId.name());

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public String getConfigPrefix() {
8585
}
8686

8787
protected final FileSystem sourceFs;
88-
private final Properties properties;
88+
protected final Properties properties;
8989

9090
/**
9191
* Finds all {@link IcebergDataset}s in the file system using the Iceberg Catalog.
@@ -153,7 +153,7 @@ protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalo
153153
IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName);
154154
// TODO: Rethink strategy to enforce dest iceberg table
155155
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName));
156-
return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties));
156+
return createSpecificDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties));
157157
}
158158

159159
protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
@@ -165,6 +165,11 @@ protected static IcebergCatalog createIcebergCatalog(Properties properties, Cata
165165
return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
166166
}
167167

168+
protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath)
169+
throws IOException {
170+
return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs, shouldIncludeMetadataPath);
171+
}
172+
168173
protected static boolean getConfigShouldCopyMetadataPath(Properties properties) {
169174
return Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH, DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH));
170175
}

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java

+6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.hadoop.conf.Configuration;
2323
import org.apache.hadoop.hive.conf.HiveConf;
24+
import org.apache.iceberg.Table;
2425
import org.apache.iceberg.TableOperations;
2526
import org.apache.iceberg.catalog.TableIdentifier;
2627
import org.apache.iceberg.hive.HiveCatalog;
@@ -61,4 +62,9 @@ protected TableOperations createTableOperations(TableIdentifier tableId) {
6162
public boolean tableAlreadyExists(IcebergTable icebergTable) {
6263
return hc.tableExists(icebergTable.getTableId());
6364
}
65+
66+
@Override
67+
protected Table loadTableInstance(TableIdentifier tableId) {
68+
return hc.loadTable(tableId);
69+
}
6470
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.data.management.copy.iceberg;
19+
20+
import java.io.IOException;
21+
import java.time.Duration;
22+
import java.util.List;
23+
import java.util.Optional;
24+
import java.util.Properties;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.ExecutionException;
27+
28+
import org.apache.iceberg.DataFile;
29+
import org.apache.iceberg.catalog.TableIdentifier;
30+
import org.apache.iceberg.util.SerializationUtil;
31+
32+
import com.github.rholder.retry.Attempt;
33+
import com.github.rholder.retry.RetryException;
34+
import com.github.rholder.retry.RetryListener;
35+
import com.github.rholder.retry.Retryer;
36+
import com.google.common.collect.ImmutableMap;
37+
import com.typesafe.config.Config;
38+
import com.typesafe.config.ConfigFactory;
39+
40+
import lombok.extern.slf4j.Slf4j;
41+
42+
import org.apache.gobblin.commit.CommitStep;
43+
import org.apache.gobblin.util.retry.RetryerFactory;
44+
45+
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
46+
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
47+
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
48+
import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
49+
50+
/**
51+
* Commit step for overwriting partitions in an Iceberg table.
52+
* <p>
53+
* This class implements the {@link CommitStep} interface and provides functionality to overwrite
54+
* partitions in the destination Iceberg table using serialized data files.
55+
* </p>
56+
*/
57+
@Slf4j
58+
public class IcebergOverwritePartitionsStep implements CommitStep {
59+
private final String destTableIdStr;
60+
private final Properties properties;
61+
private final byte[] serializedDataFiles;
62+
private final String partitionColName;
63+
private final String partitionValue;
64+
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
65+
".catalog.overwrite.partitions.retries";
66+
private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
67+
RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
68+
RETRY_TIMES, 3,
69+
RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
70+
71+
/**
72+
* Constructs an {@code IcebergReplacePartitionsStep} with the specified parameters.
73+
*
74+
* @param destTableIdStr the identifier of the destination table as a string
75+
* @param serializedDataFiles [from List<DataFiles>] the serialized data files to be used for replacing partitions
76+
* @param properties the properties containing configuration
77+
*/
78+
public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, byte[] serializedDataFiles, Properties properties) {
79+
this.destTableIdStr = destTableIdStr;
80+
this.partitionColName = partitionColName;
81+
this.partitionValue = partitionValue;
82+
this.serializedDataFiles = serializedDataFiles;
83+
this.properties = properties;
84+
}
85+
86+
@Override
87+
public boolean isCompleted() {
88+
return false;
89+
}
90+
91+
/**
92+
* Executes the partition replacement in the destination Iceberg table.
93+
* Also, have retry mechanism as done in {@link IcebergRegisterStep#execute()}
94+
*
95+
* @throws IOException if an I/O error occurs during execution
96+
*/
97+
@Override
98+
public void execute() throws IOException {
99+
// Unlike IcebergRegisterStep::execute, which validates dest table metadata has not changed between copy entity
100+
// generation and the post-copy commit, do no such validation here, so dest table writes may continue throughout
101+
// our copying. any new data written in the meanwhile to THE SAME partitions we are about to overwrite will be
102+
// clobbered and replaced by the copy entities from our execution.
103+
IcebergTable destTable = createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
104+
List<DataFile> dataFiles = SerializationUtil.deserializeFromBytes(this.serializedDataFiles);
105+
try {
106+
log.info("~{}~ Starting partition overwrite - partition: {}; value: {}; numDataFiles: {}; path[0]: {}",
107+
this.destTableIdStr,
108+
this.partitionColName,
109+
this.partitionValue,
110+
dataFiles.size(),
111+
dataFiles.get(0).path()
112+
);
113+
Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
114+
overwritePartitionsRetryer.call(() -> {
115+
destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue);
116+
return null;
117+
});
118+
log.info("~{}~ Successful partition overwrite - partition: {}; value: {}",
119+
this.destTableIdStr,
120+
this.partitionColName,
121+
this.partitionValue
122+
);
123+
} catch (ExecutionException executionException) {
124+
String msg = String.format("~%s~ Failed to overwrite partitions", this.destTableIdStr);
125+
log.error(msg, executionException);
126+
throw new RuntimeException(msg, executionException.getCause());
127+
} catch (RetryException retryException) {
128+
String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
129+
String msg = String.format("~%s~ Failure attempting to overwrite partition [num failures: %d] %s",
130+
this.destTableIdStr,
131+
retryException.getNumberOfFailedAttempts(),
132+
interruptedNote);
133+
Throwable informativeException = retryException.getLastFailedAttempt().hasException()
134+
? retryException.getLastFailedAttempt().getExceptionCause()
135+
: retryException;
136+
log.error(msg, informativeException);
137+
throw new RuntimeException(msg, informativeException);
138+
}
139+
}
140+
141+
protected IcebergCatalog createDestinationCatalog() throws IOException {
142+
return IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION);
143+
}
144+
145+
private Retryer<Void> createOverwritePartitionsRetryer() {
146+
Config config = ConfigFactory.parseProperties(this.properties);
147+
Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
148+
? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
149+
: ConfigFactory.empty();
150+
151+
return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() {
152+
@Override
153+
public <V> void onRetry(Attempt<V> attempt) {
154+
if (attempt.hasException()) {
155+
String msg = String.format("~%s~ Exception while overwriting partitions [attempt: %d; elapsed: %s]",
156+
destTableIdStr,
157+
attempt.getAttemptNumber(),
158+
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
159+
log.warn(msg, attempt.getExceptionCause());
160+
}
161+
}
162+
}));
163+
}
164+
}

0 commit comments

Comments
 (0)