Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions presto-docs/src/main/sphinx/cache/local.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,26 @@ and restart the Presto coordinator and workers:
In the above example configuration,

* ``hive.node-selection-strategy=SOFT_AFFINITY`` instructs Presto scheduler to take data affinity
into consideration when secheduling tasks to workers that enables meaningful data caching effectiveness.
This configuration property is defaul to ``NO_PREFERENCE`` and SDK cache is only enabled when set to ``SOFT_AFFINITY``.
into consideration when scheduling tasks to workers that enables meaningful data caching effectiveness.
This configuration property defaults to ``NO_PREFERENCE`` and SDK cache is only enabled when set to ``SOFT_AFFINITY``.
Other configuration on coordinator that can impact data affinity includes
``node-scheduler.max-pending-splits-per-task`` (the max pending splits per task) and
``node-scheduler.max-splits-per-node`` (the max splits per node).
* ``cache.enabled=true`` turns on the SDK cache and ``cache.type=ALLUXIO`` sets it to Alluxio.
* ``cache.alluxio.max-cache-size=500GB`` sets storage space to be 500GB.
* ``cache.base-directory=/tmp/alluxio-cache`` specifies a local directory ``/tmp/alluxio-cache``. Note that this Presto server must have both read and write permission to access this local directory.

When affinity scheduling is enabled, a set of preferred nodes is assigned to a certain file section. The default file section size is ``256MB``.
For example, if the file size is 512MB, two different affinity preferences will be assigned:
- ``[0MB..256MB] -> NodeA, NodeB``
- ``(256MB..512MB] -> NodeC, NodeD``

The section is selected based on the split start offset.
A split that has its first byte in the first section is preferred to be scheduled on ``NodeA`` or ``NodeB``.

Change the size of the section by setting the ``hive.affinity-scheduling-file-section-size`` configuration property
or the ``affinity_scheduling_file_section_size`` session property.


Monitoring
----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public class HiveClientConfig
private Duration parquetQuickStatsFileMetadataFetchTimeout = new Duration(60, TimeUnit.SECONDS);
private int parquetQuickStatsMaxConcurrentCalls = 500;
private int quickStatsMaxConcurrentCalls = 100;
private DataSize affinitySchedulingFileSectionSize = new DataSize(256, MEGABYTE);

@Min(0)
public int getMaxInitialSplits()
Expand Down Expand Up @@ -1802,4 +1803,17 @@ public int getMaxParallelParsingConcurrency()
{
return this.maxParallelParsingConcurrency;
}

/**
 * Returns the size of the file section used to group splits for affinity
 * scheduling. Splits whose start offsets fall in the same section share the
 * same preferred-node assignment. Defaults to 256MB.
 */
@NotNull
public DataSize getAffinitySchedulingFileSectionSize()
{
    return this.affinitySchedulingFileSectionSize;
}

/**
 * Sets the file section size used for affinity scheduling.
 * Bound to the {@code hive.affinity-scheduling-file-section-size} config property.
 *
 * @param sectionSize new section size; larger values map more of a file to the same preferred nodes
 * @return this config instance, for fluent chaining
 */
