add more logging and metric to compaction task generator (apache#14469)
dang-stripe authored Nov 19, 2024
1 parent b9965c8 commit 89622c0
Showing 4 changed files with 27 additions and 3 deletions.
@@ -11,7 +11,7 @@ rules:
database: "$2"
table: "$1$3"
tableType: "$4"
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"MinionMetrics\", name=\"pinot\\.minion\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.(\\w+)\\.(taskExecution|taskQueueing|numberTasks|numberTasksExecuted|numberTasksCompleted|numberTasksCancelled|numberTasksFailed|numberTasksFatalFailed|segmentBytesDownloaded|segmentDownloadCount|segmentBytesUploaded|segmentUploadCount|recordsPurgedCount|recordsProcessedCount)\"><>(\\w+)"
- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"MinionMetrics\", name=\"pinot\\.minion\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.(\\w+)\\.(taskExecution|taskQueueing|numberTasks|numberTasksExecuted|numberTasksCompleted|numberTasksCancelled|numberTasksFailed|numberTasksFatalFailed|segmentBytesDownloaded|segmentDownloadCount|segmentBytesUploaded|segmentUploadCount|recordsPurgedCount|recordsProcessedCount|compactedRecordsCount)\"><>(\\w+)"
name: "pinot_minion_$6_$7"
cache: true
labels:
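As a quick way to see how the widened rule exposes the new meter, here is a minimal java.util.regex sketch. The MBean name, table name, and task-type segment in it are illustrative assumptions (not taken from this commit), and the long meter alternation is trimmed to just the new compactedRecordsCount entry, which does not change the group numbering used by pinot_minion_$6_$7.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CompactedRecordsRuleSketch {
  public static void main(String[] args) {
    // The rule's regex with the meter alternation trimmed to the newly added entry;
    // groups 6 and 7 still capture the meter name and the MBean attribute.
    Pattern rule = Pattern.compile(
        "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"MinionMetrics\", "
            + "name=\"pinot\\.minion\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.(\\w+)\\."
            + "(compactedRecordsCount)\"><>(\\w+)");

    // Hypothetical MBean name of the shape this rule expects; the exact segments a
    // real MinionMetrics registration produces may differ.
    String mbean = "\"org.apache.pinot.common.metrics\"<type=\"MinionMetrics\", "
        + "name=\"pinot.minion.myTable_REALTIME.UpsertCompactionTask.compactedRecordsCount\"><>Count";

    Matcher m = rule.matcher(mbean);
    if (m.matches()) {
      // name: "pinot_minion_$6_$7" -> pinot_minion_compactedRecordsCount_Count
      System.out.println("pinot_minion_" + m.group(6) + "_" + m.group(7));
    }
  }
}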
@@ -37,7 +37,8 @@ public enum MinionMeter implements AbstractMetrics.Meter {
SEGMENT_BYTES_DOWNLOADED("bytes", false),
SEGMENT_BYTES_UPLOADED("bytes", false),
RECORDS_PROCESSED_COUNT("rows", false),
RECORDS_PURGED_COUNT("rows", false);
RECORDS_PURGED_COUNT("rows", false),
COMPACTED_RECORDS_COUNT("rows", false);

private final String _meterName;
private final String _unit;
@@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -93,22 +94,28 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
.build();
}

int totalDocsAfterCompaction;
try (CompactedPinotSegmentRecordReader compactedRecordReader = new CompactedPinotSegmentRecordReader(indexDir,
validDocIds)) {
SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName,
getSchema(tableNameWithType));
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(config, compactedRecordReader);
driver.build();
totalDocsAfterCompaction = driver.getSegmentStats().getTotalDocCount();
}

File compactedSegmentFile = new File(workingDir, segmentName);
SegmentConversionResult result =
new SegmentConversionResult.Builder().setFile(compactedSegmentFile).setTableNameWithType(tableNameWithType)
.setSegmentName(segmentName).build();
_minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.COMPACTED_RECORDS_COUNT,
segmentMetadata.getTotalDocs() - totalDocsAfterCompaction);

long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms. Total docs before compaction: {}. "
+ "Total docs after compaction: {}.", taskType, configs, (endMillis - startMillis),
segmentMetadata.getTotalDocs(), totalDocsAfterCompaction);

return result;
}
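For the executor change above, the emitted meter value is simply the before/after doc-count delta. The tiny standalone sketch below (hypothetical class and numbers, not Pinot code) spells out that arithmetic.

public class CompactedRecordsDeltaSketch {
  // The value reported under COMPACTED_RECORDS_COUNT is the number of records dropped
  // by compaction: the segment's doc count before rebuild minus the count afterwards.
  static long compactedRecords(long totalDocsBefore, long totalDocsAfterCompaction) {
    return totalDocsBefore - totalDocsAfterCompaction;
  }

  public static void main(String[] args) {
    // Illustrative numbers: 100_000 docs before, 95_500 kept -> 4_500 records metered.
    System.out.println(compactedRecords(100_000L, 95_500L)); // 4500
  }
}

These are also the two numbers the updated log line reports as "Total docs before compaction" and "Total docs after compaction".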
@@ -155,6 +155,12 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {

SegmentSelectionResult segmentSelectionResult =
processValidDocIdsMetadata(taskConfigs, completedSegmentsMap, validDocIdsMetadataList);
int skippedSegmentsCount = validDocIdsMetadataList.size()
- segmentSelectionResult.getSegmentsForCompaction().size()
- segmentSelectionResult.getSegmentsForDeletion().size();
LOGGER.info("Selected {} segments for compaction, {} segments for deletion and skipped {} segments for table: {}",
segmentSelectionResult.getSegmentsForCompaction().size(),
segmentSelectionResult.getSegmentsForDeletion().size(), skippedSegmentsCount, tableNameWithType);

if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(),
@@ -218,10 +224,20 @@ public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, Stri
long totalDocs = validDocIdsMetadata.getTotalDocs();
double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
if (totalInvalidDocs == totalDocs) {
LOGGER.debug("Segment {} contains only invalid records, adding it to the deletion list", segmentName);
segmentsForDeletion.add(segment.getSegmentName());
} else if (invalidRecordPercent >= invalidRecordsThresholdPercent
&& totalInvalidDocs >= invalidRecordsThresholdCount) {
LOGGER.debug("Segment {} contains {} invalid records out of {} total records "
+ "(count threshold: {}, percent threshold: {}), adding it to the compaction list",
segmentName, totalInvalidDocs, totalDocs, invalidRecordsThresholdCount,
invalidRecordsThresholdPercent);
segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
} else {
LOGGER.debug("Segment {} contains {} invalid records out of {} total records "
+ "(count threshold: {}, percent threshold: {}), skipping it for compaction",
segmentName, totalInvalidDocs, totalDocs, invalidRecordsThresholdCount,
invalidRecordsThresholdPercent);
}
break;
}
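To make the three-way selection above easier to follow, here is a standalone sketch (hypothetical helper, not Pinot code) of the decision the new debug lines describe; the threshold and document counts in main are example values.

public class SegmentSelectionSketch {
  enum Decision { DELETE, COMPACT, SKIP }

  static Decision decide(long totalInvalidDocs, long totalDocs,
      double invalidRecordsThresholdPercent, long invalidRecordsThresholdCount) {
    double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100;
    if (totalInvalidDocs == totalDocs) {
      return Decision.DELETE;   // only invalid records -> deletion list
    } else if (invalidRecordPercent >= invalidRecordsThresholdPercent
        && totalInvalidDocs >= invalidRecordsThresholdCount) {
      return Decision.COMPACT;  // both thresholds met -> compaction list
    } else {
      return Decision.SKIP;     // otherwise skipped, which the generator now logs
    }
  }

  public static void main(String[] args) {
    System.out.println(decide(1_000, 1_000, 30.0, 100));  // DELETE
    System.out.println(decide(400, 1_000, 30.0, 100));    // COMPACT (40% >= 30%, 400 >= 100)
    System.out.println(decide(50, 1_000, 30.0, 100));     // SKIP (5% < 30%)
  }
}

The summary line added in generateTasks then reports how many segments fell into each of these buckets per table.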
