HiveClient.java:

@@ -152,6 +152,8 @@ public class HiveClient
     private final DateTimeZone timeZone;
     private final Executor executor;
     private final DataSize maxSplitSize;
+    private final DataSize maxInitialSplitSize;
+    private final int maxInitialSplits;
 
     @Inject
     public HiveClient(HiveConnectorId connectorId,
@@ -173,7 +175,9 @@ public HiveClient(HiveConnectorId connectorId,
                 hiveClientConfig.getMaxOutstandingSplits(),
                 hiveClientConfig.getMaxSplitIteratorThreads(),
                 hiveClientConfig.getMinPartitionBatchSize(),
-                hiveClientConfig.getMaxPartitionBatchSize());
+                hiveClientConfig.getMaxPartitionBatchSize(),
+                hiveClientConfig.getMaxInitialSplitSize(),
+                hiveClientConfig.getMaxInitialSplits());
     }
 
     public HiveClient(HiveConnectorId connectorId,
@@ -187,7 +191,9 @@ public HiveClient(HiveConnectorId connectorId,
             int maxOutstandingSplits,
             int maxSplitIteratorThreads,
             int minPartitionBatchSize,
-            int maxPartitionBatchSize)
+            int maxPartitionBatchSize,
+            DataSize maxInitialSplitSize,
+            int maxInitialSplits)
     {
         this.connectorId = checkNotNull(connectorId, "connectorId is null").toString();
 
@@ -197,6 +203,8 @@ public HiveClient(HiveConnectorId connectorId,
         this.maxSplitIteratorThreads = maxSplitIteratorThreads;
         this.minPartitionBatchSize = minPartitionBatchSize;
         this.maxPartitionBatchSize = maxPartitionBatchSize;
+        this.maxInitialSplitSize = checkNotNull(maxInitialSplitSize, "maxInitialSplitSize is null");
+        this.maxInitialSplits = maxInitialSplits;
 
         this.metastore = checkNotNull(metastore, "metastore is null");
         this.hdfsEnvironment = checkNotNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -754,7 +762,9 @@ public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle tableHandle,
                 directoryLister,
                 executor,
                 maxPartitionBatchSize,
-                hiveTableHandle.getSession()).get();
+                hiveTableHandle.getSession(),
+                maxInitialSplitSize,
+                maxInitialSplits).get();
     }
 
     private Iterable<Partition> getPartitions(final Table table, final SchemaTableName tableName, List<String> partitionNames)
HiveClientConfig.java:

@@ -20,7 +20,6 @@
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
 import io.airlift.units.DataSize;
-import io.airlift.units.DataSize.Unit;
 import io.airlift.units.Duration;
 import io.airlift.units.MinDuration;
 
@@ -32,18 +31,22 @@
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+
 public class HiveClientConfig
 {
     private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
 
     private TimeZone timeZone = TimeZone.getDefault();
 
-    private DataSize maxSplitSize = new DataSize(64, Unit.MEGABYTE);
+    private DataSize maxSplitSize = new DataSize(64, MEGABYTE);
     private int maxOutstandingSplits = 1_000;
     private int maxGlobalSplitIteratorThreads = 1_000;
     private int maxSplitIteratorThreads = 50;
     private int minPartitionBatchSize = 10;
     private int maxPartitionBatchSize = 100;
+    private int maxInitialSplits = 200;
+    private DataSize maxInitialSplitSize;
 
     private Duration metastoreCacheTtl = new Duration(1, TimeUnit.HOURS);
     private Duration metastoreRefreshInterval = new Duration(2, TimeUnit.MINUTES);
@@ -67,6 +70,33 @@ public class HiveClientConfig
 
     private List<String> resourceConfigFiles;
 
+    public int getMaxInitialSplits()
+    {
+        return maxInitialSplits;
+    }
+
+    @Config("hive.max-initial-splits")
+    public HiveClientConfig setMaxInitialSplits(int maxInitialSplits)
+    {
+        this.maxInitialSplits = maxInitialSplits;
+        return this;
+    }
+
+    public DataSize getMaxInitialSplitSize()
+    {
+        if (maxInitialSplitSize == null) {
+            return new DataSize(maxSplitSize.getValue() / 2, maxSplitSize.getUnit());
+        }
+        return maxInitialSplitSize;
+    }
+
+    @Config("hive.max-initial-split-size")
+    public HiveClientConfig setMaxInitialSplitSize(DataSize maxInitialSplitSize)
+    {
+        this.maxInitialSplitSize = maxInitialSplitSize;
+        return this;
+    }
+
     @NotNull
     public TimeZone getTimeZone()
     {
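The notable piece of the config change is the derived default: when hive.max-initial-split-size is unset, getMaxInitialSplitSize() returns half of hive.max-split-size (32MB against the stock 64MB). A minimal sketch of how that resolves, assuming it compiles alongside the patched HiveClientConfig; the example class below is illustrative and not part of this change:

import io.airlift.units.DataSize;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class InitialSplitSizeDefaultExample
{
    public static void main(String[] args)
    {
        HiveClientConfig config = new HiveClientConfig();

        // Nothing set: the initial split size falls back to half of the 64MB hive.max-split-size default.
        System.out.println(config.getMaxInitialSplitSize().toBytes()); // 33554432 (32MB)
        System.out.println(config.getMaxInitialSplits());              // 200

        // An explicit hive.max-initial-split-size wins over the derived default.
        config.setMaxInitialSplitSize(new DataSize(16, MEGABYTE));
        System.out.println(config.getMaxInitialSplitSize().toBytes()); // 16777216 (16MB)
    }
}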
HiveSplitSourceProvider.java:

@@ -116,6 +116,8 @@ public Object getInfo()
     private final ClassLoader classLoader;
     private final DataSize maxSplitSize;
     private final int maxPartitionBatchSize;
+    private final DataSize maxInitialSplitSize;
+    private long remainingInitialSplits;
     private final ConnectorSession session;
 
     HiveSplitSourceProvider(String connectorId,
@@ -131,7 +133,9 @@ public Object getInfo()
             DirectoryLister directoryLister,
             Executor executor,
             int maxPartitionBatchSize,
-            ConnectorSession session)
+            ConnectorSession session,
+            DataSize maxInitialSplitSize,
+            int maxInitialSplits)
     {
         this.connectorId = connectorId;
         this.table = table;
@@ -148,6 +152,8 @@ public Object getInfo()
         this.executor = executor;
         this.session = session;
         this.classLoader = Thread.currentThread().getContextClassLoader();
+        this.maxInitialSplitSize = maxInitialSplitSize;
+        this.remainingInitialSplits = maxInitialSplits;
     }
 
     public ConnectorSplitSource get()
@@ -356,8 +362,14 @@ private List<HiveSplit> createHiveSplits(
                 // get the addresses for the block
                 List<HostAddress> addresses = toHostAddress(blockLocation.getHosts());
 
+                long maxBytes = maxSplitSize.toBytes();
+
+                if (remainingInitialSplits > 0) {
+                    maxBytes = maxInitialSplitSize.toBytes();
+                }
+
                 // divide the block into uniform chunks that are smaller than the max split size
-                int chunks = Math.max(1, (int) (blockLocation.getLength() / maxSplitSize.toBytes()));
+                int chunks = Math.max(1, (int) (blockLocation.getLength() / maxBytes));
                 // when block does not divide evenly into chunks, make the chunk size slightly bigger than necessary
                 long targetChunkSize = (long) Math.ceil(blockLocation.getLength() * 1.0 / chunks);
 
@@ -379,6 +391,7 @@ private List<HiveSplit> createHiveSplits(
                             session));
 
                     chunkOffset += chunkLength;
+                    remainingInitialSplits--;
                 }
                 checkState(chunkOffset == blockLocation.getLength(), "Error splitting blocks");
             }
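The behavioral change is confined to createHiveSplits: while remainingInitialSplits is positive, a block is carved up using maxInitialSplitSize instead of maxSplitSize, and the counter is decremented once per emitted chunk, after which splitting falls back to the regular hive.max-split-size. A self-contained sketch of the same arithmetic with hypothetical literal values (the class name, variables, and the 256MB block are illustrative assumptions, not code from this change):

public class InitialSplitChunkingSketch
{
    public static void main(String[] args)
    {
        long maxSplitBytes = 64L << 20;        // hive.max-split-size default (64MB)
        long maxInitialSplitBytes = 32L << 20; // hive.max-initial-split-size default (half of the above)
        long remainingInitialSplits = 200;     // hive.max-initial-splits default
        long blockLength = 256L << 20;         // a hypothetical 256MB HDFS block

        // While initial splits remain, blocks are carved with the smaller cap.
        long maxBytes = remainingInitialSplits > 0 ? maxInitialSplitBytes : maxSplitBytes;

        // Same arithmetic as createHiveSplits: uniform chunks no larger than the cap,
        // with the chunk size rounded up so the chunks exactly cover the block.
        int chunks = Math.max(1, (int) (blockLength / maxBytes));
        long targetChunkSize = (long) Math.ceil(blockLength * 1.0 / chunks);

        System.out.println(chunks);          // 8 chunks of ~32MB, instead of 4 chunks of 64MB
        System.out.println(targetChunkSize); // 33554432
    }
}

The upshot should be that a query's first couple of hundred splits are roughly half-sized, letting leaf tasks get scheduled and start producing results sooner, at the cost of a little extra per-split overhead.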
Test harness setup (constructing HiveClient with the new config values):

@@ -210,7 +210,9 @@ protected void setup(String host, int port, String databaseName, String timeZone
                 maxOutstandingSplits,
                 maxThreads,
                 hiveClientConfig.getMinPartitionBatchSize(),
-                hiveClientConfig.getMaxPartitionBatchSize());
+                hiveClientConfig.getMaxPartitionBatchSize(),
+                hiveClientConfig.getMaxInitialSplitSize(),
+                hiveClientConfig.getMaxInitialSplits());
 
         metadata = client;
         splitManager = client;
HiveClientConfig configuration test (defaults and explicit property mappings):

@@ -50,6 +50,8 @@ public void testDefaults()
                 .setMetastoreTimeout(new Duration(10, TimeUnit.SECONDS))
                 .setMinPartitionBatchSize(10)
                 .setMaxPartitionBatchSize(100)
+                .setMaxInitialSplits(200)
+                .setMaxInitialSplitSize(new DataSize(32, Unit.MEGABYTE))
                 .setDfsTimeout(new Duration(10, TimeUnit.SECONDS))
                 .setDfsConnectTimeout(new Duration(500, TimeUnit.MILLISECONDS))
                 .setDfsConnectMaxRetries(5)
@@ -84,6 +86,8 @@ public void testExplicitPropertyMappings()
                 .put("hive.dfs.connect.timeout", "20s")
                 .put("hive.dfs.connect.max-retries", "10")
                 .put("hive.config.resources", "/foo.xml,/bar.xml")
+                .put("hive.max-initial-splits", "10")
+                .put("hive.max-initial-split-size", "16MB")
                 .put("dfs.domain-socket-path", "/foo")
                 .put("hive.s3.aws-access-key", "abc123")
                 .put("hive.s3.aws-secret-key", "secret")
@@ -107,6 +111,8 @@ public void testExplicitPropertyMappings()
                 .setMetastoreTimeout(new Duration(20, TimeUnit.SECONDS))
                 .setMinPartitionBatchSize(1)
                 .setMaxPartitionBatchSize(1000)
+                .setMaxInitialSplits(10)
+                .setMaxInitialSplitSize(new DataSize(16, Unit.MEGABYTE))
                 .setDfsTimeout(new Duration(33, TimeUnit.SECONDS))
                 .setDfsConnectTimeout(new Duration(20, TimeUnit.SECONDS))
                 .setDfsConnectMaxRetries(10)
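The defaults pinned down above (200 initial splits of 32MB each) bound how much data gets the half-sized treatment. A back-of-the-envelope sketch, purely illustrative and not part of this change:

public class InitialSplitCoverageSketch
{
    public static void main(String[] args)
    {
        int maxInitialSplits = 200;            // hive.max-initial-splits default
        long maxInitialSplitBytes = 32L << 20; // derived default for hive.max-initial-split-size

        long coveredBytes = maxInitialSplits * maxInitialSplitBytes;
        System.out.println(coveredBytes);                      // 6710886400
        System.out.println(coveredBytes / (double) (1 << 30)); // 6.25 GB read via half-size splits before falling back
    }
}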