diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java index fe135516359a..0ea1de7965cc 100644 --- a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java +++ b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java @@ -21,8 +21,6 @@ import com.google.common.cache.CacheStats; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalCause; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; @@ -32,6 +30,8 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,7 +41,6 @@ import java.util.concurrent.ExecutionException; import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -144,42 +143,50 @@ public ImmutableMap getAll(Iterable keys) throws ExecutionException { List> newTokens = new ArrayList<>(); + List> temporaryTokens = new ArrayList<>(); try { - BiMap> keyToToken = HashBiMap.create(); + Map result = new LinkedHashMap<>(); for (K key : keys) { + if (result.containsKey(key)) { + continue; + } // This is not bulk, but is fast local operation Token newToken = new Token<>(key); - Token token = tokens.computeIfAbsent(key, ignored -> newToken); - keyToToken.put(key, token); - if (token == newToken) { - newTokens.add(newToken); + Token oldToken = tokens.putIfAbsent(key, newToken); + if (oldToken != null) { + // Token exists but a data may not exist (e.g. due to concurrent eviction) + V value = dataCache.getIfPresent(oldToken); + if (value != null) { + result.put(key, value); + continue; + } + // Old token exists but value wasn't found. This can happen when there is concurrent eviction/invalidation + // or when the value is still being loaded. The new token is not registered in tokens, so won't be used + // by subsequent invocations. + temporaryTokens.add(newToken); } + newTokens.add(newToken); } - Map, V> values = dataCache.getAll(keyToToken.values()); - - BiMap, K> tokenToKey = keyToToken.inverse(); - ImmutableMap.Builder result = ImmutableMap.builder(); + Map, V> values = dataCache.getAll(newTokens); for (Map.Entry, V> entry : values.entrySet()) { - Token token = entry.getKey(); - - // While token.getKey() returns equal key, a caller may expect us to maintain key identity, in case equal keys are still distinguishable. - K key = tokenToKey.get(token); - checkState(key != null, "No key found for %s in %s when loading %s", token, tokenToKey, keys); - - result.put(key, entry.getValue()); + Token newToken = entry.getKey(); + result.put(newToken.getKey(), entry.getValue()); } - return result.buildOrThrow(); + return ImmutableMap.copyOf(result); } catch (Throwable e) { for (Token token : newTokens) { - // Failed to load and it was our new token persisted in tokens map. + // Failed to load and it was our new token (potentially) persisted in tokens map. // No cache entry exists for the token (unless concurrent load happened), // so we need to remove it. tokens.remove(token.getKey(), token); } throw e; } + finally { + dataCache.invalidateAll(temporaryTokens); + } } @Override @@ -226,6 +233,16 @@ public void invalidateAll() tokens.clear(); } + // Not thread safe, test only. + @VisibleForTesting + void clearDataCacheOnly() + { + Map> tokensCopy = new HashMap<>(tokens); + dataCache.asMap().clear(); + verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction"); + tokens.putAll(tokensCopy); + } + @Override public CacheStats stats() { @@ -406,7 +423,17 @@ public Map, V> loadAll(Iterable> tokens) for (Token token : tokenList) { keys.add(token.getKey()); } - Map values = delegate.loadAll(keys); + Map values; + try { + values = delegate.loadAll(keys); + } + catch (UnsupportedLoadingOperationException e) { + // Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll to fall back from bulk loading (without load sharing) + // to loading individual values (with load sharing). EvictableCache implementation does not currently support the fallback mechanism, + // so the individual values would be loaded without load sharing. This would be an unintentional and non-obvious behavioral + // discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled. + throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache when CacheLoader.loadAll is not implemented", e); + } ImmutableMap.Builder, V> result = ImmutableMap.builder(); for (int i = 0; i < tokenList.size(); i++) { diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java index c704e4172708..79f03ac6f853 100644 --- a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java @@ -16,6 +16,7 @@ import com.google.common.base.Strings; import com.google.common.cache.Cache; import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.gaul.modernizer_maven_annotations.SuppressModernizer; @@ -25,15 +26,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.collect.cache.CacheStatsAssertions.assertCacheStats; @@ -275,6 +279,60 @@ private static Integer newInteger(int value) return newInteger; } + /** + * Test that the loader is invoked only once for concurrent invocations of {{@link LoadingCache#get(Object, Callable)} with equal keys. + * This is a behavior of Guava Cache as well. While this is necessarily desirable behavior (see + * https://github.com/trinodb/trino/issues/11067), + * the test exists primarily to document current state and support discussion, should the current state change. + */ + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testConcurrentGetWithCallableShareLoad() + throws Exception + { + AtomicInteger loads = new AtomicInteger(); + AtomicInteger concurrentInvocations = new AtomicInteger(); + + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + + int threads = 2; + int invocationsPerThread = 100; + ExecutorService executor = newFixedThreadPool(threads); + try { + CyclicBarrier barrier = new CyclicBarrier(threads); + List> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(() -> { + for (int invocation = 0; invocation < invocationsPerThread; invocation++) { + int key = invocation; + barrier.await(10, SECONDS); + int value = cache.get(key, () -> { + loads.incrementAndGet(); + int invocations = concurrentInvocations.incrementAndGet(); + checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() invoked for same key"); + Thread.sleep(1); + concurrentInvocations.decrementAndGet(); + return -key; + }); + assertEquals(value, -invocation); + } + return null; + })); + } + + for (Future future : futures) { + future.get(10, SECONDS); + } + assertThat(loads).as("loads") + .hasValueBetween(invocationsPerThread, threads * invocationsPerThread - 1 /* inclusive */); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } + /** * Covers https://github.com/google/guava/issues/1881 */ diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java index 26919bfcc865..7563455a13f1 100644 --- a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java @@ -18,24 +18,29 @@ import com.google.common.cache.CacheStats; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.gaul.modernizer_maven_annotations.SuppressModernizer; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; @@ -47,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; @@ -222,6 +228,210 @@ public void testGetAllMaintainsKeyIdentity() assertSame(entry.getKey(), second); } + /** + * Test that they keys provided to {@link LoadingCache#get(Object)} are not necessarily the ones provided to + * {@link CacheLoader#load(Object)}. While guarantying this would be obviously desirable (as in + * {@link #testGetAllMaintainsKeyIdentityForBulkLoader}), it seems not feasible to do this while + * also maintain load sharing (see {@link #testConcurrentGetShareLoad()}). + */ + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testGetDoesNotMaintainKeyIdentityForLoader() + throws Exception + { + AtomicInteger loadCounter = new AtomicInteger(); + int firstAdditionalField = 1; + int secondAdditionalField = 123456789; + + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(CacheLoader.from((ClassWithPartialEquals key) -> { + loadCounter.incrementAndGet(); + assertEquals(key.getAdditionalField(), firstAdditionalField); // not secondAdditionalField because get() reuses existing token + return key.getValue(); + })); + + ClassWithPartialEquals keyA = new ClassWithPartialEquals(42, firstAdditionalField); + ClassWithPartialEquals keyB = new ClassWithPartialEquals(42, secondAdditionalField); + // sanity check: objects are equal despite having different observed state + assertEquals(keyA, keyB); + assertNotEquals(keyA.getAdditionalField(), keyB.getAdditionalField()); + + // Populate the cache + assertEquals((int) cache.get(keyA, () -> 317), 317); + assertEquals(loadCounter.get(), 0); + + // invalidate dataCache but keep tokens -- simulate concurrent implicit or explicit eviction + ((EvictableCache) cache).clearDataCacheOnly(); + assertEquals((int) cache.get(keyB), 42); + assertEquals(loadCounter.get(), 1); + } + + /** + * Test that they keys provided to {@link LoadingCache#getAll(Iterable)} are the ones provided to {@link CacheLoader#loadAll(Iterable)}. + * It is possible that {@link CacheLoader#loadAll(Iterable)} requires keys to have some special characteristics and some + * other, equal keys, derived from {@code EvictableCache.tokens}, may not have that characteristics. + * This can happen only when cache keys are not fully value-based. While discouraged, this situation is possible. + * Guava Cache also exhibits the behavior tested here. + */ + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testGetAllMaintainsKeyIdentityForBulkLoader() + throws Exception + { + AtomicInteger loadAllCounter = new AtomicInteger(); + int expectedAdditionalField = 123456789; + + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(new CacheLoader() + { + @Override + public Integer load(ClassWithPartialEquals key) + { + throw new UnsupportedOperationException(); + } + + @Override + public Map loadAll(Iterable keys) + { + loadAllCounter.incrementAndGet(); + // For the sake of simplicity, the test currently leverages that getAll() with singleton list will + // end up calling loadAll() even though load() could be used. + ClassWithPartialEquals key = getOnlyElement(keys); + assertEquals(key.getAdditionalField(), expectedAdditionalField); + return ImmutableMap.of(key, key.getValue()); + } + }); + + ClassWithPartialEquals keyA = new ClassWithPartialEquals(42, 1); + ClassWithPartialEquals keyB = new ClassWithPartialEquals(42, expectedAdditionalField); + // sanity check: objects are equal despite having different observed state + assertEquals(keyA, keyB); + assertNotEquals(keyA.getAdditionalField(), keyB.getAdditionalField()); + + // Populate the cache + assertEquals((int) cache.get(keyA, () -> 317), 317); + assertEquals(loadAllCounter.get(), 0); + + // invalidate dataCache but keep tokens -- simulate concurrent implicit or explicit eviction + ((EvictableCache) cache).clearDataCacheOnly(); + Map map = cache.getAll(ImmutableList.of(keyB)); + assertThat(map).hasSize(1); + assertSame(getOnlyElement(map.keySet()), keyB); + assertEquals((int) getOnlyElement(map.values()), 42); + assertEquals(loadAllCounter.get(), 1); + } + + /** + * Test that the loader is invoked only once for concurrent invocations of {{@link LoadingCache#get(Object, Callable)} with equal keys. + * This is a behavior of Guava Cache as well. While this is necessarily desirable behavior (see + * https://github.com/trinodb/trino/issues/11067), + * the test exists primarily to document current state and support discussion, should the current state change. + */ + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testConcurrentGetWithCallableShareLoad() + throws Exception + { + AtomicInteger loads = new AtomicInteger(); + AtomicInteger concurrentInvocations = new AtomicInteger(); + + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(CacheLoader.from(() -> { + throw new UnsupportedOperationException(); + })); + + int threads = 2; + int invocationsPerThread = 100; + ExecutorService executor = newFixedThreadPool(threads); + try { + CyclicBarrier barrier = new CyclicBarrier(threads); + List> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(() -> { + for (int invocation = 0; invocation < invocationsPerThread; invocation++) { + int key = invocation; + barrier.await(10, SECONDS); + int value = cache.get(key, () -> { + loads.incrementAndGet(); + int invocations = concurrentInvocations.incrementAndGet(); + checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() invoked for same key"); + Thread.sleep(1); + concurrentInvocations.decrementAndGet(); + return -key; + }); + assertEquals(value, -invocation); + } + return null; + })); + } + + for (Future future : futures) { + future.get(10, SECONDS); + } + assertThat(loads).as("loads") + .hasValueBetween(invocationsPerThread, threads * invocationsPerThread - 1 /* inclusive */); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } + + /** + * Test that the loader is invoked only once for concurrent invocations of {{@link LoadingCache#get(Object)} with equal keys. + */ + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testConcurrentGetShareLoad() + throws Exception + { + AtomicInteger loads = new AtomicInteger(); + AtomicInteger concurrentInvocations = new AtomicInteger(); + + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(new CacheLoader() + { + @Override + public Integer load(Integer key) + throws Exception + { + loads.incrementAndGet(); + int invocations = concurrentInvocations.incrementAndGet(); + checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() invoked for same key"); + Thread.sleep(1); + concurrentInvocations.decrementAndGet(); + return -key; + } + }); + + int threads = 2; + int invocationsPerThread = 100; + ExecutorService executor = newFixedThreadPool(threads); + try { + CyclicBarrier barrier = new CyclicBarrier(threads); + List> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(() -> { + for (int invocation = 0; invocation < invocationsPerThread; invocation++) { + barrier.await(10, SECONDS); + assertEquals((int) cache.get(invocation), -invocation); + } + return null; + })); + } + + for (Future future : futures) { + future.get(10, SECONDS); + } + assertThat(loads).as("loads") + .hasValueBetween(invocationsPerThread, threads * invocationsPerThread - 1 /* inclusive */); + } + finally { + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + } + /** * Covers https://github.com/google/guava/issues/1881 */ @@ -372,4 +582,44 @@ public void testInvalidateAndLoadConcurrently(Invalidation invalidation) executor.awaitTermination(10, SECONDS); } } + + /** + * A class implementing value-based equality taking into account some fields, but not all. + * This is definitely discouraged, but still may happen in practice. + */ + private static class ClassWithPartialEquals + { + private final int value; + private final int additionalField; // not part of equals + + public ClassWithPartialEquals(int value, int additionalField) + { + this.value = value; + this.additionalField = additionalField; + } + + public int getValue() + { + return value; + } + + public int getAdditionalField() + { + return additionalField; + } + + @Override + public boolean equals(Object other) + { + return other != null && + this.getClass() == other.getClass() && + this.value == ((ClassWithPartialEquals) other).value; + } + + @Override + public int hashCode() + { + return value; + } + } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java index e77ef487481e..010c115d3875 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java @@ -57,6 +57,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -84,6 +85,7 @@ import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.TEST_COLUMN; import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.TEST_DATABASE; import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.TEST_PARTITION1; +import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.TEST_PARTITION1_VALUE; import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.TEST_PARTITION2; import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.TEST_PARTITION_VALUES1; import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.TEST_ROLES; @@ -250,6 +252,71 @@ public void testGetPartitionNames() assertEquals(mockClient.getAccessCount(), 2); } + /** + * Test {@link CachingHiveMetastore#getPartition(Table, List)} followed by + * {@link CachingHiveMetastore#getPartitionsByNames(Table, List)}. + *

+ * At the moment of writing, CachingHiveMetastore uses HivePartitionName for keys in partition cache. + * HivePartitionName has a peculiar, semi- value-based equality. HivePartitionName may or may not be missing + * a name and it matters for bulk load, but it doesn't matter for single-partition load. + * Because of equality semantics, the cache keys may gets mixed during bulk load. + */ + @Test + public void testGetPartitionThenGetPartitions() + { + Table table = metastore.getTable(TEST_DATABASE, TEST_TABLE).orElseThrow(); + Optional firstRead = metastore.getPartition(table, List.of(TEST_PARTITION1_VALUE)); + assertThat(firstRead).isPresent(); + assertThat(firstRead.get().getValues()).isEqualTo(List.of(TEST_PARTITION1_VALUE)); + + Map> byName = metastore.getPartitionsByNames(table, List.of(TEST_PARTITION1)); + assertThat(byName).containsOnlyKeys(TEST_PARTITION1); + Optional secondRead = byName.get(TEST_PARTITION1); + assertThat(secondRead).isPresent(); + assertThat(secondRead.get().getValues()).isEqualTo(List.of(TEST_PARTITION1_VALUE)); + } + + /** + * A variant of {@link #testGetPartitionThenGetPartitions} where the second get happens concurrently with eviction, + * here simulated with an explicit invalidation. + */ + // Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic. + @Test(timeOut = 60_000, invocationCount = 20) + public void testGetPartitionThenGetPartitionsRacingWithInvalidation() + throws Exception + { + Table table = metastore.getTable(TEST_DATABASE, TEST_TABLE).orElseThrow(); + Optional firstRead = metastore.getPartition(table, List.of(TEST_PARTITION1_VALUE)); + assertThat(firstRead).isPresent(); + assertThat(firstRead.get().getValues()).isEqualTo(List.of(TEST_PARTITION1_VALUE)); + + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + CyclicBarrier barrier = new CyclicBarrier(2); + Future invalidation = executor.submit(() -> { + barrier.await(10, SECONDS); + metastore.flushCache(); + return null; + }); + + Future>> read = executor.submit(() -> { + barrier.await(10, SECONDS); + return metastore.getPartitionsByNames(table, List.of(TEST_PARTITION1)); + }); + + Map> byName = read.get(); + assertThat(byName).containsOnlyKeys(TEST_PARTITION1); + Optional secondRead = byName.get(TEST_PARTITION1); + assertThat(secondRead).isPresent(); + assertThat(secondRead.get().getValues()).isEqualTo(List.of(TEST_PARTITION1_VALUE)); + + invalidation.get(); // no exception raised + } + finally { + executor.shutdownNow(); + } + } + @Test public void testInvalidGetPartitionNamesByFilterAll() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/MockThriftMetastoreClient.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/MockThriftMetastoreClient.java index b9e165a56e92..bae22d3207bb 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/MockThriftMetastoreClient.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/MockThriftMetastoreClient.java @@ -56,7 +56,8 @@ public class MockThriftMetastoreClient public static final String TEST_DATABASE = "testdb"; public static final String BAD_DATABASE = "baddb"; public static final String TEST_TABLE = "testtbl"; - public static final String TEST_PARTITION1 = "key=testpartition1"; + public static final String TEST_PARTITION1_VALUE = "testpartition1"; + public static final String TEST_PARTITION1 = "key=" + TEST_PARTITION1_VALUE; public static final String TEST_COLUMN = "column"; public static final String TEST_PARTITION2 = "key=testpartition2"; public static final String BAD_PARTITION = "key=badpartition1";