Skip to content

Commit 916d570

Browse files
authored
[GOBBLIN-2167] Allow filtering of Hive datasets by underlying HDFS folder location (#4069)
* Add regex filter for table based on location
1 parent e3baf91 commit 916d570

File tree

2 files changed

+69
-8
lines changed

2 files changed

+69
-8
lines changed

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java

+24-8
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,14 @@
1717

1818
package org.apache.gobblin.data.management.copy.hive;
1919

20-
import com.google.common.base.Throwables;
2120
import java.io.IOException;
2221
import java.lang.reflect.InvocationTargetException;
2322
import java.net.URISyntaxException;
2423
import java.util.Collection;
2524
import java.util.Iterator;
2625
import java.util.List;
2726
import java.util.Properties;
28-
29-
import javax.annotation.Nonnull;
30-
31-
import lombok.Data;
32-
import lombok.Getter;
33-
import lombok.extern.slf4j.Slf4j;
27+
import java.util.regex.Pattern;
3428

3529
import org.apache.commons.lang3.StringUtils;
3630
import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -43,12 +37,18 @@
4337
import com.google.common.base.Optional;
4438
import com.google.common.base.Preconditions;
4539
import com.google.common.base.Predicate;
40+
import com.google.common.base.Throwables;
4641
import com.google.common.collect.AbstractIterator;
4742
import com.google.common.collect.Iterables;
4843
import com.google.common.collect.Lists;
4944
import com.typesafe.config.Config;
5045
import com.typesafe.config.ConfigFactory;
5146

47+
import javax.annotation.Nonnull;
48+
import lombok.Data;
49+
import lombok.Getter;
50+
import lombok.extern.slf4j.Slf4j;
51+
5252
import org.apache.gobblin.config.client.ConfigClient;
5353
import org.apache.gobblin.config.client.ConfigClientCache;
5454
import org.apache.gobblin.config.client.ConfigClientUtils;
@@ -80,6 +80,9 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
8080
public static final String DEFAULT_TABLE_PATTERN = "*";
8181
public static final String TABLE_FILTER = HIVE_DATASET_PREFIX + ".tableFilter";
8282

83+
// Property holding a regex; only tables whose underlying HDFS folder location matches it are selected
84+
public static final String TABLE_FOLDER_ALLOWLIST_FILTER = HIVE_DATASET_PREFIX + ".tableFolderAllowlistFilter";
85+
8386
/*
8487
* By setting the prefix, only config keys with this prefix will be used to build a HiveDataset.
8588
* By passing scoped configurations the same config keys can be used in different contexts.
@@ -118,6 +121,8 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
118121
protected final Function<Table, String> configStoreDatasetUriBuilder;
119122
protected final Optional<Predicate<Table>> tableFilter;
120123

124+
protected final Optional<Pattern> tableFolderAllowlistRegex;
125+
121126
protected final String datasetConfigPrefix;
122127
protected final ConfigClient configClient;
123128
private final Config jobConfig;
@@ -194,6 +199,8 @@ protected HiveDatasetFinder(FileSystem fs, Properties properties, HiveMetastoreC
194199
} else {
195200
this.tableFilter = Optional.absent();
196201
}
202+
this.tableFolderAllowlistRegex = properties.containsKey(TABLE_FOLDER_ALLOWLIST_FILTER) ?
203+
Optional.of(Pattern.compile(properties.getProperty(TABLE_FOLDER_ALLOWLIST_FILTER))): Optional.absent();
197204
}
198205

199206
protected static HiveMetastoreClientPool createClientPool(Properties properties) throws IOException {
@@ -262,7 +269,10 @@ protected HiveDataset computeNext() {
262269

263270
try (AutoReturnableObject<IMetaStoreClient> client = HiveDatasetFinder.this.clientPool.getClient()) {
264271
Table table = client.get().getTable(dbAndTable.getDb(), dbAndTable.getTable());
265-
if (tableFilter.isPresent() && !tableFilter.get().apply(table)) {
272+
if ((tableFilter.isPresent() && !tableFilter.get().apply(table))
273+
|| !shouldAllowTableLocation(tableFolderAllowlistRegex, table)) {
274+
log.info("Ignoring table {} as its underlying location {} does not pass allowlist regex {}", dbAndTable,
275+
table.getSd().getLocation(), tableFolderAllowlistRegex.get());
266276
continue;
267277
}
268278

@@ -294,6 +304,12 @@ protected HiveDataset computeNext() {
294304
};
295305
}
296306

307+
protected static boolean shouldAllowTableLocation(Optional<Pattern> regex, Table table) {
308+
if (!regex.isPresent()) {
309+
return true;
310+
}
311+
return regex.get().matcher(table.getSd().getLocation()).matches();
312+
}
297313

298314
/**
299315
* @deprecated Use {@link #createHiveDataset(Table, Config)} instead

gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java

+45
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,51 @@ public void testDatasetConfig() throws Exception {
215215

216216
}
217217

218+
@Test
219+
public void testHiveTableFolderAllowlistFilter() throws Exception {
220+
List<HiveDatasetFinder.DbAndTable> dbAndTables = Lists.newArrayList();
221+
dbAndTables.add(new HiveDatasetFinder.DbAndTable("db1", "table1"));
222+
// This table is created on /tmp/test
223+
HiveMetastoreClientPool pool = getTestPool(dbAndTables);
224+
225+
Properties properties = new Properties();
226+
properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
227+
// Try a regex with multiple groups
228+
properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "(/tmp/|a).*");
229+
230+
HiveDatasetFinder finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
231+
List<HiveDataset> datasets = Lists.newArrayList(finder.getDatasetsIterator());
232+
233+
Assert.assertEquals(datasets.size(), 1);
234+
235+
properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
236+
// The table located at /tmp/test should be filtered
237+
properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "/a/b");
238+
239+
finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
240+
datasets = Lists.newArrayList(finder.getDatasetsIterator());
241+
242+
Assert.assertEquals(datasets.size(), 0);
243+
244+
// Test empty filter
245+
properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
246+
// The table located at /tmp/test should be filtered
247+
properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "");
248+
249+
finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
250+
datasets = Lists.newArrayList(finder.getDatasetsIterator());
251+
252+
Assert.assertEquals(datasets.size(), 0);
253+
254+
// Test no regex config
255+
properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
256+
257+
finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
258+
datasets = Lists.newArrayList(finder.getDatasetsIterator());
259+
260+
Assert.assertEquals(datasets.size(), 0);
261+
}
262+
218263
private HiveMetastoreClientPool getTestPool(List<HiveDatasetFinder.DbAndTable> dbAndTables) throws Exception {
219264

220265
SetMultimap<String, String> entities = HashMultimap.create();

0 commit comments

Comments
 (0)