@Config("hive.affinity-scheduling-file-section-size")
public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize sectionSize)
{
    this.affinitySchedulingFileSectionSize = sectionSize;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class HiveFileSplit
private final long fileModifiedTime;
private final Optional<byte[]> extraFileInfo;
private final Map<String, String> customSplitInfo;
private final long affinitySchedulingFileSectionIndex;

/**
* @param path the absolute path to the file that contains the split
Expand All @@ -56,7 +57,8 @@ public HiveFileSplit(
@JsonProperty("fileSize") long fileSize,
@JsonProperty("fileModifiedTime") long fileModifiedTime,
@JsonProperty("extraFileInfo") Optional<byte[]> extraFileInfo,
@JsonProperty("customSplitInfo") Map<String, String> customSplitInfo)
@JsonProperty("customSplitInfo") Map<String, String> customSplitInfo,
@JsonProperty("affinitySchedulingFileSectionIndex") long affinitySchedulingFileSectionIndex)
{
checkArgument(start >= 0, "start must be non-negative");
checkArgument(length >= 0, "length must be non-negative");
Expand All @@ -73,6 +75,7 @@ public HiveFileSplit(
this.fileModifiedTime = fileModifiedTime;
this.extraFileInfo = extraFileInfo;
this.customSplitInfo = ImmutableMap.copyOf(customSplitInfo);
this.affinitySchedulingFileSectionIndex = affinitySchedulingFileSectionIndex;
}

@JsonProperty
Expand Down Expand Up @@ -117,6 +120,12 @@ public Map<String, String> getCustomSplitInfo()
return customSplitInfo;
}

/**
 * Returns the zero-based index of the file section this split's start offset
 * falls into; used as part of the affinity-scheduling key so that splits in
 * the same section prefer the same worker nodes.
 */
@JsonProperty
public long getAffinitySchedulingFileSectionIndex()
{
    return this.affinitySchedulingFileSectionIndex;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public final class HiveSessionProperties
public static final String QUICK_STATS_INLINE_BUILD_TIMEOUT = "quick_stats_inline_build_timeout";
public static final String QUICK_STATS_BACKGROUND_BUILD_TIMEOUT = "quick_stats_background_build_timeout";
public static final String DYNAMIC_SPLIT_SIZES_ENABLED = "dynamic_split_sizes_enabled";
public static final String AFFINITY_SCHEDULING_FILE_SECTION_SIZE = "affinity_scheduling_file_section_size";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -629,7 +630,12 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
hiveClientConfig.getQuickStatsBackgroundBuildTimeout(),
false,
value -> Duration.valueOf((String) value),
Duration::toString));
Duration::toString),
dataSizeSessionProperty(
AFFINITY_SCHEDULING_FILE_SECTION_SIZE,
"Size of file section for affinity scheduling",
hiveClientConfig.getAffinitySchedulingFileSectionSize(),
false));
}

