diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClient.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClient.java index 7ebb7bdcfb5be..438ffbf04d81f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClient.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClient.java @@ -152,6 +152,8 @@ public class HiveClient private final DateTimeZone timeZone; private final Executor executor; private final DataSize maxSplitSize; + private final DataSize maxInitialSplitSize; + private final int maxInitialSplits; @Inject public HiveClient(HiveConnectorId connectorId, @@ -173,7 +175,9 @@ public HiveClient(HiveConnectorId connectorId, hiveClientConfig.getMaxOutstandingSplits(), hiveClientConfig.getMaxSplitIteratorThreads(), hiveClientConfig.getMinPartitionBatchSize(), - hiveClientConfig.getMaxPartitionBatchSize()); + hiveClientConfig.getMaxPartitionBatchSize(), + hiveClientConfig.getMaxInitialSplitSize(), + hiveClientConfig.getMaxInitialSplits()); } public HiveClient(HiveConnectorId connectorId, @@ -187,7 +191,9 @@ public HiveClient(HiveConnectorId connectorId, int maxOutstandingSplits, int maxSplitIteratorThreads, int minPartitionBatchSize, - int maxPartitionBatchSize) + int maxPartitionBatchSize, + DataSize maxInitialSplitSize, + int maxInitialSplits) { this.connectorId = checkNotNull(connectorId, "connectorId is null").toString(); @@ -197,6 +203,8 @@ public HiveClient(HiveConnectorId connectorId, this.maxSplitIteratorThreads = maxSplitIteratorThreads; this.minPartitionBatchSize = minPartitionBatchSize; this.maxPartitionBatchSize = maxPartitionBatchSize; + this.maxInitialSplitSize = checkNotNull(maxInitialSplitSize, "maxInitialSplitSize is null"); + this.maxInitialSplits = maxInitialSplits; this.metastore = checkNotNull(metastore, "metastore is null"); this.hdfsEnvironment = checkNotNull(hdfsEnvironment, "hdfsEnvironment is null"); @@ -754,7 +762,9 @@ public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle tableHandle, directoryLister, executor, maxPartitionBatchSize, - hiveTableHandle.getSession()).get(); + hiveTableHandle.getSession(), + maxInitialSplitSize, + maxInitialSplits).get(); } private Iterable getPartitions(final Table table, final SchemaTableName tableName, List partitionNames) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 8cf06a923ada1..e3d627a3b345f 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -20,7 +20,6 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; import io.airlift.units.DataSize; -import io.airlift.units.DataSize.Unit; import io.airlift.units.Duration; import io.airlift.units.MinDuration; @@ -32,18 +31,22 @@ import java.util.TimeZone; import java.util.concurrent.TimeUnit; +import static io.airlift.units.DataSize.Unit.MEGABYTE; + public class HiveClientConfig { private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); private TimeZone timeZone = TimeZone.getDefault(); - private DataSize maxSplitSize = new DataSize(64, Unit.MEGABYTE); + private DataSize maxSplitSize = new DataSize(64, MEGABYTE); private int maxOutstandingSplits = 1_000; private int maxGlobalSplitIteratorThreads = 1_000; private int maxSplitIteratorThreads = 50; private int minPartitionBatchSize = 10; private int maxPartitionBatchSize = 100; + private int maxInitialSplits = 200; + private DataSize maxInitialSplitSize; private Duration metastoreCacheTtl = new Duration(1, TimeUnit.HOURS); private Duration metastoreRefreshInterval = new Duration(2, TimeUnit.MINUTES); @@ -67,6 +70,33 @@ public class HiveClientConfig private List resourceConfigFiles; + public int getMaxInitialSplits() + { + return maxInitialSplits; + } + + @Config("hive.max-initial-splits") + public HiveClientConfig setMaxInitialSplits(int maxInitialSplits) + { + this.maxInitialSplits = maxInitialSplits; + return this; + } + + public DataSize getMaxInitialSplitSize() + { + if (maxInitialSplitSize == null) { + return new DataSize(maxSplitSize.getValue() / 2, maxSplitSize.getUnit()); + } + return maxInitialSplitSize; + } + + @Config("hive.max-initial-split-size") + public HiveClientConfig setMaxInitialSplitSize(DataSize maxInitialSplitSize) + { + this.maxInitialSplitSize = maxInitialSplitSize; + return this; + } + @NotNull public TimeZone getTimeZone() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSourceProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSourceProvider.java index a4b2248e17873..c8e9a10961013 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSourceProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSourceProvider.java @@ -116,6 +116,8 @@ public Object getInfo() private final ClassLoader classLoader; private final DataSize maxSplitSize; private final int maxPartitionBatchSize; + private final DataSize maxInitialSplitSize; + private long remainingInitialSplits; private final ConnectorSession session; HiveSplitSourceProvider(String connectorId, @@ -131,7 +133,9 @@ public Object getInfo() DirectoryLister directoryLister, Executor executor, int maxPartitionBatchSize, - ConnectorSession session) + ConnectorSession session, + DataSize maxInitialSplitSize, + int maxInitialSplits) { this.connectorId = connectorId; this.table = table; @@ -148,6 +152,8 @@ public Object getInfo() this.executor = executor; this.session = session; this.classLoader = Thread.currentThread().getContextClassLoader(); + this.maxInitialSplitSize = maxInitialSplitSize; + this.remainingInitialSplits = maxInitialSplits; } public ConnectorSplitSource get() @@ -356,8 +362,14 @@ private List createHiveSplits( // get the addresses for the block List addresses = toHostAddress(blockLocation.getHosts()); + long maxBytes = maxSplitSize.toBytes(); + + if (remainingInitialSplits > 0) { + maxBytes = maxInitialSplitSize.toBytes(); + } + // divide the block into uniform chunks that are smaller than the max split size - int chunks = Math.max(1, (int) (blockLocation.getLength() / maxSplitSize.toBytes())); + int chunks = Math.max(1, (int) (blockLocation.getLength() / maxBytes)); // when block does not divide evenly into chunks, make the chunk size slightly bigger than necessary long targetChunkSize = (long) Math.ceil(blockLocation.getLength() * 1.0 / chunks); @@ -379,6 +391,7 @@ private List createHiveSplits( session)); chunkOffset += chunkLength; + remainingInitialSplits--; } checkState(chunkOffset == blockLocation.getLength(), "Error splitting blocks"); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index a85358613bded..16a5f7481f094 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -210,7 +210,9 @@ protected void setup(String host, int port, String databaseName, String timeZone maxOutstandingSplits, maxThreads, hiveClientConfig.getMinPartitionBatchSize(), - hiveClientConfig.getMaxPartitionBatchSize()); + hiveClientConfig.getMaxPartitionBatchSize(), + hiveClientConfig.getMaxInitialSplitSize(), + hiveClientConfig.getMaxInitialSplits()); metadata = client; splitManager = client; diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index bf675d0458bbb..d6636099ce34d 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -50,6 +50,8 @@ public void testDefaults() .setMetastoreTimeout(new Duration(10, TimeUnit.SECONDS)) .setMinPartitionBatchSize(10) .setMaxPartitionBatchSize(100) + .setMaxInitialSplits(200) + .setMaxInitialSplitSize(new DataSize(32, Unit.MEGABYTE)) .setDfsTimeout(new Duration(10, TimeUnit.SECONDS)) .setDfsConnectTimeout(new Duration(500, TimeUnit.MILLISECONDS)) .setDfsConnectMaxRetries(5) @@ -84,6 +86,8 @@ public void testExplicitPropertyMappings() .put("hive.dfs.connect.timeout", "20s") .put("hive.dfs.connect.max-retries", "10") .put("hive.config.resources", "/foo.xml,/bar.xml") + .put("hive.max-initial-splits", "10") + .put("hive.max-initial-split-size", "16MB") .put("dfs.domain-socket-path", "/foo") .put("hive.s3.aws-access-key", "abc123") .put("hive.s3.aws-secret-key", "secret") @@ -107,6 +111,8 @@ public void testExplicitPropertyMappings() .setMetastoreTimeout(new Duration(20, TimeUnit.SECONDS)) .setMinPartitionBatchSize(1) .setMaxPartitionBatchSize(1000) + .setMaxInitialSplits(10) + .setMaxInitialSplitSize(new DataSize(16, Unit.MEGABYTE)) .setDfsTimeout(new Duration(33, TimeUnit.SECONDS)) .setDfsConnectTimeout(new Duration(20, TimeUnit.SECONDS)) .setDfsConnectMaxRetries(10)