Skip to content

Commit

Permalink
1. Fix missing required field src_slot_ids in TBrokerScanRangeParams (StarRocks#3)
Browse files (browse the repository at this point in the history)
2. Add PB file check on FE

Signed-off-by: zenoyang <[email protected]>
  • Loading branch information
zenoyang authored Aug 5, 2024
1 parent a1ee59a commit 7e6a1b2
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void init(BrokerDesc brokerDesc) throws LoadException {
// scan range params
TBrokerScanRangeParams params = new TBrokerScanRangeParams();
params.setStrict_mode(false);
params.addToSrc_slot_ids(-1);
if (brokerDesc.hasBroker()) {
params.setProperties(brokerDesc.getProperties());
params.setUse_broker(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SegmentLoadPendingTask extends LoadTask {
private static final Logger LOG = LogManager.getLogger(SegmentLoadPendingTask.class);

private static final String SPARK_OUTPUT_DATA_FILE_SUFFIX = "dat";
private static final String SPARK_OUTPUT_PB_FILE_SUFFIX = "pb";
private static final String SPARK_OUTPUT_SCHEMA_FILE_SUFFIX = "schema";
private static final String SPARK_OUTPUT_TABLET_DATA_DIR = "/data";

Expand Down Expand Up @@ -78,11 +81,12 @@ private void parseAllFilePath() throws UserException {
+ callback.getCallbackId());
}

long totalFileSize = 0;
int totalFileNum = 0;
long totalDataFileSize = 0;
int totalDataFileNum = 0;
SegmentPendingTaskAttachment segmentAttachment = (SegmentPendingTaskAttachment) attachment;
List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
String path = fileGroup.getFilePaths().get(0);
Set<String> allPbFilePrefix = new HashSet<>();

if (brokerDesc.hasBroker()) {
BrokerUtil.parseFile(path, brokerDesc, fileStatuses, false, true);
Expand All @@ -99,11 +103,15 @@ private void parseAllFilePath() throws UserException {
// tableId/partitionId/indexId/tabletId/tablet.schema
String relativePath = filePath.substring(filePath.indexOf(String.valueOf(tableId)));

if (filePath.endsWith(SPARK_OUTPUT_DATA_FILE_SUFFIX) && filePath.contains(SPARK_OUTPUT_TABLET_DATA_DIR)) {
if (filePath.contains(SPARK_OUTPUT_TABLET_DATA_DIR) && filePath.endsWith(SPARK_OUTPUT_DATA_FILE_SUFFIX)) {
String tabletMeta = relativePath.split(SPARK_OUTPUT_TABLET_DATA_DIR)[0];
segmentAttachment.addDataFileInfo(tabletMeta, Pair.create(filePath, fileStatus.size));
totalFileNum++;
totalFileSize += fileStatus.getSize();
totalDataFileNum++;
totalDataFileSize += fileStatus.getSize();
} else if (filePath.contains(SPARK_OUTPUT_TABLET_DATA_DIR)
&& filePath.endsWith(SPARK_OUTPUT_PB_FILE_SUFFIX)) {
String tabletMeta = relativePath.split(SPARK_OUTPUT_TABLET_DATA_DIR)[0];
allPbFilePrefix.add(tabletMeta);
} else if (filePath.endsWith(SPARK_OUTPUT_SCHEMA_FILE_SUFFIX)) {
String tabletMeta = relativePath.substring(0, relativePath.lastIndexOf("/"));
segmentAttachment.addSchemaFilePath(tabletMeta, filePath);
Expand All @@ -116,9 +124,15 @@ private void parseAllFilePath() throws UserException {
+ callback.getCallbackId());
}

if (allPbFilePrefix.size() != segmentAttachment.getTabletMetaToDataFileInfo().keySet().size()
|| !allPbFilePrefix.containsAll(segmentAttachment.getTabletMetaToDataFileInfo().keySet())) {
throw new UserException("Segment Load file path is invalid, segment pb files does not match data files. " +
"job: " + callback.getCallbackId());
}

LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}",
totalFileNum, totalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId());
totalDataFileNum, totalDataFileSize, (System.currentTimeMillis() - start), callback.getCallbackId());

((SegmentLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize);
((SegmentLoadJob) callback).setLoadFileInfo(totalDataFileNum, totalDataFileSize);
}
}

0 comments on commit 7e6a1b2

Please sign in to comment.