diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java index 1ff18265012..7880de2d208 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java @@ -27,6 +27,7 @@ import com.google.common.base.Splitter; +import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; import org.apache.gobblin.util.HiveJdbcConnector; @@ -42,7 +43,7 @@ public abstract class HiveTable { protected List attributes; public static class Builder> { - protected String name = UUID.randomUUID().toString().replaceAll("-", "_"); + protected String name = HiveMetaStoreUtils.getHiveTableName(UUID.randomUUID().toString()); protected List primaryKeys = new ArrayList<>(); protected List attributes = new ArrayList<>(); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java index 7c7f6379905..46dbdc800fe 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java @@ -30,6 +30,8 @@ import com.google.common.annotations.VisibleForTesting; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.dataset.Dataset; import org.apache.gobblin.dataset.DatasetsFinder; import org.apache.gobblin.util.PropertiesUtils; @@ -39,6 +41,7 @@ /** * A decorator for filtering datasets after a {@link DatasetsFinder} finds a {@link List} of {@link Dataset}s */ +@Slf4j public class DatasetsFinderFilteringDecorator implements DatasetsFinder { private static final String PREFIX = 
"filtering.datasets.finder."; public static final String DATASET_CLASS = PREFIX + "class"; @@ -69,6 +72,7 @@ public DatasetsFinderFilteringDecorator(FileSystem fs, Properties properties) th @Override public List findDatasets() throws IOException { List datasets = datasetFinder.findDatasets(); + log.info("Found {} datasets", datasets.size()); List allowedDatasets = Collections.emptyList(); try { allowedDatasets = datasets.parallelStream() @@ -83,6 +87,7 @@ public List findDatasets() throws IOException { wrappedIOException.rethrowWrapped(); } + log.info("Allowed {}/{} datasets", allowedDatasets.size() ,datasets.size()); return allowedDatasets; } diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java index f7dac0c08b1..7603dfa7eb0 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java @@ -29,8 +29,6 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; import org.apache.commons.lang.reflect.MethodUtils; -import org.apache.gobblin.hive.avro.HiveAvroSerDeManager; -import org.apache.gobblin.hive.spec.HiveSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -73,6 +71,8 @@ import org.apache.gobblin.hive.HiveRegistrationUnit.Column; import org.apache.gobblin.hive.HiveTable; import org.apache.gobblin.hive.SharedHiveConfKey; +import org.apache.gobblin.hive.avro.HiveAvroSerDeManager; +import org.apache.gobblin.hive.spec.HiveSpec; /** @@ -151,6 +151,15 @@ public static HiveTable getHiveTable(Table table) { return hiveTable; } + /** + * Hive does not use '-' or '.' 
in the table name, so they are replaced with '_' + * @param topic the topic (or table) name to sanitize + * @return the name with '-' and '.' characters replaced by '_' + */ + public static String getHiveTableName(String topic) { + return topic.replaceAll("[-.]", "_"); + } + /** * Convert a {@link HivePartition} into a {@link Partition}. */ @@ -289,7 +298,8 @@ public static boolean containsNonOptionalUnionTypeColumn(HiveTable hiveTable) { .anyMatch(type -> isNonOptionalUnion(type)); } - throw new RuntimeException("Avro based Hive tables without \"" + HiveAvroSerDeManager.SCHEMA_LITERAL +"\" are not supported"); + throw new RuntimeException("Avro based Hive tables without \"" + HiveAvroSerDeManager.SCHEMA_LITERAL +"\" are not supported. " + + "hiveTable=" + hiveTable.getDbName() + "." + hiveTable.getTableName()); } /** diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java index 655c764c794..c74afaeec2a 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java @@ -76,7 +76,8 @@ private DbAndTable getDbAndTable(T dataset) { throw new IllegalStateException(String.format("Dataset urn [%s] doesn't follow expected pattern. 
" + "Expected pattern = %s", dataset.getUrn(), pattern.pattern())); } - return new DbAndTable(m.group(1), m.group(2)); + + return new DbAndTable(m.group(1), HiveMetaStoreUtils.getHiveTableName(m.group(2))); } boolean containsNonOptionalUnion(HiveTable table) { diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java index 33cee4929dc..14041cdecb3 100644 --- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java +++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java @@ -17,17 +17,10 @@ package org.apache.gobblin.iceberg.predicates; -import com.google.common.io.Files; import java.io.File; import java.util.Collections; -import lombok.extern.slf4j.Slf4j; + import org.apache.commons.io.FileUtils; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.dataset.Dataset; -import org.apache.gobblin.dataset.test.SimpleDatasetForTesting; -import org.apache.gobblin.hive.HiveTable; -import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; -import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; @@ -40,16 +33,30 @@ import org.testng.annotations.BeforeSuite; import org.testng.annotations.Test; +import com.google.common.io.Files; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.dataset.test.SimpleDatasetForTesting; +import org.apache.gobblin.hive.HiveTable; +import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; +import org.apache.gobblin.util.ConfigUtils; + + 
@Slf4j +// depends on icebergMetadataWriterTest to avoid concurrency between other HiveMetastoreTest(s) in CI. +// You can comment out the dependsOnGroups if you want to test this class in isolation @Test(dependsOnGroups = "icebergMetadataWriterTest") public class DatasetHiveSchemaContainsNonOptionalUnionTest extends HiveMetastoreTest { - private static String dbName = "dbname_" + - DatasetHiveSchemaContainsNonOptionalUnionTest.class.getSimpleName().toLowerCase(); + private static String dbName = "dbName"; private static File tmpDir; private static State state; private static String dbUri; - private static String testTable = "test_table"; + private static String testTable = "test_table01"; + private static String datasetUrn = String.format("/data/%s/streaming/test-Table01/hourly/2023/01/01", dbName); @AfterSuite public void clean() throws Exception { @@ -77,14 +84,14 @@ public void setup() throws Exception { metastoreClient.createTable(HiveMetaStoreUtils.getTable(testTable)); state = ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties())); - state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN, "/data/(\\w+)/(\\w+)"); + state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN, "/data/(\\w+)/.*/([\\w\\d_-]+)/hourly.*"); Assert.assertNotNull(metastoreClient.getTable(dbName, DatasetHiveSchemaContainsNonOptionalUnionTest.testTable)); } @Test public void testContainsNonOptionalUnion() throws Exception { DatasetHiveSchemaContainsNonOptionalUnion predicate = new DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties()); - Dataset dataset = new SimpleDatasetForTesting("/data/" + dbName + "/" + testTable); + Dataset dataset = new SimpleDatasetForTesting(datasetUrn); Assert.assertTrue(predicate.test(dataset)); }