diff --git a/pom.xml b/pom.xml index b58ab7cd1ba64..dcd36597ae726 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 pom presto-root @@ -28,9 +28,9 @@ - scm:git:git://github.com/facebook/presto.git - https://github.com/facebook/presto - 0.143 + scm:git:git://github.com/twitter-forks/presto.git + https://github.com/twitter-forks/presto + 0.143-tw-0.21 @@ -663,7 +663,7 @@ org.apache.zookeeper zookeeper - 3.3.6 + 3.4.6 junit @@ -673,6 +673,14 @@ log4j log4j + + org.slf4j + slf4j-jdk14 + + + org.slf4j + slf4j-log4j12 + @@ -729,6 +737,61 @@ hive-apache-jdbc 0.13.1-1 + + + + com.twitter + presto-thrift-java + 0.0.1 + + + com.twitter + util-core_2.11 + + + com.twitter + util-core-java + + + com.twitter + util-function_2.10 + + + com.twitter + util-function-java + + + commons-logging + commons-logging + + + org.scala-lang.modules + scala-parser-combinators_2.11 + + + + + com.twitter + util-logging_2.11 + 6.33.0 + + + commons-logging + commons-logging + + + + + org.scala-lang + scala-library + 2.11.7 + + + commons-logging + commons-logging + + + diff --git a/presto-base-jdbc/pom.xml b/presto-base-jdbc/pom.xml index f97c8bba0c51b..f5509ae7ace0d 100644 --- a/presto-base-jdbc/pom.xml +++ b/presto-base-jdbc/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-base-jdbc diff --git a/presto-benchmark-driver/pom.xml b/presto-benchmark-driver/pom.xml index 428c8c6494768..f4798cbed1db5 100644 --- a/presto-benchmark-driver/pom.xml +++ b/presto-benchmark-driver/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-benchmark-driver diff --git a/presto-benchmark/pom.xml b/presto-benchmark/pom.xml index 2d6d529d0fae3..9cdb2ba7a288c 100644 --- a/presto-benchmark/pom.xml +++ b/presto-benchmark/pom.xml @@ -5,7 +5,7 @@ presto-root com.facebook.presto - 0.143 + 0.143-tw-0.21 presto-benchmark diff --git a/presto-blackhole/pom.xml b/presto-blackhole/pom.xml index 
f939fcd80924f..cee636f284573 100644 --- a/presto-blackhole/pom.xml +++ b/presto-blackhole/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-blackhole diff --git a/presto-bytecode/pom.xml b/presto-bytecode/pom.xml index b6558dd1ae17a..3f9d4cc315b87 100644 --- a/presto-bytecode/pom.xml +++ b/presto-bytecode/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-bytecode diff --git a/presto-cassandra/pom.xml b/presto-cassandra/pom.xml index d75efa6ae5d03..1584fab21a1ae 100644 --- a/presto-cassandra/pom.xml +++ b/presto-cassandra/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-cassandra diff --git a/presto-cli/pom.xml b/presto-cli/pom.xml index 841119ce5d509..0112a2ec8df35 100644 --- a/presto-cli/pom.xml +++ b/presto-cli/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-cli diff --git a/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java b/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java index 0ab462b6026ed..77b8be9091656 100644 --- a/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java +++ b/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java @@ -17,6 +17,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; +import com.sun.security.auth.module.UnixSystem; import io.airlift.airline.Option; import io.airlift.http.client.spnego.KerberosConfig; import io.airlift.units.Duration; @@ -68,8 +69,9 @@ public class ClientOptions @Option(name = "--keystore-password", title = "keystore password", description = "Keystore password") public String keystorePassword; - @Option(name = "--user", title = "user", description = "Username") - public String user = System.getProperty("user.name"); + // Pick the user name for the logged in user. + // Do not let it be overridden by users. 
+ public String user = new UnixSystem().getUsername(); @Option(name = "--source", title = "source", description = "Name of source making query") public String source = "presto-cli"; diff --git a/presto-client/pom.xml b/presto-client/pom.xml index 1afa4e2137b90..86694f5e83512 100644 --- a/presto-client/pom.xml +++ b/presto-client/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-client diff --git a/presto-docs/pom.xml b/presto-docs/pom.xml index 0aa73f400de00..802a03d546d30 100644 --- a/presto-docs/pom.xml +++ b/presto-docs/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-docs diff --git a/presto-example-http/pom.xml b/presto-example-http/pom.xml index e2cce02150c6e..93ecc58122af9 100644 --- a/presto-example-http/pom.xml +++ b/presto-example-http/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-example-http diff --git a/presto-hive-cdh4/pom.xml b/presto-hive-cdh4/pom.xml index d6249e4c76498..98bf09737d7f0 100644 --- a/presto-hive-cdh4/pom.xml +++ b/presto-hive-cdh4/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-hive-cdh4 diff --git a/presto-hive-cdh5/pom.xml b/presto-hive-cdh5/pom.xml index 45dafbefb8e4c..5a5325b78ffd5 100644 --- a/presto-hive-cdh5/pom.xml +++ b/presto-hive-cdh5/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-hive-cdh5 diff --git a/presto-hive-hadoop1/pom.xml b/presto-hive-hadoop1/pom.xml index 5dd2f1cdb7b15..c497b8c804e20 100644 --- a/presto-hive-hadoop1/pom.xml +++ b/presto-hive-hadoop1/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-hive-hadoop1 diff --git a/presto-hive-hadoop2/pom.xml b/presto-hive-hadoop2/pom.xml index 24e1bd0aa0d15..08a42164f5a95 100644 --- a/presto-hive-hadoop2/pom.xml +++ b/presto-hive-hadoop2/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-hive-hadoop2 diff 
--git a/presto-hive/pom.xml b/presto-hive/pom.xml index 51de1140a3857..f2b6bfe180240 100644 --- a/presto-hive/pom.xml +++ b/presto-hive/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-hive @@ -43,6 +43,44 @@ hive-apache + + org.apache.curator + curator-recipes + 2.8.0 + + + + org.apache.curator + curator-framework + 2.8.0 + + + + org.apache.curator + curator-client + 2.8.0 + + + + org.apache.curator + curator-test + 2.8.0 + test + + + + org.apache.zookeeper + zookeeper + 3.4.6 + test + + + + com.101tec + zkclient + test + + org.apache.thrift libthrift @@ -88,6 +126,12 @@ configuration + + com.googlecode.json-simple + json-simple + 1.1 + + com.google.guava guava diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java index 0c854f66abcda..cd8d88e0d9e82 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java @@ -19,6 +19,7 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.predicate.TupleDomain; +import com.facebook.presto.twitter.hive.util.UgiUtils; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; @@ -38,8 +39,10 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; @@ -138,8 +141,15 @@ public BackgroundHiveSplitLoader( public void start(HiveSplitSource splitSource) { this.hiveSplitSource = splitSource; + + UserGroupInformation ugi = null; + + if 
(HiveSessionProperties.getReadAsQueryUser(session)) { + ugi = UgiUtils.getUgi(session.getUser()); + } + for (int i = 0; i < maxPartitionBatchSize; i++) { - ResumableTasks.submit(executor, new HiveSplitLoaderTask()); + ResumableTasks.submit(executor, new HiveSplitLoaderTask(ugi)); } } @@ -152,8 +162,30 @@ public void stop() private class HiveSplitLoaderTask implements ResumableTask { + private UserGroupInformation ugi; + + public HiveSplitLoaderTask(UserGroupInformation ugi) + { + this.ugi = ugi; + } + @Override public TaskStatus process() + { + if (ugi != null) { + try { + return ugi.doAs((PrivilegedExceptionAction) this::doProcess); + } + catch (IOException | InterruptedException e) { + throw new RuntimeException("Could not runAs " + session.getUser(), e); + } + } + else { + return doProcess(); + } + } + + private TaskStatus doProcess() { while (true) { if (stopped) { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index d8c26f635e5df..f4d51c6424e53 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -68,6 +68,8 @@ public class HiveClientConfig private boolean allowCorruptWritesForTesting; + private boolean readAsQueryUser = false; + private Duration metastoreCacheTtl = new Duration(1, TimeUnit.HOURS); private Duration metastoreRefreshInterval = new Duration(1, TimeUnit.SECONDS); private int maxMetastoreRefreshThreads = 100; @@ -293,6 +295,19 @@ public HiveClientConfig setAllowCorruptWritesForTesting(boolean allowCorruptWrit return this; } + public boolean getReadAsQueryUser() + { + return readAsQueryUser; + } + + @Config("hive.read-as-query-user") + @ConfigDescription("When querying hive read data as the user submitting the query instead of as the presto daemon user") + public HiveClientConfig setReadAsQueryUser(boolean readAsQueryUser) + 
{ + this.readAsQueryUser = readAsQueryUser; + return this; + } + public boolean getAllowAddColumn() { return this.allowAddColumn; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java index 5365fb1dde4c3..e7e0bb2d136b0 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java @@ -86,8 +86,6 @@ public void configure(Binder binder) newExporter(binder).export(NamenodeStats.class).as(generatedNameOf(NamenodeStats.class)); binder.bind(HiveMetastoreClientFactory.class).in(Scopes.SINGLETON); - binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON); - configBinder(binder).bindConfig(StaticMetastoreConfig.class); binder.bind(TypeManager.class).toInstance(typeManager); binder.bind(PageIndexerFactory.class).toInstance(pageIndexerFactory); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java index 39ee1755289d0..81f0d9d75498b 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java @@ -27,6 +27,9 @@ import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider; import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager; import com.facebook.presto.spi.type.TypeManager; +import com.facebook.presto.twitter.hive.MetastoreStaticClusterModule; +import com.facebook.presto.twitter.hive.MetastoreZkDiscoveryBasedModule; +import com.facebook.presto.twitter.hive.ZookeeperServersetMetastoreConfig; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; @@ -97,6 +100,14 @@ public Connector 
create(String connectorId, Map config) new MBeanModule(), new JsonModule(), new HiveClientModule(connectorId, metastore, typeManager, pageIndexerFactory), + installModuleIf( + ZookeeperServersetMetastoreConfig.class, + zkMetastoreConfig -> zkMetastoreConfig.getZookeeperServerHostAndPort() == null, + new MetastoreStaticClusterModule()), + installModuleIf( + ZookeeperServersetMetastoreConfig.class, + zkMetastoreConfig -> zkMetastoreConfig.getZookeeperServerHostAndPort() != null, + new MetastoreZkDiscoveryBasedModule()), installModuleIf( SecurityConfig.class, security -> ALLOW_ALL_ACCESS_CONTROL.equalsIgnoreCase(security.getSecuritySystem()), diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java index 8ad5acc65c9f0..dc6d0f46f3ede 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java @@ -18,25 +18,19 @@ import javax.inject.Inject; import java.net.URI; -import java.util.Map; import static java.util.Objects.requireNonNull; public class HiveHdfsConfiguration implements HdfsConfiguration { - private static final Configuration DEFAULT_CONFIGURATION = new Configuration(); - @SuppressWarnings("ThreadLocalNotStaticFinal") private final ThreadLocal hadoopConfiguration = new ThreadLocal() { @Override protected Configuration initialValue() { - Configuration config = new Configuration(false); - for (Map.Entry entry : DEFAULT_CONFIGURATION) { - config.set(entry.getKey(), entry.getValue()); - } + Configuration config = new Configuration(); updater.updateConfiguration(config); return config; } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java index 9b125ca9798a5..ee6ddbd7225dc 100644 --- 
a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java @@ -23,14 +23,18 @@ import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.TypeManager; +import com.facebook.presto.twitter.hive.util.UgiUtils; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.joda.time.DateTimeZone; import javax.inject.Inject; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -68,6 +72,24 @@ public HivePageSourceProvider( @Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List columns) + { + if (HiveSessionProperties.getReadAsQueryUser(session)) { + UserGroupInformation ugi = UgiUtils.getUgi(session.getUser()); + try { + return ugi.doAs((PrivilegedExceptionAction) () -> + doCreatePageSource(session, split, columns) + ); + } + catch (IOException | InterruptedException e) { + throw new RuntimeException("Could not runAs " + session.getUser(), e); + } + } + else { + return doCreatePageSource(session, split, columns); + } + } + + private ConnectorPageSource doCreatePageSource(ConnectorSession session, ConnectorSplit split, List columns) { HiveSplit hiveSplit = checkType(split, HiveSplit.class, "split"); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 833afdd000806..f2d4007a27f4e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ 
b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -34,6 +34,7 @@ public final class HiveSessionProperties private static final String ORC_STREAM_BUFFER_SIZE = "orc_stream_buffer_size"; private static final String PARQUET_PREDICATE_PUSHDOWN_ENABLED = "parquet_predicate_pushdown_enabled"; private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled"; + private static final String READ_AS_QUERY_USER = "read_as_query_user"; private final List> sessionProperties; @@ -46,6 +47,11 @@ public HiveSessionProperties(HiveClientConfig config) "Only schedule splits on workers colocated with data node", config.isForceLocalScheduling(), false), + booleanSessionProperty( + READ_AS_QUERY_USER, + "Query reads happen as the user submitting the query", + config.getReadAsQueryUser(), + true), dataSizeSessionProperty( ORC_MAX_MERGE_DISTANCE, "ORC: Maximum size of gap between two reads to merge into a single read", @@ -108,6 +114,11 @@ public static boolean isParquetPredicatePushdownEnabled(ConnectorSession session return session.getProperty(PARQUET_PREDICATE_PUSHDOWN_ENABLED, Boolean.class); } + public static boolean getReadAsQueryUser(ConnectorSession session) + { + return session.getProperty(READ_AS_QUERY_USER, Boolean.class); + } + public static PropertyMetadata dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden) { return new PropertyMetadata<>( diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java index 5aea4fe9ab876..b545ef61e7541 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java @@ -35,9 +35,13 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import 
org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import javax.inject.Inject; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -220,7 +224,7 @@ private Iterable getPartitionMetadata(Table table, Schema throw new PrestoException(INTERNAL_ERROR, "Partition not loaded: " + hivePartition); } - // verify all partition is online + // verify all partitions are online String protectMode = partition.getParameters().get(ProtectMode.PARAMETER_NAME); String partName = createPartitionName(partition, table); if (protectMode != null && getProtectModeFromString(protectMode).offline) { @@ -231,32 +235,8 @@ private Iterable getPartitionMetadata(Table table, Schema throw new PartitionOfflineException(tableName, partName, format("Partition '%s' is offline for Presto: %s", partName, prestoOffline)); } - // Verify that the partition schema matches the table schema. - // Either adding or dropping columns from the end of the table - // without modifying existing partitions is allowed, but every - // column that exists in both the table and partition must have - // the same type. - List tableColumns = table.getSd().getCols(); - List partitionColumns = partition.getSd().getCols(); - if ((tableColumns == null) || (partitionColumns == null)) { - throw new PrestoException(HIVE_INVALID_METADATA, format("Table '%s' or partition '%s' has null columns", tableName, partName)); - } - for (int i = 0; i < min(partitionColumns.size(), tableColumns.size()); i++) { - String tableType = tableColumns.get(i).getType(); - String partitionType = partitionColumns.get(i).getType(); - if (!tableType.equals(partitionType)) { - throw new PrestoException(HIVE_PARTITION_SCHEMA_MISMATCH, format("" + - "There is a mismatch between the table and partition schemas. 
" + - "The column '%s' in table '%s' is declared as type '%s', " + - "but partition '%s' declared column '%s' as type '%s'.", - tableColumns.get(i).getName(), - tableName, - tableType, - partName, - partitionColumns.get(i).getName(), - partitionType)); - } - } + verifySchemaEvolution(tableName, table.getSd().getCols(), + partName, partition.getSd().getCols()); results.add(new HivePartitionMetadata(hivePartition, partition)); } @@ -266,6 +246,61 @@ private Iterable getPartitionMetadata(Table table, Schema return concat(partitionBatches); } + /** + * Verify that the partition schema is backward compatible with the table schema. Either + * adding or dropping columns from the end of the table without modifying existing partitions is + * allowed, but every column that exists in both the table and partition must have the same type. + * + * If the type is a nested type (i.e. a struct), verify all partition struct fields exist in the + * table struct fields with the same name, type and order. If that condition is met, additional + * table struct fields are permitted. 
+ * + * @throws PrestoException if the schema has not evolved in a supported way + */ + private void verifySchemaEvolution(SchemaTableName tableName, List tableColumns, + String partitionName, List partitionColumns) throws PrestoException + { + if ((tableColumns == null) || (partitionColumns == null)) { + throw new PrestoException(HIVE_INVALID_METADATA, format("Table '%s' or partition '%s' has null columns", tableName, partitionName)); + } + for (int i = 0; i < min(partitionColumns.size(), tableColumns.size()); i++) { + HiveType tableType = HiveType.valueOf(tableColumns.get(i).getType()); + HiveType partitionType = HiveType.valueOf(partitionColumns.get(i).getType()); + boolean validEvolution; + if (isStruct(tableType) && isStruct(partitionType)) { + ArrayList tableFieldTypes = getStructFields(tableType); + ArrayList partitionFieldTypes = getStructFields(partitionType); + validEvolution = tableFieldTypes.subList(0, partitionFieldTypes.size()).equals(partitionFieldTypes); + } + else { + validEvolution = tableType.equals(partitionType); + } + + if (!validEvolution) { + throw new PrestoException(HIVE_PARTITION_SCHEMA_MISMATCH, format("" + + "There is a mismatch between the table and partition schemas. 
" + + "The column '%s' in table '%s' is declared as type '%s', " + + "but partition '%s' declared column '%s' as type '%s'.", + tableColumns.get(i).getName(), + tableName, + tableType, + partitionName, + partitionColumns.get(i).getName(), + partitionType)); + } + } + } + + private static boolean isStruct(HiveType type) + { + return type.getCategory() == Category.STRUCT; + } + + private static ArrayList getStructFields(HiveType structHiveType) + { + return ((StructTypeInfo) structHiveType.getTypeInfo()).getAllStructFieldTypeInfos(); + } + /** * Partition the given list in exponentially (power of 2) increasing batch sizes starting at 1 up to maxBatchSize */ diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java index 46b06da792dec..c47d4948416fe 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java @@ -729,7 +729,7 @@ public ParquetStructConverter(Type prestoType, String columnName, GroupType entr List prestoTypeParameters = prestoType.getTypeParameters(); List fieldTypes = entryType.getFields(); checkArgument( - prestoTypeParameters.size() == fieldTypes.size(), + prestoTypeParameters.size() >= fieldTypes.size(), "Schema mismatch, metastore schema for row column %s has %s fields but parquet schema has %s fields", columnName, prestoTypeParameters.size(), @@ -739,7 +739,7 @@ public ParquetStructConverter(Type prestoType, String columnName, GroupType entr this.fieldIndex = fieldIndex; ImmutableList.Builder converters = ImmutableList.builder(); - for (int i = 0; i < prestoTypeParameters.size(); i++) { + for (int i = 0; i < fieldTypes.size(); i++) { parquet.schema.Type fieldType = fieldTypes.get(i); converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." 
+ fieldType.getName(), fieldType, i)); } @@ -784,7 +784,7 @@ public void end() for (BlockConverter converter : converters) { converter.afterValue(); } - while (currentEntryBuilder.getPositionCount() < converters.size()) { + while (currentEntryBuilder.getPositionCount() < rowType.getTypeParameters().size()) { currentEntryBuilder.appendNull(); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java index 536c9a3b711de..bdd315f172183 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java @@ -26,7 +26,7 @@ private ParquetTypeUtils() public static parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames) { if (useParquetColumnNames) { - return getParquetTypeByName(column.getName(), messageType); + return findParquetTypeByName(column, messageType); } if (column.getHiveColumnIndex() < messageType.getFieldCount()) { @@ -35,6 +35,28 @@ public static parquet.schema.Type getParquetType(HiveColumnHandle column, Messag return null; } + /** + * Find the column type by name, returning the first match according to the following logic: + *
    + *
  • direct match
  • + *
  • case-insensitive match
  • + *
  • if the name ends with _, remove it and direct match
  • + *
  • if the name ends with _, remove it and case-insensitive match
  • + *
+ */ + private static parquet.schema.Type findParquetTypeByName(HiveColumnHandle column, MessageType messageType) + { + String name = column.getName(); + Type type = getParquetTypeByName(name, messageType); + + // when a parquet field is a hive keyword we append an _ to it in hive. When doing + // a name-based lookup, we need to strip it off again if we didn't get a direct match. + if (type == null && name.endsWith("_")) { + type = getParquetTypeByName(name.substring(0, name.length() - 1), messageType); + } + return type; + } + private static parquet.schema.Type getParquetTypeByName(String columnName, MessageType messageType) { if (messageType.containsField(columnName)) { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java index 5d34229e140cf..79f59a2b1966a 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java @@ -33,7 +33,6 @@ import java.util.Properties; import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND; import static java.util.Objects.requireNonNull; public class HiveFileIterator @@ -94,7 +93,8 @@ protected LocatedFileStatus computeNext() return endOfData(); } catch (FileNotFoundException e) { - throw new PrestoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + path); + // We are okay if the path does not exist. 
+ return endOfData(); } catch (IOException e) { throw new PrestoException(HIVE_FILESYSTEM_ERROR, e); diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java new file mode 100644 index 0000000000000..b8c1b7dc9a60d --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.hive.HiveCluster; +import com.facebook.presto.hive.StaticHiveCluster; +import com.facebook.presto.hive.StaticMetastoreConfig; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import static io.airlift.configuration.ConfigBinder.configBinder; + +public class MetastoreStaticClusterModule + implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(StaticMetastoreConfig.class); + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java new file mode 100644 index 0000000000000..775a5afaf4c81 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.hive.HiveCluster; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import static io.airlift.configuration.ConfigBinder.configBinder; + +public class MetastoreZkDiscoveryBasedModule + implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(HiveCluster.class).to(ZookeeperServersetHiveCluster.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(ZookeeperServersetMetastoreConfig.class); + } +}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java
new file mode 100644
index 0000000000000..e1d0f2011468d
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import com.google.common.net.HostAndPort;
+import io.airlift.log.Logger;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Watches a ZooKeeper path whose children each describe one Hive metastore and keeps an
+ * in-memory view (node name -> host and port) that follows child add/update/remove events.
+ */
+public class ZookeeperMetastoreMonitor implements PathChildrenCacheListener
+{
+    public static final Logger log = Logger.get(ZookeeperMetastoreMonitor.class);
+    private static final String SERVICE_ENDPOINT = "serviceEndpoint";
+
+    private final CuratorFramework client;
+    private final PathChildrenCache cache;
+    // Node name -> HostAndPort; created before the listener is registered so an early event never sees null.
+    private final ConcurrentMap<String, HostAndPort> servers = new ConcurrentHashMap<>();
+
+    /**
+     * Connects to ZooKeeper and starts watching the children of {@code watchPath}.
+     *
+     * @param zkServer ZooKeeper connect string
+     * @param watchPath path whose children are watched
+     * @param maxRetries maximum number of retries for the exponential backoff policy
+     * @param retrySleepTime base sleep time handed to the exponential backoff policy
+     * @throws RuntimeException if the path cache cannot be started; the original failure is the cause
+     */
+    public ZookeeperMetastoreMonitor(String zkServer, String watchPath, int maxRetries, int retrySleepTime)
+            throws Exception
+    {
+        client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(retrySleepTime, maxRetries));
+        client.start();
+
+        // true: cache node contents in addition to the stat
+        cache = new PathChildrenCache(client, watchPath, true);
+        // Register before start() so the listener is in place when the cache begins posting events.
+        cache.getListenable().addListener(this);
+        try {
+            cache.start();
+        }
+        catch (Exception ex) {
+            // Do not leak the ZooKeeper connection when the cache cannot be started.
+            releaseResources();
+            throw new RuntimeException("Curator PathCache Creation failed: " + ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Stops watching and closes the ZooKeeper connection.
+     */
+    public void close()
+    {
+        releaseResources();
+    }
+
+    /**
+     * Returns a snapshot copy of the currently known metastore addresses.
+     */
+    public List<HostAndPort> getServers()
+    {
+        return new ArrayList<>(servers.values());
+    }
+
+    // Closes the cache before the client it was built on; the client is closed even if the cache fails to close.
+    private void releaseResources()
+    {
+        try {
+            cache.close();
+        }
+        catch (IOException ex) {
+            log.warn(ex, "Failed to close metastore path cache");
+        }
+        finally {
+            client.close();
+        }
+    }
+
+    // Extracts serviceEndpoint.host / serviceEndpoint.port from the JSON payload of a child node.
+    private static HostAndPort deserialize(byte[] bytes)
+    {
+        // Decode explicitly as UTF-8 rather than with the platform default charset.
+        Object parsed = (bytes == null) ? null : JSONValue.parse(new String(bytes, StandardCharsets.UTF_8));
+        if (parsed instanceof JSONObject) {
+            Object endpoint = ((JSONObject) parsed).get(SERVICE_ENDPOINT);
+            if (endpoint instanceof Map) {
+                Map<?, ?> hostPortMap = (Map<?, ?>) endpoint;
+                Object host = hostPortMap.get("host");
+                Object port = hostPortMap.get("port");
+                if (host != null && port != null) {
+                    return HostAndPort.fromParts(host.toString(), Integer.parseInt(port.toString()));
+                }
+            }
+        }
+        log.warn("failed to deserialize child node data");
+        throw new IllegalArgumentException("No host:port found");
+    }
+
+    /**
+     * Applies a child event to the server map: add/update stores the parsed address, remove drops it.
+     */
+    @Override
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+    {
+        switch (event.getType()) {
+            case CHILD_ADDED:
+            case CHILD_UPDATED: {
+                HostAndPort hostPort = deserialize(event.getData().getData());
+                String node = ZKPaths.getNodeFromPath(event.getData().getPath());
+                log.info("child updated: " + node + ": " + hostPort);
+                servers.put(node, hostPort);
+                break;
+            }
+
+            case CHILD_REMOVED: {
+                String node = ZKPaths.getNodeFromPath(event.getData().getPath());
+                log.info("child removed: " + node);
+                servers.remove(node);
+                break;
+            }
+
+            default:
+                log.info("connection state changed: " + event.getType());
+                break;
+        }
+    }
+}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java new file mode 100644 index 0000000000000..83642e41c50ff --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.hive.HiveCluster; +import com.facebook.presto.hive.HiveMetastoreClientFactory; +import com.facebook.presto.hive.metastore.HiveMetastoreClient; +import com.facebook.presto.spi.PrestoException; +import com.google.common.net.HostAndPort; +import io.airlift.log.Logger; +import org.apache.thrift.transport.TTransportException; + +import javax.inject.Inject; + +import java.util.List; + +import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static java.util.Objects.requireNonNull; + +public class ZookeeperServersetHiveCluster + implements HiveCluster +{ + private static final Logger log = Logger.get(ZookeeperServersetHiveCluster.class); + private final HiveMetastoreClientFactory clientFactory; + private ZookeeperMetastoreMonitor zkMetastoreMonitor; + + @Inject + public ZookeeperServersetHiveCluster(ZookeeperServersetMetastoreConfig config, HiveMetastoreClientFactory clientFactory) + throws Exception + { + String zkServerHostAndPort = requireNonNull(config.getZookeeperServerHostAndPort(), "zkServerHostAndPort is null"); + String zkMetastorePath = requireNonNull(config.getZookeeperMetastorePath(), "zkMetastorePath is null"); + int zkRetries = requireNonNull(config.getZookeeperMaxRetries(), "zkMaxRetried is null"); + int zkRetrySleepTime = requireNonNull(config.getZookeeperRetrySleepTime(), "zkRetrySleepTime is null"); + this.clientFactory = requireNonNull(clientFactory, "clientFactory is null"); + this.zkMetastoreMonitor = new 
ZookeeperMetastoreMonitor(zkServerHostAndPort, zkMetastorePath, zkRetries, zkRetrySleepTime); + } + + @Override + public HiveMetastoreClient createMetastoreClient() + { + List metastores = zkMetastoreMonitor.getServers(); + TTransportException lastException = null; + for (HostAndPort metastore : metastores) { + try { + log.info("Connecting to metastore at: " + metastore.toString()); + return clientFactory.create(metastore.getHostText(), metastore.getPort()); + } + catch (TTransportException e) { + lastException = e; + } + } + + throw new PrestoException(HIVE_METASTORE_ERROR, "Failed connecting to Hive metastore", lastException); + } +}
diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java
new file mode 100644
index 0000000000000..26e36b469d0ca
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+
+import javax.validation.constraints.Min;
+
+/**
+ * Configuration for discovering Hive metastores through a ZooKeeper path.
+ */
+public class ZookeeperServersetMetastoreConfig
+{
+    private String zookeeperServerHostAndPort;
+    private String zookeeperMetastorePath;
+    private int zookeeperRetrySleepTime = 500; // ms
+    private int zookeeperMaxRetries = 3;
+
+    public String getZookeeperServerHostAndPort()
+    {
+        return zookeeperServerHostAndPort;
+    }
+
+    @Config("hive.metastore.zookeeper.uri")
+    @ConfigDescription("Zookeeper Host and Port")
+    public ZookeeperServersetMetastoreConfig setZookeeperServerHostAndPort(String zookeeperServerHostAndPort)
+    {
+        this.zookeeperServerHostAndPort = zookeeperServerHostAndPort;
+        return this;
+    }
+
+    public String getZookeeperMetastorePath()
+    {
+        return zookeeperMetastorePath;
+    }
+
+    @Config("hive.metastore.zookeeper.path")
+    @ConfigDescription("Hive metastore Zookeeper path")
+    public ZookeeperServersetMetastoreConfig setZookeeperMetastorePath(String zkPath)
+    {
+        this.zookeeperMetastorePath = zkPath;
+        return this;
+    }
+
+    // A primitive int can never be null, so @NotNull validated nothing here; reject negative sleep times instead.
+    @Min(0)
+    public int getZookeeperRetrySleepTime()
+    {
+        return zookeeperRetrySleepTime;
+    }
+
+    @Config("hive.metastore.zookeeper.retry.sleeptime")
+    @ConfigDescription("Zookeeper sleep time between retries (ms)")
+    public ZookeeperServersetMetastoreConfig setZookeeperRetrySleepTime(int zookeeperRetrySleepTime)
+    {
+        this.zookeeperRetrySleepTime = zookeeperRetrySleepTime;
+        return this;
+    }
+
+    @Min(1)
+    public int getZookeeperMaxRetries()
+    {
+        return zookeeperMaxRetries;
+    }
+
+    @Config("hive.metastore.zookeeper.max.retries")
+    @ConfigDescription("Zookeeper max retries")
+    public ZookeeperServersetMetastoreConfig setZookeeperMaxRetries(int zookeeperMaxRetries)
+    {
+        this.zookeeperMaxRetries = zookeeperMaxRetries;
+        return this;
+    }
+}
diff --git
a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/UgiUtils.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/UgiUtils.java
new file mode 100644
index 0000000000000..6d540bbe3c5ac
--- /dev/null
+++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/UgiUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.twitter.hive.util;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Utility class to handle creating and caching the UserGroupInformation object.
+ */
+public class UgiUtils
+{
+    private UgiUtils() {}
+
+    // Every instance of a UserGroupInformation object for a given user has a unique hashcode, due
+    // to the hashCode() impl. If we don't cache the UGI per-user here, there will be a memory leak
+    // in the PrestoFileSystemCache.
+    private static final ConcurrentMap<String, UserGroupInformation> UGI_CACHE = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the cached proxy-user UGI for {@code user}, creating it on first use.
+     * Creation goes through computeIfAbsent so concurrent first calls for one user share a
+     * single instance; a separate get-then-put could create and hand out several.
+     *
+     * @param user name of the user to impersonate
+     * @throws RuntimeException if the login user cannot be obtained
+     */
+    public static UserGroupInformation getUgi(String user)
+    {
+        UserGroupInformation ugi = UGI_CACHE.get(user);
+        if (ugi == null) {
+            ugi = UGI_CACHE.computeIfAbsent(user, UgiUtils::createProxyUgi);
+        }
+        return ugi;
+    }
+
+    // Builds the proxy-user UGI for the given user on top of the login user.
+    private static UserGroupInformation createProxyUgi(String user)
+    {
+        // Configure hadoop to allow presto daemon user to impersonate all presto users
+        // See https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/Superusers.html
+        try {
+            return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+        }
+        catch (IOException e) {
+            throw new RuntimeException("Could not get login user from UserGroupInformation", e);
+        }
+    }
+}
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index c4cf4f91c9d39..14ff33018b808 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -67,6 +67,7 @@ import com.facebook.presto.testing.TestingConnectorSession; import com.facebook.presto.type.ArrayType; import com.facebook.presto.type.MapType; +import com.facebook.presto.type.RowType; import com.facebook.presto.type.TypeRegistry; import com.facebook.presto.util.ImmutableCollectors; import com.google.common.collect.ImmutableList; @@ -2273,6 +2274,22 @@ protected void assertGetRecords( } } + // STRUCT + index = columnIndex.get("t_struct"); + if (index != null) { + if ((rowNumber % 31) == 0) { + assertNull(row.getField(index)); + } + else { + assertTrue(row.getField(index) instanceof List); + List values = (List) row.getField(index); + assertEquals(values.size(), 3); + assertEquals(values.get(0), "test abc"); + assertEquals(values.get(1), 0.1); + assertNull(values.get(2)); + } + } + // MAP>> index = columnIndex.get("t_complex"); if (index != null) { @@ -2482,7 +2499,7 @@ else if (TIMESTAMP.equals(column.getType())) { else if
(DATE.equals(column.getType())) { assertInstanceOf(value, SqlDate.class); } - else if (column.getType() instanceof ArrayType) { + else if (column.getType() instanceof ArrayType || column.getType() instanceof RowType) { assertInstanceOf(value, List.class); } else if (column.getType() instanceof MapType) { diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index 56a2c896bc857..236d3ab35a233 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -67,6 +67,7 @@ public void testDefaults() .setHiveStorageFormat(HiveStorageFormat.RCBINARY) .setHiveCompressionCodec(HiveCompressionCodec.GZIP) .setRespectTableFormat(true) + .setReadAsQueryUser(false) .setImmutablePartitions(false) .setMaxPartitionsPerWriter(100) .setUseParquetColumnNames(false) @@ -134,6 +135,7 @@ public void testExplicitPropertyMappings() .put("hive.max-concurrent-file-renames", "100") .put("hive.assume-canonical-partition-keys", "true") .put("hive.parquet.use-column-names", "true") + .put("hive.read-as-query-user", "true") .put("hive.orc.use-column-names", "true") .put("hive.s3.aws-access-key", "abc123") .put("hive.s3.aws-secret-key", "secret") @@ -187,6 +189,7 @@ public void testExplicitPropertyMappings() .setVerifyChecksum(false) .setResourceConfigFiles(ImmutableList.of("/foo.xml", "/bar.xml")) .setHiveStorageFormat(HiveStorageFormat.SEQUENCEFILE) + .setReadAsQueryUser(true) .setHiveCompressionCodec(HiveCompressionCodec.NONE) .setRespectTableFormat(false) .setImmutablePartitions(true) diff --git a/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperMetastoreMonitor.java b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperMetastoreMonitor.java new file mode 100644 index 0000000000000..1f89464aedce3 --- /dev/null +++ 
b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperMetastoreMonitor.java @@ -0,0 +1,157 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.twitter.hive.util.TestUtils; +import com.google.common.collect.ImmutableList; +import com.google.common.net.HostAndPort; +import io.airlift.log.Logger; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.curator.test.TestingServer; +import org.json.simple.JSONObject; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertTrue; + +public class TestZookeeperMetastoreMonitor +{ + private static final Logger log = Logger.get(TestZookeeperMetastoreMonitor.class); + + private ZookeeperMetastoreMonitor zkMetastoreMonitor; + private TestingServer zkServer; + private ZkClient zkClient; + private final String zkPath = "/metastores"; + + public TestZookeeperMetastoreMonitor() + throws Exception + { + zkServer = new TestingServer(TestUtils.findUnusedPort()); + zkClient = new ZkClient(zkServer.getConnectString(), 30_000, 30_000); + + // Set the serializer + 
zkClient.setZkSerializer(new ZkSerializer() { + @Override + public byte[] serialize(Object o) throws ZkMarshallingError + { + try { + return o.toString().getBytes(StandardCharsets.UTF_8); + } + catch (Exception e) { + log.warn("Exception in serializing " + e); + } + return "".getBytes(); + } + + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError + { + return null; + } + }); + } + + @AfterClass + public void destroy() + throws IOException + { + zkMetastoreMonitor.close(); + zkClient.close(); + zkServer.close(); + } + + @BeforeTest + public void setUp() + throws Exception + { + log.info("Cleaning up zookeeper"); + zkClient.getChildren("/").stream() + .filter(child -> !child.equals("zookeeper")) + .forEach(child -> zkClient.deleteRecursive("/" + child)); + + zkClient.unsubscribeAll(); + + zkClient.createPersistent(zkPath); + zkMetastoreMonitor = new ZookeeperMetastoreMonitor(zkServer.getConnectString(), zkPath, 3, 500); + } + + @Test + public void testGetServers() throws Exception + { + List servers; + List expected; + assertTrue(zkMetastoreMonitor.getServers().isEmpty()); + + addServerToZk("nameNode1", "host1", 10001); + // Sleep for some time so that event can be propagated. + TimeUnit.MILLISECONDS.sleep(100); + servers = zkMetastoreMonitor.getServers(); + expected = ImmutableList.of(HostAndPort.fromParts("host1", 10001)); + assertTrue(servers.containsAll(expected) && expected.containsAll(servers)); + + addServerToZk("nameNode2", "host2", 10002); + // Sleep for some time so that event can be propagated. + TimeUnit.MILLISECONDS.sleep(100); + servers = zkMetastoreMonitor.getServers(); + expected = ImmutableList.of(HostAndPort.fromParts("host1", 10001), HostAndPort.fromParts("host2", 10002)); + assertTrue(servers.containsAll(expected) && expected.containsAll(servers)); + + // Change value of an existing name node + addServerToZk("nameNode2", "host2", 10003); + // Sleep for some time so that event can be propagated. 
+ TimeUnit.MILLISECONDS.sleep(100); + servers = zkMetastoreMonitor.getServers(); + expected = ImmutableList.of(HostAndPort.fromParts("host1", 10001), HostAndPort.fromParts("host2", 10003)); + assertTrue(servers.containsAll(expected) && expected.containsAll(servers)); + + // Delete an existing name node + zkClient.delete(getPathForNameNode("nameNode1")); + // Sleep for some time so that event can be propagated. + TimeUnit.MILLISECONDS.sleep(100); + servers = zkMetastoreMonitor.getServers(); + expected = ImmutableList.of(HostAndPort.fromParts("host2", 10003)); + assertTrue(servers.containsAll(expected) && expected.containsAll(servers), servers.toString()); + } + + private void addServerToZk(String nameNode, String host, int port) + { + JSONObject serviceEndpoint = new JSONObject(); + serviceEndpoint.put("host", host); + serviceEndpoint.put("port", port); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serviceEndpoint", serviceEndpoint); + + String path = getPathForNameNode(nameNode); + + if (!zkClient.exists(path)) { + zkClient.createPersistent(path, jsonObject.toJSONString()); + } + else { + zkClient.writeData(path, jsonObject.toJSONString()); + } + } + + private String getPathForNameNode(String nameNode) + { + return zkPath + "/" + nameNode; + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperServersetMetastoreConfig.java b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperServersetMetastoreConfig.java new file mode 100644 index 0000000000000..6992a752dea17 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperServersetMetastoreConfig.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestZookeeperServersetMetastoreConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(ZookeeperServersetMetastoreConfig.class) + .setZookeeperMaxRetries(3) + .setZookeeperRetrySleepTime(500) + .setZookeeperMetastorePath(null) + .setZookeeperServerHostAndPort(null)); + } + + @Test + public void testExplicitPropertyMappingsSingleMetastore() + { + Map properties = new ImmutableMap.Builder() + .put("hive.metastore.zookeeper.uri", "localhost:2181") + .put("hive.metastore.zookeeper.path", "/zookeeper/path/") + .put("hive.metastore.zookeeper.retry.sleeptime", "200") + .put("hive.metastore.zookeeper.max.retries", "2") + .build(); + + ZookeeperServersetMetastoreConfig expected = new ZookeeperServersetMetastoreConfig() + .setZookeeperServerHostAndPort("localhost:2181") + .setZookeeperMetastorePath("/zookeeper/path/") + .setZookeeperRetrySleepTime(200) + .setZookeeperMaxRetries(2); + + assertFullMapping(properties, expected); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/twitter/hive/util/TestUtils.java 
b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/util/TestUtils.java new file mode 100644 index 0000000000000..379ad3877e325 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/util/TestUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive.util; + +import java.io.IOException; +import java.net.ServerSocket; + +public final class TestUtils +{ + private TestUtils() {} + + public static int findUnusedPort() + throws IOException + { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } +} diff --git a/presto-hive/src/test/sql/create-test-hive13.sql b/presto-hive/src/test/sql/create-test-hive13.sql index 70c1d349d1e1b..29235f1ba9961 100644 --- a/presto-hive/src/test/sql/create-test-hive13.sql +++ b/presto-hive/src/test/sql/create-test-hive13.sql @@ -82,7 +82,6 @@ SELECT * FROM presto_test_types_textfile ; --- Parquet fails when trying to use complex nested types. 
CREATE TABLE presto_test_types_parquet ( t_string STRING , t_tinyint TINYINT @@ -96,12 +95,15 @@ CREATE TABLE presto_test_types_parquet ( , t_binary BINARY , t_map MAP , t_array_string ARRAY -, t_array_struct ARRAY> +, t_array_struct ARRAY> +, t_struct STRUCT ) +PARTITIONED BY (dummy INT) STORED AS PARQUET ; INSERT INTO TABLE presto_test_types_parquet +PARTITION (dummy=0) SELECT t_string , t_tinyint @@ -116,9 +118,12 @@ SELECT , t_map , t_array_string , t_array_struct +, t_array_struct[0] FROM presto_test_types_textfile ; +ALTER TABLE presto_test_types_parquet +CHANGE COLUMN t_struct t_struct STRUCT; ALTER TABLE presto_test_types_textfile ADD COLUMNS (new_column INT); ALTER TABLE presto_test_types_sequencefile ADD COLUMNS (new_column INT); diff --git a/presto-jdbc/pom.xml b/presto-jdbc/pom.xml index 84c75bbc045bf..0649fdbca865f 100644 --- a/presto-jdbc/pom.xml +++ b/presto-jdbc/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-jdbc diff --git a/presto-jmx/pom.xml b/presto-jmx/pom.xml index b9d05f4c16c9f..6e482242ae9e4 100644 --- a/presto-jmx/pom.xml +++ b/presto-jmx/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-jmx diff --git a/presto-kafka/pom.xml b/presto-kafka/pom.xml index 2aa0870b3928a..5bb58fec8ffc1 100644 --- a/presto-kafka/pom.xml +++ b/presto-kafka/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-kafka diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedZookeeper.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedZookeeper.java index fddf5c8457469..0f319cee1cbce 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedZookeeper.java +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedZookeeper.java @@ -14,7 +14,7 @@ package com.facebook.presto.kafka.util; import com.google.common.io.Files; -import org.apache.zookeeper.server.NIOServerCnxn; +import 
org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; @@ -32,7 +32,7 @@ public class EmbeddedZookeeper private final int port; private final File zkDataDir; private final ZooKeeperServer zkServer; - private final NIOServerCnxn.Factory cnxnFactory; + private final NIOServerCnxnFactory cnxnFactory; private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean stopped = new AtomicBoolean(); @@ -53,7 +53,8 @@ public EmbeddedZookeeper(int port) FileTxnSnapLog ftxn = new FileTxnSnapLog(zkDataDir, zkDataDir); zkServer.setTxnLogFactory(ftxn); - cnxnFactory = new NIOServerCnxn.Factory(new InetSocketAddress(this.port), 0); + cnxnFactory = new NIOServerCnxnFactory(); + cnxnFactory.configure(new InetSocketAddress(this.port), 0); } public void start() diff --git a/presto-main/pom.xml b/presto-main/pom.xml index 5647a209f61fd..51a6e06ebe656 100644 --- a/presto-main/pom.xml +++ b/presto-main/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-main @@ -296,6 +296,24 @@ tpch test
+ + + + com.twitter + presto-thrift-java + + + com.twitter + util-logging_2.11 + + + org.apache.thrift + libthrift + + + org.scala-lang + scala-library + diff --git a/presto-main/src/main/java/com/facebook/presto/event/EventProcessor.java b/presto-main/src/main/java/com/facebook/presto/event/EventProcessor.java new file mode 100644 index 0000000000000..eef91027a4a1c --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/event/EventProcessor.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.event; + +import com.facebook.presto.event.query.QueryCompletionEvent; +import com.facebook.presto.event.query.QueryCreatedEvent; +import com.facebook.presto.event.query.QueryEvent; +import com.facebook.presto.event.query.QueryEventHandler; +import com.facebook.presto.event.query.SplitCompletionEvent; +import com.google.inject.Inject; +import io.airlift.event.client.AbstractEventClient; +import io.airlift.event.client.EventType; +import io.airlift.log.Logger; + +import java.io.IOException; +import java.util.Set; + +/** + * Class that listens for airlift events and sends presto events to handlers + */ +public class EventProcessor extends AbstractEventClient +{ + private static final String QUERY_CREATED = "QueryCreated"; + private static final String QUERY_COMPLETION = "QueryCompletion"; + private static final String SPLIT_COMPLETION = "SplitCompletion"; + private static final Logger log = Logger.get(EventProcessor.class); + + private Set> queryCreatedEventHandlers; + private Set> queryCompletionEventHandlers; + private Set> splitCompletionEventHandlers; + + @Inject + public EventProcessor( + Set> queryCreatedEventHandlers, + Set> queryCompletionEventHandlers, + Set> splitCompletionEventHandlers) + { + this.queryCreatedEventHandlers = queryCreatedEventHandlers; + this.queryCompletionEventHandlers = queryCompletionEventHandlers; + this.splitCompletionEventHandlers = splitCompletionEventHandlers; + } + + @Override + protected void postEvent(T event) + throws IOException + { + EventType eventTypeAnnotation = event.getClass().getAnnotation(EventType.class); + if (eventTypeAnnotation == null) { + return; + } + + String type = eventTypeAnnotation.value(); + + switch (type) { + case QUERY_CREATED: + handle(queryCreatedEventHandlers, type, (QueryCreatedEvent) event); + break; + case QUERY_COMPLETION: + handle(queryCompletionEventHandlers, type, (QueryCompletionEvent) event); + break; + case SPLIT_COMPLETION: + 
handle(splitCompletionEventHandlers, type, (SplitCompletionEvent) event); + break; + } + } + + private void handle(Set> handlers, String type, E event) + { + for (QueryEventHandler handler : handlers) { + try { + handler.handle(event); + } + catch (Throwable e) { + log.error(e, String.format( + "Exception processing %s event for query %s", type, event.getQueryId())); + } + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/event/query/QueryCompletionEvent.java b/presto-main/src/main/java/com/facebook/presto/event/query/QueryCompletionEvent.java index c005649e77e25..3c95ffe66ef03 100644 --- a/presto-main/src/main/java/com/facebook/presto/event/query/QueryCompletionEvent.java +++ b/presto-main/src/main/java/com/facebook/presto/event/query/QueryCompletionEvent.java @@ -31,7 +31,7 @@ @Immutable @EventType("QueryCompletion") -public class QueryCompletionEvent +public class QueryCompletionEvent implements QueryEvent { private final QueryId queryId; private final String transactionId; diff --git a/presto-main/src/main/java/com/facebook/presto/event/query/QueryCreatedEvent.java b/presto-main/src/main/java/com/facebook/presto/event/query/QueryCreatedEvent.java index 64871433ecee5..6be296bba8e73 100644 --- a/presto-main/src/main/java/com/facebook/presto/event/query/QueryCreatedEvent.java +++ b/presto-main/src/main/java/com/facebook/presto/event/query/QueryCreatedEvent.java @@ -24,7 +24,7 @@ @Immutable @EventType("QueryCreated") -public class QueryCreatedEvent +public class QueryCreatedEvent implements QueryEvent { private final QueryId queryId; private final String transactionId; diff --git a/presto-main/src/main/java/com/facebook/presto/event/query/QueryEvent.java b/presto-main/src/main/java/com/facebook/presto/event/query/QueryEvent.java new file mode 100644 index 0000000000000..da43e387ee22a --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/event/query/QueryEvent.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 
(the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.event.query; + +public interface QueryEvent +{ + String getQueryId(); +} diff --git a/presto-main/src/main/java/com/facebook/presto/event/query/QueryEventHandler.java b/presto-main/src/main/java/com/facebook/presto/event/query/QueryEventHandler.java new file mode 100644 index 0000000000000..2aa0bd381a5fc --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/event/query/QueryEventHandler.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.event.query; + +public interface QueryEventHandler +{ + void handle(T event); +} diff --git a/presto-main/src/main/java/com/facebook/presto/event/query/SplitCompletionEvent.java b/presto-main/src/main/java/com/facebook/presto/event/query/SplitCompletionEvent.java index f9374c21b09d1..4a9e206b9962f 100644 --- a/presto-main/src/main/java/com/facebook/presto/event/query/SplitCompletionEvent.java +++ b/presto-main/src/main/java/com/facebook/presto/event/query/SplitCompletionEvent.java @@ -29,7 +29,7 @@ @Immutable @EventType("SplitCompletion") -public class SplitCompletionEvent +public class SplitCompletionEvent implements QueryEvent { private final QueryId queryId; private final StageId stageId; diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java index f6d470e88352d..7238f4f3f94b0 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java @@ -26,6 +26,7 @@ import io.airlift.discovery.client.ServiceSelector; import io.airlift.discovery.client.ServiceType; import io.airlift.http.client.HttpClient; +import io.airlift.log.Logger; import io.airlift.node.NodeInfo; import io.airlift.units.Duration; @@ -60,6 +61,8 @@ public final class DiscoveryNodeManager implements InternalNodeManager { + private static final Logger log = Logger.get(DiscoveryNodeManager.class); + private static final Duration MAX_AGE = new Duration(5, TimeUnit.SECONDS); private static final Splitter DATASOURCES_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); @@ -124,12 +127,18 @@ public void startPollingNodeStates() Set deadNodes = difference(nodeStates.keySet(), aliveNodeIds).immutableCopy(); nodeStates.keySet().removeAll(deadNodes); + if (deadNodes.size() > 0) { + log.warn("Dead nodes: %s", deadNodes); + } + // Add 
new nodes for (Node node : aliveNodes) { nodeStates.putIfAbsent(node.getNodeIdentifier(), new RemoteNodeState(httpClient, uriBuilderFrom(node.getHttpUri()).appendPath("/v1/info/state").build())); } + log.debug("Number of alive nodes: %d", nodeStates.size()); + // Schedule refresh nodeStates.values().forEach(RemoteNodeState::asyncRefresh); }, 1, 5, TimeUnit.SECONDS); diff --git a/presto-main/src/main/java/com/facebook/presto/server/HttpRemoteTask.java b/presto-main/src/main/java/com/facebook/presto/server/HttpRemoteTask.java index b97b0767a4199..f6bab1fc32dde 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/HttpRemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/server/HttpRemoteTask.java @@ -52,6 +52,7 @@ import io.airlift.http.client.HttpClient; import io.airlift.http.client.HttpStatus; import io.airlift.http.client.Request; +import io.airlift.http.client.StaticBodyGenerator; import io.airlift.http.client.StatusResponseHandler.StatusResponse; import io.airlift.json.JsonCodec; import io.airlift.log.Logger; @@ -477,6 +478,14 @@ private synchronized void sendUpdate() updateErrorTracker.startRequest(); + // TODO: (billg) remove this logging or contribute it upstream + if (log.isDebugEnabled()) { + String size = "unknown"; + if (request.getBodyGenerator() instanceof StaticBodyGenerator) { + size = Integer.toString(((StaticBodyGenerator) request.getBodyGenerator()).getBody().length); + } + log.debug(String.format("scheduleUpdate POST %s, bodySize=%s sourcesSize=%s", request.getUri(), size, sources.size())); + } ListenableFuture> future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec)); currentRequest = future; currentRequestStartNanos = System.nanoTime(); diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java index 99c68965e45c2..f7185a279dc79 100644 --- 
a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java @@ -151,7 +151,7 @@ public void run() protected Iterable getAdditionalModules() { - return ImmutableList.of(); + return com.facebook.presto.twitter.TwitterModuleLoader.getAdditionalModules(); } private static void updateDatasources(Announcer announcer, Metadata metadata, ServerConfig serverConfig, NodeSchedulerConfig schedulerConfig) diff --git a/presto-main/src/main/java/com/facebook/presto/twitter/TwitterModuleLoader.java b/presto-main/src/main/java/com/facebook/presto/twitter/TwitterModuleLoader.java new file mode 100644 index 0000000000000..eb23a7650a093 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/twitter/TwitterModuleLoader.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.twitter; + +import com.facebook.presto.event.EventProcessor; +import com.facebook.presto.event.query.QueryCompletionEvent; +import com.facebook.presto.event.query.QueryCreatedEvent; +import com.facebook.presto.event.query.QueryEventHandler; +import com.facebook.presto.event.query.SplitCompletionEvent; +import com.facebook.presto.twitter.logging.QueryLogger; +import com.facebook.presto.twitter.logging.QueryScriber; +import com.google.common.collect.ImmutableList; +import com.google.inject.Module; +import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; +import com.google.inject.multibindings.Multibinder; +import io.airlift.event.client.EventClient; + +/** + * Loader that initializes custom Twitter code to inject into Presto. Whenever + * possible we should use this pattern to inject custom functionality, since it + * makes it easier to differentiate our patches from the core open-source (OS) code. + * + * If the functionality we wish to add/override isn't currently possible via + * overriding a Guice module, we should contribute the necessary modules/interfaces + * into the OS Presto code base to make it possible. 
+ */ +public class TwitterModuleLoader +{ + private TwitterModuleLoader() /* static utility holder; never instantiated */ + { + } + + public static Iterable<Module> getAdditionalModules() /* returned by PrestoServer.getAdditionalModules() */ + { + return ImmutableList.of( + binder -> Multibinder.newSetBinder(binder, EventClient.class) + .addBinding() + .to(EventProcessor.class) + .in(Scopes.SINGLETON), + binder -> Multibinder.newSetBinder(binder, new TypeLiteral<QueryEventHandler<QueryCreatedEvent>>() {}), /* declares the handler set; nothing bound to it here */ + binder -> Multibinder.newSetBinder(binder, new TypeLiteral<QueryEventHandler<SplitCompletionEvent>>() {}), /* declares the handler set; nothing bound to it here */ + binder -> Multibinder.newSetBinder(binder, new TypeLiteral<QueryEventHandler<QueryCompletionEvent>>(){}) + .addBinding() + .to(new TypeLiteral<QueryLogger>(){}) + .in(Scopes.SINGLETON), + binder -> Multibinder.newSetBinder(binder, new TypeLiteral<QueryEventHandler<QueryCompletionEvent>>(){}) + .addBinding() + .to(QueryScriber.class) + .in(Scopes.SINGLETON) + ); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/twitter/logging/QueryLogger.java b/presto-main/src/main/java/com/facebook/presto/twitter/logging/QueryLogger.java new file mode 100644 index 0000000000000..02edd8da2440d --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/twitter/logging/QueryLogger.java @@ -0,0 +1,78 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.twitter.logging; + +import com.facebook.presto.event.query.QueryCompletionEvent; +import com.facebook.presto.event.query.QueryEventHandler; +import com.facebook.presto.spi.StandardErrorCode; +import io.airlift.log.Logger; +import io.airlift.units.Duration; + +import java.util.concurrent.TimeUnit; + +/** + * Logs each query completion event as a single tab-separated line through the airlift logger + */ +public class QueryLogger implements QueryEventHandler<QueryCompletionEvent> +{ + private static final int MAX_QUERY_LENGTH = 1000; + private static final String DASH = "-"; + private static final String COLON = ":"; + private static final String SPACE = " "; + private static final String ELIPSIS = "..."; + private static final String QUERY_COMPLETION = "QueryCompletion"; + + private static final Logger log = Logger.get(QueryLogger.class); + + @Override + public void handle(QueryCompletionEvent event) /* writes one INFO record; DASH stands in for a missing error type, error code or client address */ + { + String errorType = DASH; + String errorCode = DASH; + if (event.getErrorCode() != null) { + errorType = StandardErrorCode.toErrorType(event.getErrorCode()).toString(); + if (event.getErrorCodeName() != null) { + errorCode = event.getErrorCodeName() + COLON + event.getErrorCode(); + } + } + + Duration duration = (new Duration( + event.getQueryWallTimeMs(), TimeUnit.MILLISECONDS)) + .convertToMostSuccinctTimeUnit(); + + log.info(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s", + QUERY_COMPLETION, event.getQueryId(), toLogValue(event.getRemoteClientAddress()), + event.getQueryState(), errorType, errorCode, event.getUser(), duration, + event.getSplits(), event.getTotalRows(), event.getTotalBytes(), + cleanseAndTrimQuery(event.getQuery()))); + } + + private static String toLogValue(Object object) /* DASH for null, otherwise toString() */ + { + if (object == null) { + return DASH; + } + else { + return object.toString(); + } + } + + private static String cleanseAndTrimQuery(String query) /* truncates to MAX_QUERY_LENGTH (plus ELIPSIS), then turns every line break or tab into a space so the record stays on one line with intact columns */ + { + if (query.length() > MAX_QUERY_LENGTH) { + query = query.substring(0, MAX_QUERY_LENGTH) + ELIPSIS; + } + return
query.replaceAll("\\r\\n|[\\r\\n\\t]", SPACE); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/twitter/logging/QueryScriber.java b/presto-main/src/main/java/com/facebook/presto/twitter/logging/QueryScriber.java new file mode 100644 index 0000000000000..5e55aa1ac195f --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/twitter/logging/QueryScriber.java @@ -0,0 +1,169 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.logging; + +import com.facebook.presto.event.query.QueryCompletionEvent; +import com.facebook.presto.event.query.QueryEventHandler; +import com.google.common.base.Optional; +import com.twitter.logging.BareFormatter$; +import com.twitter.logging.Level; +import com.twitter.logging.QueueingHandler; +import com.twitter.logging.ScribeHandler; +import com.twitter.presto.thriftjava.QueryState; +import io.airlift.log.Logger; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +import java.util.Base64; +import java.util.logging.LogRecord; + +/** + * Class that scribes query completion events + */ +public class QueryScriber implements QueryEventHandler<QueryCompletionEvent> +{ + private static final String SCRIBE_CATEGORY = "presto_query_completion"; + private static final int MAX_QUEUE_SIZE = 1000; + + private static final Logger log = Logger.get(QueryScriber.class); + + private QueueingHandler queueingHandler; + + // TSerializer is
not thread safe + private final ThreadLocal<TSerializer> serializer = new ThreadLocal<TSerializer>() + { + @Override protected TSerializer initialValue() + { + return new TSerializer(); + } + }; + + public QueryScriber() /* builds a ScribeHandler from the library defaults for SCRIBE_CATEGORY and wraps it in a QueueingHandler created with MAX_QUEUE_SIZE */ + { + ScribeHandler scribeHandler = new ScribeHandler( + ScribeHandler.DefaultHostname(), + ScribeHandler.DefaultPort(), + SCRIBE_CATEGORY, + ScribeHandler.DefaultBufferTime(), + ScribeHandler.DefaultConnectBackoff(), + ScribeHandler.DefaultMaxMessagesPerTransaction(), + ScribeHandler.DefaultMaxMessagesToBuffer(), + BareFormatter$.MODULE$, + scala.Option.apply((Level) null)); + queueingHandler = new QueueingHandler(scribeHandler, MAX_QUEUE_SIZE); + } + + @Override + public void handle(QueryCompletionEvent event) /* converts the event to thrift, encodes it and publishes it; an event that cannot be serialized is logged and dropped */ + { + com.twitter.presto.thriftjava.QueryCompletionEvent thriftEvent = toThriftQueryCompletionEvent(event); + Optional<String> message = serializeThriftToString(thriftEvent); + + if (message.isPresent()) { + LogRecord logRecord = new LogRecord(Level.ALL, message.get()); + queueingHandler.publish(logRecord); + } + else { + log.warn("Unable to serialize QueryCompletionEvent: %s", event); + } + } + + /** + * Serialize a thrift object to bytes, then encode the bytes as a base64 string; returns absent if serialization fails. 
+ */ + private Optional<String> serializeThriftToString(TBase thriftMessage) + { + try { + return Optional.of( + Base64.getEncoder().encodeToString(serializer.get().serialize(thriftMessage))); + } + catch (TException e) { + log.warn(e, "Could not serialize thrift object: %s", thriftMessage); + return Optional.absent(); + } + } + + private static com.twitter.presto.thriftjava.QueryCompletionEvent toThriftQueryCompletionEvent(QueryCompletionEvent event) /* field-by-field copy; nullable boxed metrics are copied only when non-null */ + { + com.twitter.presto.thriftjava.QueryCompletionEvent thriftEvent = + new com.twitter.presto.thriftjava.QueryCompletionEvent(); + + thriftEvent.query_id = event.getQueryId(); + thriftEvent.transaction_id = event.getTransactionId(); + thriftEvent.user = event.getUser(); + thriftEvent.principal = event.getPrincipal(); + thriftEvent.source = event.getSource(); + thriftEvent.server_version = event.getServerVersion(); + thriftEvent.environment = event.getEnvironment(); + thriftEvent.catalog = event.getCatalog(); + thriftEvent.schema = event.getSchema(); + thriftEvent.remote_client_address = event.getRemoteClientAddress(); + thriftEvent.user_agent = event.getUserAgent(); + thriftEvent.query_state = QueryState.valueOf(event.getQueryState()); + thriftEvent.uri = event.getUri(); + thriftEvent.field_names = event.getFieldNames(); + thriftEvent.query = event.getQuery(); + thriftEvent.create_time_ms = event.getCreateTime().getMillis(); + thriftEvent.execution_start_time_ms = event.getExecutionStartTime().getMillis(); + thriftEvent.end_time_ms = event.getEndTime().getMillis(); + thriftEvent.queued_time_ms = event.getQueuedTimeMs(); + if (event.getAnalysisTimeMs() != null) { + thriftEvent.analysis_time_ms = event.getAnalysisTimeMs(); + } + if (event.getDistributedPlanningTimeMs() != null) { + thriftEvent.distributed_planning_time_ms = event.getDistributedPlanningTimeMs(); + } + if (event.getTotalSplitWallTimeMs() != null) { + thriftEvent.total_split_wall_time_ms = event.getTotalSplitWallTimeMs(); + } + if (event.getTotalSplitCpuTimeMs() !=
null) { + thriftEvent.total_split_cpu_time_ms = event.getTotalSplitCpuTimeMs(); + } + if (event.getTotalBytes() != null) { + thriftEvent.total_bytes = event.getTotalBytes(); + } + if (event.getTotalRows() != null) { + thriftEvent.total_rows = event.getTotalRows(); + } + thriftEvent.splits = event.getSplits(); + if (event.getErrorCode() != null) { + thriftEvent.error_code_id = event.getErrorCode(); + } + thriftEvent.error_code_name = event.getErrorCodeName(); + thriftEvent.failure_type = event.getFailureType(); + thriftEvent.failure_message = event.getFailureMessage(); + thriftEvent.failure_task = event.getFailureTask(); + thriftEvent.failure_host = event.getFailureHost(); + thriftEvent.output_stage_json = event.getOutputStageJson(); + thriftEvent.failures_json = event.getFailuresJson(); + thriftEvent.inputs_json = event.getInputsJson(); + thriftEvent.session_properties_json = event.getSessionPropertiesJson(); + thriftEvent.query_wall_time_ms = event.getQueryWallTimeMs(); + if (event.getBytesPerSec() != null) { + thriftEvent.bytes_per_sec = event.getBytesPerSec(); + } + if (event.getBytesPerCpuSec() != null) { + thriftEvent.bytes_per_cpu_sec = event.getBytesPerCpuSec(); + } + if (event.getRowsPerSec() != null) { + thriftEvent.rows_per_sec = event.getRowsPerSec(); + } + if (event.getRowsPerCpuSec() != null) { + thriftEvent.rows_per_cpu_sec = event.getRowsPerCpuSec(); + } + + return thriftEvent; + } +} diff --git a/presto-ml/pom.xml b/presto-ml/pom.xml index 1899f5fcca69d..c017ed0e635d2 100644 --- a/presto-ml/pom.xml +++ b/presto-ml/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-ml diff --git a/presto-mysql/pom.xml b/presto-mysql/pom.xml index 35b56aa6b7242..a85b0b403e1fd 100644 --- a/presto-mysql/pom.xml +++ b/presto-mysql/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-mysql diff --git a/presto-orc/pom.xml b/presto-orc/pom.xml index 160e11c44e726..a67f058a17ede 100644 --- 
a/presto-orc/pom.xml +++ b/presto-orc/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-orc diff --git a/presto-parser/pom.xml b/presto-parser/pom.xml index a9c126945d7a0..97e0abd530f1f 100644 --- a/presto-parser/pom.xml +++ b/presto-parser/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-parser diff --git a/presto-postgresql/pom.xml b/presto-postgresql/pom.xml index 018204dea0e09..7b2409dcb2ea8 100644 --- a/presto-postgresql/pom.xml +++ b/presto-postgresql/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-postgresql diff --git a/presto-product-tests/pom.xml b/presto-product-tests/pom.xml index 6a0ad32d8c947..b14447d4255d1 100644 --- a/presto-product-tests/pom.xml +++ b/presto-product-tests/pom.xml @@ -5,7 +5,7 @@ presto-root com.facebook.presto - 0.143 + 0.143-tw-0.21 presto-product-tests diff --git a/presto-raptor/pom.xml b/presto-raptor/pom.xml index dee555abf4bb4..98098e7bca6bf 100644 --- a/presto-raptor/pom.xml +++ b/presto-raptor/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-raptor diff --git a/presto-record-decoder/pom.xml b/presto-record-decoder/pom.xml index e8b911e4b433f..007a962d5113b 100644 --- a/presto-record-decoder/pom.xml +++ b/presto-record-decoder/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-record-decoder diff --git a/presto-redis/pom.xml b/presto-redis/pom.xml index 8c618e94b57c5..d0e5fcb1a3557 100644 --- a/presto-redis/pom.xml +++ b/presto-redis/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-redis diff --git a/presto-server-rpm/pom.xml b/presto-server-rpm/pom.xml index d4a6474c05bd4..8c699831a5857 100644 --- a/presto-server-rpm/pom.xml +++ b/presto-server-rpm/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-server-rpm diff --git a/presto-server/pom.xml b/presto-server/pom.xml 
index 2a35482c59a90..0c4200ec3f967 100644 --- a/presto-server/pom.xml +++ b/presto-server/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-server diff --git a/presto-spi/pom.xml b/presto-spi/pom.xml index 39bbf912871a1..26814c291cf90 100644 --- a/presto-spi/pom.xml +++ b/presto-spi/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-spi diff --git a/presto-teradata-functions/pom.xml b/presto-teradata-functions/pom.xml index bb6947fe3db62..9d0629fca8e80 100644 --- a/presto-teradata-functions/pom.xml +++ b/presto-teradata-functions/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-teradata-functions diff --git a/presto-testing-server-launcher/pom.xml b/presto-testing-server-launcher/pom.xml index a1d92c2c2639f..fca9e2640ba56 100644 --- a/presto-testing-server-launcher/pom.xml +++ b/presto-testing-server-launcher/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 diff --git a/presto-tests/pom.xml b/presto-tests/pom.xml index a81e47fb1e3e5..4ab85400cfe83 100644 --- a/presto-tests/pom.xml +++ b/presto-tests/pom.xml @@ -5,7 +5,7 @@ presto-root com.facebook.presto - 0.143 + 0.143-tw-0.21 presto-tests diff --git a/presto-tpch/pom.xml b/presto-tpch/pom.xml index 62667b35f11d3..38342b08d9b7f 100644 --- a/presto-tpch/pom.xml +++ b/presto-tpch/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-tpch diff --git a/presto-verifier/pom.xml b/presto-verifier/pom.xml index 21c117a922adc..2d075a28a2140 100644 --- a/presto-verifier/pom.xml +++ b/presto-verifier/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.143 + 0.143-tw-0.21 presto-verifier