Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e448d82
Hive: Set the Table owner on table creation (#5763)
Oct 21, 2022
8cd8ff6
Hive: Set the database owner on namespace creation (#6045)
haizhou-zhao Nov 28, 2022
18151d2
Hive: Merge identical catch branch (#6477)
krvikash Dec 23, 2022
79a5450
Hive: Lock hardening (#6451)
pvary Jan 11, 2023
7633414
Hive: Make UGI current user the owner of new Hive objects (#6324)
haizhou-zhao Jan 18, 2023
40e16a4
Build: Fix minor error-prone warnings (#6629)
ajantha-bhat Jan 23, 2023
a4b65f4
Core: Add SerializableTable (#2403)
aokolnychyi Apr 5, 2021
eb86d36
Fix formatting in iceberg-catalog/pom.xml
zsmiskolczi May 5, 2023
b984497
Hive: Fix concurrent transactions overwriting commits by adding hive …
SinghAsDev Aug 4, 2022
e22fae0
Hive: Refactor commit lock mechanism from HiveTableOperations (#6648)
pvary Feb 3, 2023
1c1a6fb
Use UGI shortUserName as the default owner of Hive objects (#6955)
zhouyifan279 Mar 7, 2023
77f7d02
Hive: Use EnvironmentContext instead of Hive Locks to provide transac…
pvary Apr 25, 2023
896f094
Nessie: Use AssertJ assertions (#2684)
nastra Jun 14, 2021
9176d72
Fix compile error
zsmiskolczi May 17, 2023
7ce6433
Try EXCL_WRITE lock
zsmiskolczi May 29, 2023
1dcbd84
Pass txnid to MetaStoreLock
zsmiskolczi Jun 1, 2023
8acbfd3
Upgrade Iceberg dependency from 1.2.1 to 1.3.0
zsmiskolczi Jun 7, 2023
210bb93
MR: Remove deprecated AssertHelpers (#7159)
liuxiaocs7 Mar 21, 2023
47f42e7
Hive: Support customizable ClientPool (#6698)
lirui-apache Mar 24, 2023
5a02894
Hive: Clean up expired metastore clients (#7310)
frankliee Apr 26, 2023
a9d56cd
Hive: Support connecting to multiple HMS-Catalog on same HMS URL (#7441)
szehon-ho May 1, 2023
7cba254
Hive: Remove deprecated AssertHelpers (#7482)
liuxiaocs7 May 2, 2023
a32dcb6
Build: Bump com.esotericsoftware:kryo-shaded from 4.0.2 to 4.0.3 (#7669)
dependabot[bot] May 22, 2023
f3ee8f3
"Fix failing tests - part I"
zsmiskolczi Jun 13, 2023
1d7e5fd
MR: Remove deprecated properties for 1.0 release (#5657)
danielcweeks Aug 28, 2022
4e10054
Fix failing tests - part II - regenerate qtest outputs
zsmiskolczi Jun 13, 2023
85b6e5f
Fix failing tests - part VI
zsmiskolczi Jun 14, 2023
5455159
Fix failing tests - part III - Ignore testFailedResidualFiltering.
Jun 14, 2023
1b53423
Core: Avoid creating new metadata file on registerTable (#6591)
krvikash Jan 25, 2023
3f003a0
Remove unused field
zsmiskolczi Jun 20, 2023
e7a8dbd
HIVE-27306: Code cleaning: removal of unused and outdated code.
Jun 20, 2023
6cb5cad
Revert "Pass txnid to MetaStoreLock"
zsmiskolczi Jun 21, 2023
92ccda4
Fix typo in Metastore
zsmiskolczi Jun 21, 2023
505f33f
Clean up patchec-iceberg-core
zsmiskolczi Jun 23, 2023
9b2d040
HIVE-27306: Update schema of partition metadata table
Jun 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public class Constants {
public static final String TIME_POSTFIX_REQUEST_TRACK = "_TIME";

public static final String ICEBERG = "iceberg";
public static final String ICEBERG_PARTITION_TABLE_SCHEMA = "partition,record_count,file_count,spec_id";
public static final String ICEBERG_PARTITION_TABLE_SCHEMA = "partition,spec_id,record_count,file_count," +
"position_delete_record_count,position_delete_file_count,equality_delete_record_count," +
"equality_delete_file_count";
public static final String DELIMITED_JSON_SERDE = "org.apache.hadoop.hive.serde2.DelimitedJSONSerDe";
}
5 changes: 5 additions & 0 deletions iceberg/iceberg-catalog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
<artifactId>hive-exec</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-standalone-metastore-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,95 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.apache.thrift.TException;
import org.immutables.value.Value;

/**
* A ClientPool that caches the underlying HiveClientPool instances.
*
* <p>The following key elements are supported and can be specified via {@link
* CatalogProperties#CLIENT_POOL_CACHE_KEYS}:
*
* <ul>
* <li>ugi - the Hadoop UserGroupInformation instance that represents the current user using the
* cache.
* <li>user_name - similar to UGI but only includes the user's name determined by
* UserGroupInformation#getUserName.
* <li>conf - name of an arbitrary configuration. The value of the configuration will be extracted
* from catalog properties and added to the cache key. A conf element should start with a
* "conf:" prefix which is followed by the configuration name. E.g. specifying "conf:a.b.c"
* will add "a.b.c" to the key, and so that configurations with different default catalog
* wouldn't share the same client pool. Multiple conf elements can be specified.
* </ul>
*/
public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {

private static Cache<String, HiveClientPool> clientPoolCache;
private static final String CONF_ELEMENT_PREFIX = "conf:";

private static Cache<Key, HiveClientPool> clientPoolCache;

private final Configuration conf;
private final String metastoreUri;
private final int clientPoolSize;
private final long evictionInterval;
private final Key key;

public CachedClientPool(Configuration conf, Map<String, String> properties) {
this.conf = conf;
this.metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
this.clientPoolSize = PropertyUtil.propertyAsInt(properties,
CatalogProperties.CLIENT_POOL_SIZE,
CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
this.evictionInterval = PropertyUtil.propertyAsLong(properties,
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT);
this.key = extractKey(properties.get(CatalogProperties.CLIENT_POOL_CACHE_KEYS), conf);
init();
}

@VisibleForTesting
HiveClientPool clientPool() {
return clientPoolCache.get(metastoreUri, k -> new HiveClientPool(clientPoolSize, conf));
return clientPoolCache.get(key, k -> new HiveClientPool(clientPoolSize, conf));
}

private synchronized void init() {
if (clientPoolCache == null) {
clientPoolCache = Caffeine.newBuilder().expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
.removalListener((key, value, cause) -> ((HiveClientPool) value).close())
// Since Caffeine does not ensure that removalListener will be involved after expiration
// We use a scheduler with one thread to clean up expired clients.
clientPoolCache =
Caffeine.newBuilder()
.expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
.removalListener((ignored, value, cause) -> ((HiveClientPool) value).close())
.scheduler(
Scheduler.forScheduledExecutorService(
ThreadPools.newScheduledPool("hive-metastore-cleaner", 1)))
.build();
}
}

@VisibleForTesting
static Cache<String, HiveClientPool> clientPoolCache() {
static Cache<Key, HiveClientPool> clientPoolCache() {
return clientPoolCache;
}

Expand All @@ -81,4 +123,91 @@ public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
throws TException, InterruptedException {
return clientPool().run(action, retry);
}

@VisibleForTesting
static Key extractKey(String cacheKeys, Configuration conf) {
// generate key elements in a certain order, so that the Key instances are comparable
List<Object> elements = Lists.newArrayList();
elements.add(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""));
elements.add(conf.get(HiveCatalog.HIVE_CONF_CATALOG, "hive"));
if (cacheKeys == null || cacheKeys.isEmpty()) {
return Key.of(elements);
}

Set<KeyElementType> types = Sets.newTreeSet(Comparator.comparingInt(Enum::ordinal));
Map<String, String> confElements = Maps.newTreeMap();
for (String element : cacheKeys.split(",", -1)) {
String trimmed = element.trim();
if (trimmed.toLowerCase(Locale.ROOT).startsWith(CONF_ELEMENT_PREFIX)) {
String key = trimmed.substring(CONF_ELEMENT_PREFIX.length());
ValidationException.check(
!confElements.containsKey(key), "Conf key element %s already specified", key);
confElements.put(key, conf.get(key));
} else {
KeyElementType type = KeyElementType.valueOf(trimmed.toUpperCase());
switch (type) {
case UGI:
case USER_NAME:
ValidationException.check(
!types.contains(type), "%s key element already specified", type.name());
types.add(type);
break;
default:
throw new ValidationException("Unknown key element %s", trimmed);
}
}
}
for (KeyElementType type : types) {
switch (type) {
case UGI:
try {
elements.add(UserGroupInformation.getCurrentUser());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
break;
case USER_NAME:
try {
elements.add(UserGroupInformation.getCurrentUser().getUserName());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
break;
default:
throw new RuntimeException("Unexpected key element " + type.name());
}
}
for (String key : confElements.keySet()) {
elements.add(ConfElement.of(key, confElements.get(key)));
}
return Key.of(elements);
}

@Value.Immutable
abstract static class Key {

abstract List<Object> elements();

private static Key of(Iterable<?> elements) {
return ImmutableKey.builder().elements(elements).build();
}
}

@Value.Immutable
abstract static class ConfElement {
abstract String key();

@Nullable
abstract String value();

static ConfElement of(String key, String value) {
return ImmutableConfElement.builder().key(key).value(value).build();
}
}

private enum KeyElementType {
UGI,
USER_NAME,
CONF
}
}
Loading