Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2167] Allow filtering of Hive datasets by underlying HDFS folder location #4069

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,14 @@

package org.apache.gobblin.data.management.copy.hive;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import javax.annotation.Nonnull;

import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
Expand All @@ -43,12 +37,18 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import javax.annotation.Nonnull;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientCache;
import org.apache.gobblin.config.client.ConfigClientUtils;
Expand Down Expand Up @@ -80,6 +80,9 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
public static final String DEFAULT_TABLE_PATTERN = "*";
public static final String TABLE_FILTER = HIVE_DATASET_PREFIX + ".tableFilter";

// Regex that a table's underlying storage location must fully match for the table to be included; tables located elsewhere are skipped
public static final String TABLE_FOLDER_ALLOWLIST_FILTER = HIVE_DATASET_PREFIX + ".tableFolderAllowlistFilter";

/*
* By setting the prefix, only config keys with this prefix will be used to build a HiveDataset.
* By passing scoped configurations the same config keys can be used in different contexts.
Expand Down Expand Up @@ -118,6 +121,8 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
protected final Function<Table, String> configStoreDatasetUriBuilder;
protected final Optional<Predicate<Table>> tableFilter;

protected final Optional<Pattern> tableFolderAllowlistRegex;

protected final String datasetConfigPrefix;
protected final ConfigClient configClient;
private final Config jobConfig;
Expand Down Expand Up @@ -194,6 +199,8 @@ protected HiveDatasetFinder(FileSystem fs, Properties properties, HiveMetastoreC
} else {
this.tableFilter = Optional.absent();
}
this.tableFolderAllowlistRegex = properties.containsKey(TABLE_FOLDER_ALLOWLIST_FILTER) ?
Optional.of(Pattern.compile(properties.getProperty(TABLE_FOLDER_ALLOWLIST_FILTER))): Optional.absent();
}

protected static HiveMetastoreClientPool createClientPool(Properties properties) throws IOException {
Expand Down Expand Up @@ -262,7 +269,10 @@ protected HiveDataset computeNext() {

try (AutoReturnableObject<IMetaStoreClient> client = HiveDatasetFinder.this.clientPool.getClient()) {
Table table = client.get().getTable(dbAndTable.getDb(), dbAndTable.getTable());
if (tableFilter.isPresent() && !tableFilter.get().apply(table)) {
if ((tableFilter.isPresent() && !tableFilter.get().apply(table))
|| !shouldAllowTableLocation(tableFolderAllowlistRegex, table)) {
log.info("Ignoring table {} as its underlying location {} does not pass allowlist regex {}", dbAndTable,
table.getSd().getLocation(), tableFolderAllowlistRegex.get());
continue;
}

Expand Down Expand Up @@ -294,6 +304,12 @@ protected HiveDataset computeNext() {
};
}

/**
 * Decides whether a table passes the folder allowlist, based on its storage location.
 *
 * <p>The location has to match the regex in full ({@link java.util.regex.Matcher#matches()});
 * a partial match is not sufficient.
 *
 * @param regex optional allowlist pattern; when absent every table is allowed
 * @param table the Hive table whose storage descriptor location is checked
 * @return {@code true} if no regex is configured or the location fully matches it; {@code false}
 *         otherwise, including when a regex is configured but the table has no storage
 *         descriptor or no location
 */
protected static boolean shouldAllowTableLocation(Optional<Pattern> regex, Table table) {
  if (!regex.isPresent()) {
    return true;
  }
  // Pattern.matcher(null) throws a NullPointerException, so a table without a location is
  // treated as being outside the allowlist rather than failing with an NPE here.
  if (table.getSd() == null || table.getSd().getLocation() == null) {
    return false;
  }
  return regex.get().matcher(table.getSd().getLocation()).matches();
}

/**
* @deprecated Use {@link #createHiveDataset(Table, Config)} instead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,51 @@ public void testDatasetConfig() throws Exception {

}

@Test
public void testHiveTableFolderAllowlistFilter() throws Exception {
List<HiveDatasetFinder.DbAndTable> dbAndTables = Lists.newArrayList();
dbAndTables.add(new HiveDatasetFinder.DbAndTable("db1", "table1"));
// This table is created on /tmp/test
HiveMetastoreClientPool pool = getTestPool(dbAndTables);

Properties properties = new Properties();
properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
// Try a regex with multiple groups
properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "(/tmp/|a).*");

HiveDatasetFinder finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
List<HiveDataset> datasets = Lists.newArrayList(finder.getDatasetsIterator());

Assert.assertEquals(datasets.size(), 1);

properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
// The table located at /tmp/test should be filtered
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be helpful to call out that the dataset table is created at path /tmp/test on line 221 since there is another assertion above using a different regex which doesn't filter the table

properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "/a/b");

finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
datasets = Lists.newArrayList(finder.getDatasetsIterator());

Assert.assertEquals(datasets.size(), 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can also add a test for the case where the regex is empty or null


// Test empty filter
properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
// The table located at /tmp/test should be filtered
properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "");

finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
datasets = Lists.newArrayList(finder.getDatasetsIterator());

Assert.assertEquals(datasets.size(), 0);

// Test no regex config
properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");

finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
datasets = Lists.newArrayList(finder.getDatasetsIterator());

Assert.assertEquals(datasets.size(), 0);
}

private HiveMetastoreClientPool getTestPool(List<HiveDatasetFinder.DbAndTable> dbAndTables) throws Exception {

SetMultimap<String, String> entities = HashMultimap.create();
Expand Down
Loading