public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -1092,4 +1098,9 @@ public static Duration getQuickStatsBackgroundBuildTimeout(ConnectorSession sess
{
return session.getProperty(QUICK_STATS_BACKGROUND_BUILD_TIMEOUT, Duration.class);
}

/**
 * Reads the affinity-scheduling file section size for the given session,
 * backed by the {@code affinity_scheduling_file_section_size} session property.
 */
public static DataSize getAffinitySchedulingFileSectionSize(ConnectorSession session)
{
    DataSize sectionSize = session.getProperty(AFFINITY_SCHEDULING_FILE_SECTION_SIZE, DataSize.class);
    return sectionSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public List<HostAddress> getAddresses()
/**
 * Returns the nodes this split should preferably run on.
 *
 * <p>Under {@code SOFT_AFFINITY}, the preferred nodes are derived from a key
 * combining the file path and the split's file-section index, so splits in the
 * same section of the same file consistently map to the same two workers.
 * For any other strategy the split's plain address list is returned.
 */
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
    if (getNodeSelectionStrategy() != SOFT_AFFINITY) {
        return addresses;
    }
    String affinityKey = fileSplit.getPath() + "#" + fileSplit.getAffinitySchedulingFileSectionIndex();
    return nodeProvider.get(affinityKey, 2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_EXCEEDED_SPLIT_BUFFERING_LIMIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static com.facebook.presto.hive.HiveSessionProperties.getAffinitySchedulingFileSectionSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMinimumAssignedSplitWeight;
Expand Down Expand Up @@ -101,6 +102,7 @@ class HiveSplitSource
private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean();
private final HiveSplitWeightProvider splitWeightProvider;
private final double splitScanRatio;
private final long affinitySchedulingFileSectionSizeInBytes;

private HiveSplitSource(
ConnectorSession session,
Expand Down Expand Up @@ -140,6 +142,7 @@ private HiveSplitSource(
splitScanRatio = 1.0;
}
this.splitScanRatio = max(min(splitScanRatio, 1.0), 0.1);
affinitySchedulingFileSectionSizeInBytes = getAffinitySchedulingFileSectionSize(session).toBytes();
}

public static HiveSplitSource allAtOnce(
Expand Down Expand Up @@ -517,7 +520,8 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) {
internalSplit.getFileSize(),
internalSplit.getFileModifiedTime(),
internalSplit.getExtraFileInfo(),
internalSplit.getCustomSplitInfo());
internalSplit.getCustomSplitInfo(),
internalSplit.getStart() / affinitySchedulingFileSectionSizeInBytes);

resultBuilder.add(new HiveSplit(
fileSplit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
outputFile.length(),
Instant.now().toEpochMilli(),
Optional.empty(),
ImmutableMap.of());
ImmutableMap.of(),
0);

HiveSplit split = new HiveSplit(
fileSplit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ private static HiveFileSplit getHiveFileSplit(Class<? extends FileSplit> fileSpl
CUSTOM_FILE_SPLIT_CLASS_KEY, HoodieRealtimeFileSplit.class.getName(),
HUDI_DELTA_FILEPATHS_KEY, "",
HUDI_BASEPATH_KEY, getTableBasePath(TABLE_NAME),
HUDI_MAX_COMMIT_TIME_KEY, "20210524095413"));
HUDI_MAX_COMMIT_TIME_KEY, "20210524095413"),
0);
case "org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit":
ImmutableMap.Builder<String, String> customSplitInfo = new ImmutableMap.Builder<>();
customSplitInfo.put(CUSTOM_FILE_SPLIT_CLASS_KEY, HoodieRealtimeBootstrapBaseFileSplit.class.getName());
Expand All @@ -149,13 +150,14 @@ HUDI_BASEPATH_KEY, getTableBasePath(TABLE_NAME),
customSplitInfo.put(BOOTSTRAP_FILE_SPLIT_START, "0");
customSplitInfo.put(BOOTSTRAP_FILE_SPLIT_LEN, "435165");
return new HiveFileSplit(
getTableBasePath(TABLE_NAME) + "/testPartition/" + FILE_NAME,
0,
435165,
435165,
1621850079,
Optional.empty(),
customSplitInfo.build());
getTableBasePath(TABLE_NAME) + "/testPartition/" + FILE_NAME,
0,
435165,
435165,
1621850079,
Optional.empty(),
customSplitInfo.build(),
0);
default:
throw new IllegalArgumentException("Unknown file split class " + fileSplitClass.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.TestHiveUtil.nonDefaultTimeZone;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class TestHiveClientConfig
{
Expand Down Expand Up @@ -162,7 +163,8 @@ public void testDefaults()
.setQuickStatsReaperExpiry(new Duration(5, TimeUnit.MINUTES))
.setParquetQuickStatsFileMetadataFetchTimeout(new Duration(60, TimeUnit.SECONDS))
.setMaxConcurrentQuickStatsCalls(100)
.setMaxConcurrentParquetQuickStatsCalls(500));
.setMaxConcurrentParquetQuickStatsCalls(500)
.setAffinitySchedulingFileSectionSize(new DataSize(256, MEGABYTE)));
}

@Test
Expand Down Expand Up @@ -287,6 +289,7 @@ public void testExplicitPropertyMappings()
.put("hive.quick-stats.parquet.file-metadata-fetch-timeout", "30s")
.put("hive.quick-stats.parquet.max-concurrent-calls", "399")
.put("hive.quick-stats.max-concurrent-calls", "101")
.put("hive.affinity-scheduling-file-section-size", "512MB")
.build();

HiveClientConfig expected = new HiveClientConfig()
Expand Down Expand Up @@ -406,7 +409,8 @@ public void testExplicitPropertyMappings()
.setQuickStatsReaperExpiry(new Duration(15, TimeUnit.MINUTES))
.setParquetQuickStatsFileMetadataFetchTimeout(new Duration(30, TimeUnit.SECONDS))
.setMaxConcurrentParquetQuickStatsCalls(399)
.setMaxConcurrentQuickStatsCalls(101);
.setMaxConcurrentQuickStatsCalls(101)
.setAffinitySchedulingFileSectionSize(new DataSize(512, MEGABYTE));

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,8 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
split.getLength(),
Instant.now().toEpochMilli(),
Optional.empty(),
ImmutableMap.of());
ImmutableMap.of(),
0);

Configuration configuration = new Configuration();
configuration.set("io.compression.codecs", LzoCodec.class.getName() + "," + LzopCodec.class.getName());
Expand Down Expand Up @@ -1005,7 +1006,8 @@ private void testPageSourceFactory(HiveBatchPageSourceFactory sourceFactory,
split.getLength(),
Instant.now().toEpochMilli(),
Optional.empty(),
ImmutableMap.of());
ImmutableMap.of(),
0);

Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
ImmutableSet.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class TestHiveFileSplit
@Test
public void testGetters()
{
HiveFileSplit split = new HiveFileSplit("path", 0, 200, 3, 400, Optional.of(new byte[21]), Collections.emptyMap());
HiveFileSplit split = new HiveFileSplit("path", 0, 200, 3, 400, Optional.of(new byte[21]), Collections.emptyMap(), 0);
assertEquals(split.getPath(), "path");
assertEquals(split.getLength(), 200L);
assertEquals(split.getStart(), 0L);
Expand All @@ -39,6 +39,6 @@ public void testGetters()
// Verifies the constructor's precondition: a negative start offset must be
// rejected with IllegalArgumentException carrying the documented message.
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "start must be non-negative")
public void testNegativeStart()
{
    new HiveFileSplit("path", -1, 200, 3, 400, Optional.of(new byte[21]), Collections.emptyMap(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa
outputFile.length(),
outputFile.lastModified(),
Optional.empty(),
ImmutableMap.of());
ImmutableMap.of(),
0);

HiveSplit split = new HiveSplit(
fileSplit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public void testGenerateCacheQuota()
10,
Instant.now().toEpochMilli(),
Optional.empty(),
ImmutableMap.of());
ImmutableMap.of(),
0);
HiveSplit split = new HiveSplit(
fileSplit,
SCHEMA_NAME,
Expand Down Expand Up @@ -220,7 +221,8 @@ public void testUseRecordReaderWithInputFormatAnnotationAndCustomSplit()
200,
Instant.now().toEpochMilli(),
Optional.empty(),
customSplitInfo);
customSplitInfo,
0);
Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
ImmutableSet.of(recordCursorProvider),
ImmutableSet.of(hiveBatchPageSourceFactory),
Expand Down Expand Up @@ -272,7 +274,8 @@ public void testNotUseRecordReaderWithInputFormatAnnotationWithoutCustomSplit()
200,
Instant.now().toEpochMilli(),
Optional.empty(),
ImmutableMap.of());
ImmutableMap.of(),
0);

Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
ImmutableSet.of(recordCursorProvider),
Expand Down Expand Up @@ -448,7 +451,8 @@ private static HiveSplit getHiveSplit(HiveStorageFormat hiveStorageFormat)
10,
Instant.now().toEpochMilli(),
Optional.empty(),
ImmutableMap.of());
ImmutableMap.of(),
0);

return new HiveSplit(
fileSplit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public void testJsonRoundTrip()
88,
Instant.now().toEpochMilli(),
Optional.empty(),
customSplitInfo);
customSplitInfo,
0);

byte[] rowIdPartitionComponent = {(byte) 76, (byte) 58};
HiveSplit expected = new HiveSplit(
Expand Down
Loading