@@ -309,6 +309,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
options.commitMaxRetryWait(),
options.commitStrictModeLastSafeSnapshot().orElse(null),
options.rowTrackingEnabled(),
+ options.dataEvolutionEnabled(),
options.commitDiscardDuplicateFiles(),
conflictDetection);
}
@@ -149,6 +149,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private Long strictModeLastSafeSnapshot;
private final InternalRowPartitionComputer partitionComputer;
private final boolean rowTrackingEnabled;
+ private final boolean dataEvolutionEnabled;
private final boolean discardDuplicateFiles;
private final ConflictDetection conflictDetection;

@@ -186,6 +187,7 @@ public FileStoreCommitImpl(
long commitMaxRetryWait,
@Nullable Long strictModeLastSafeSnapshot,
boolean rowTrackingEnabled,
+ boolean dataEvolutionEnabled,
boolean discardDuplicateFiles,
ConflictDetection conflictDetection) {
this.snapshotCommit = snapshotCommit;
@@ -230,6 +232,7 @@ public FileStoreCommitImpl(
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
this.rowTrackingEnabled = rowTrackingEnabled;
+ this.dataEvolutionEnabled = dataEvolutionEnabled;
this.discardDuplicateFiles = discardDuplicateFiles;
this.conflictDetection = conflictDetection;
}
@@ -1120,8 +1123,16 @@ CommitResult tryCommitOnce(
}

// delta record count: added records minus deleted records
- long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
- long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
+ long deltaRecordCount;
+ long totalRecordCount;

+ if (dataEvolutionEnabled) {
+ deltaRecordCount = nextRowIdStart - firstRowIdStart;
+ totalRecordCount = nextRowIdStart;
+ } else {
+ deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
[Review comment — Contributor] I think we don't need to modify this here; we should also adjust the split row count, just like the Python side does.

[Reply — Author] Maybe we can modify it here too; it depends on how the count is defined.
+ totalRecordCount = previousTotalRecordCount + deltaRecordCount;
+ }

// write new delta files into manifest files
deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
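A minimal, self-contained sketch of the two accounting modes above (method and parameter names are illustrative, not Paimon's actual API): with data evolution enabled, row ids come from a globally dense sequence, so the commit's delta is the width of the row-id range it claimed and the snapshot total is simply the next unassigned row id, while the file-level add/delete counts are ignored because evolved files rewrite existing rows without changing the logical row count.

    import java.util.Arrays;

    public class RecordCountSketch {
        // Returns {deltaRecordCount, totalRecordCount} for one commit.
        static long[] recordCounts(
                boolean dataEvolutionEnabled,
                long firstRowIdStart,   // first row id claimed by this commit
                long nextRowIdStart,    // next free row id after this commit
                long recordCountAdd,    // rows in files added by this commit
                long recordCountDelete, // rows in files deleted by this commit
                long previousTotal) {
            if (dataEvolutionEnabled) {
                // Row ids are dense, so the claimed range is the delta and the
                // next free row id equals the table's total row count.
                long delta = nextRowIdStart - firstRowIdStart;
                return new long[] {delta, nextRowIdStart};
            }
            long delta = recordCountAdd - recordCountDelete;
            return new long[] {delta, previousTotal + delta};
        }

        public static void main(String[] args) {
            // A data-evolution commit claiming row ids [100, 150): the file-level
            // counts (70 added, 20 deleted) do not enter the result.
            System.out.println(Arrays.toString(recordCounts(true, 100, 150, 70, 20, 100)));
            // -> [50, 150]
        }
    }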
@@ -59,13 +59,18 @@ public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
.map(
f -> {
boolean rawConvertible = f.stream().allMatch(file -> file.size() == 1);
+ long rowCount =
+ f.stream()
+ .mapToLong(entries -> entries.get(0).rowCount())
+ .sum();
List<DataFileMeta> groupFiles =
f.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
return rawConvertible
- ? SplitGroup.rawConvertibleGroup(groupFiles)
- : SplitGroup.nonRawConvertibleGroup(groupFiles);
+ ? SplitGroup.rawConvertibleGroup(groupFiles).rowCount(rowCount)
+ : SplitGroup.nonRawConvertibleGroup(groupFiles)
+ .rowCount(rowCount);
})
.collect(Collectors.toList());
}
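To see why the group's row count sums only entries.get(0).rowCount(), here is a hedged sketch with illustrative types: assuming each inner list is a bunch of files covering the same row range (for example, a base file plus files carrying back-filled columns), counting every file would tally the same rows repeatedly.

    import java.util.Arrays;
    import java.util.List;

    public class BunchRowCountSketch {
        // Illustrative stand-in for DataFileMeta.
        record FileMeta(String name, long rowCount) {}

        // Files in one bunch cover the same rows, so only the first file's
        // row count is representative for the whole bunch.
        static long splitRowCount(List<List<FileMeta>> bunches) {
            return bunches.stream().mapToLong(bunch -> bunch.get(0).rowCount()).sum();
        }

        public static void main(String[] args) {
            List<List<FileMeta>> bunches =
                    Arrays.asList(
                            Arrays.asList(new FileMeta("base-0", 100), new FileMeta("evolved-0", 100)),
                            Arrays.asList(new FileMeta("base-1", 50)));
            System.out.println(splitRowCount(bunches)); // 150, not 250
        }
    }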
@@ -62,7 +62,7 @@ public class DataSplit implements Split {

private static final long serialVersionUID = 7L;
private static final long MAGIC = -2394839472490812314L;
- private static final int VERSION = 8;
+ private static final int VERSION = 9;

private long snapshotId = 0;
private BinaryRow partition;
@@ -78,6 +78,7 @@ public class DataSplit implements Split {

private boolean isStreaming = false;
private boolean rawConvertible;
+ private long rowCount = -1L;

public DataSplit() {}

@@ -136,6 +137,9 @@ public OptionalLong earliestFileCreationEpochMillis() {

@Override
public long rowCount() {
+ if (this.rowCount > 0L) {
+ return this.rowCount;
+ }
long rowCount = 0;
for (DataFileMeta file : dataFiles) {
rowCount += file.rowCount();
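The override is a cache-with-fallback: planning can stamp the split with a group-level count, and the -1 default (also what pre-v9 payloads deserialize to) falls back to the legacy per-file summation. A minimal sketch of the pattern, with illustrative names:

    public class RowCountFallbackSketch {
        static long rowCount(long cachedRowCount, long[] fileRowCounts) {
            if (cachedRowCount > 0L) {
                return cachedRowCount; // stamped at planning time, trusted as-is
            }
            // Fallback for unset (-1) counts: sum the per-file counts, which
            // can overcount under data evolution when files share rows.
            long sum = 0;
            for (long count : fileRowCounts) {
                sum += count;
            }
            return sum;
        }

        public static void main(String[] args) {
            System.out.println(rowCount(150L, new long[] {100, 100, 50})); // 150 (cached)
            System.out.println(rowCount(-1L, new long[] {100, 100, 50}));  // 250 (fallback)
        }
    }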
@@ -309,7 +313,8 @@ public boolean equals(Object o) {
&& Objects.equals(beforeFiles, dataSplit.beforeFiles)
&& Objects.equals(beforeDeletionFiles, dataSplit.beforeDeletionFiles)
&& Objects.equals(dataFiles, dataSplit.dataFiles)
- && Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles);
+ && Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles)
+ && Objects.equals(rowCount, dataSplit.rowCount);
}

@Override
@@ -325,7 +330,8 @@ public int hashCode() {
dataFiles,
dataDeletionFiles,
isStreaming,
- rawConvertible);
+ rawConvertible,
+ rowCount);
}

@Override
@@ -364,6 +370,7 @@ protected void assign(DataSplit other) {
this.dataDeletionFiles = other.dataDeletionFiles;
this.isStreaming = other.isStreaming;
this.rawConvertible = other.rawConvertible;
+ this.rowCount = other.rowCount;
}

public void serialize(DataOutputView out) throws IOException {
@@ -396,8 +403,8 @@ public void serialize(DataOutputView out) throws IOException {
DeletionFile.serializeList(out, dataDeletionFiles);

out.writeBoolean(isStreaming);
-
out.writeBoolean(rawConvertible);
+ out.writeLong(rowCount);
}

public static DataSplit deserialize(DataInputView in) throws IOException {
@@ -433,6 +440,10 @@ public static DataSplit deserialize(DataInputView in) throws IOException {

boolean isStreaming = in.readBoolean();
boolean rawConvertible = in.readBoolean();
+ long rowCount = -1L;
+ if (version >= 9) {
+ rowCount = in.readLong();
+ }

DataSplit.Builder builder =
builder()
@@ -444,7 +455,8 @@
.withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
- .rawConvertible(rawConvertible);
+ .rawConvertible(rawConvertible)
+ .withRowCount(rowCount);

if (beforeDeletionFiles != null) {
builder.withBeforeDeletionFiles(beforeDeletionFiles);
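The wire-format change follows the append-and-gate pattern: the new field is written at the end of the stream, VERSION is bumped to 9, and the reader consumes the extra long only when the recorded version allows it, so v8 payloads still deserialize with rowCount left at -1. A self-contained sketch using plain java.io streams as a stand-in for Paimon's DataOutputView/DataInputView:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class VersionGatedFieldSketch {
        static byte[] write(int version, boolean rawConvertible, long rowCount) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(version);
                out.writeBoolean(rawConvertible);
                if (version >= 9) {
                    out.writeLong(rowCount); // new field is appended last
                }
            }
            return bytes.toByteArray();
        }

        static long readRowCount(byte[] payload) throws IOException {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
                int version = in.readInt();
                in.readBoolean(); // rawConvertible
                long rowCount = -1L; // default when the writer predates v9
                if (version >= 9) {
                    rowCount = in.readLong();
                }
                return rowCount;
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println(readRowCount(write(8, true, 1024L))); // -1 (old payload)
            System.out.println(readRowCount(write(9, true, 1024L))); // 1024
        }
    }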
@@ -473,7 +485,7 @@ private static FunctionWithIOException<DataInputView, DataFileMeta> getFileMetaS
DataFileMetaFirstRowIdLegacySerializer serializer =
new DataFileMetaFirstRowIdLegacySerializer();
return serializer::deserialize;
- } else if (version == 8) {
+ } else if (version == 8 || version == 9) {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
return serializer::deserialize;
} else {
@@ -556,6 +568,11 @@ public Builder rawConvertible(boolean rawConvertible) {
return this;
}

+ public Builder withRowCount(long rowCount) {
+ this.split.rowCount = rowCount;
+ return this;
+ }

public DataSplit build() {
checkArgument(split.partition != null);
checkArgument(split.bucket != -1);
@@ -36,6 +36,7 @@ class SplitGroup {

public final List<DataFileMeta> files;
public final boolean rawConvertible;
+ public long rowCount = -1L;

private SplitGroup(List<DataFileMeta> files, boolean rawConvertible) {
this.files = files;
@@ -49,5 +50,10 @@ public static SplitGroup rawConvertibleGroup(List<DataFileMeta> files) {
public static SplitGroup nonRawConvertibleGroup(List<DataFileMeta> files) {
return new SplitGroup(files, false);
}

+ public SplitGroup rowCount(long rowCount) {
+ this.rowCount = rowCount;
+ return this;
+ }
}
}
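The chainable rowCount setter keeps SplitGroup's two static factories unchanged while letting callers attach the optional count inline; the split generator above chains .rowCount(rowCount) onto the factory result in exactly this way. A stripped-down sketch of the same pattern, with illustrative names:

    import java.util.Arrays;
    import java.util.List;

    class SplitGroupSketch {
        final List<String> files;
        long rowCount = -1L; // -1 means "unknown"

        private SplitGroupSketch(List<String> files) {
            this.files = files;
        }

        static SplitGroupSketch of(List<String> files) {
            return new SplitGroupSketch(files);
        }

        // Chainable mutator, mirroring SplitGroup.rowCount(long).
        SplitGroupSketch rowCount(long rowCount) {
            this.rowCount = rowCount;
            return this;
        }

        public static void main(String[] args) {
            SplitGroupSketch group = of(Arrays.asList("f1", "f2")).rowCount(150L);
            System.out.println(group.rowCount); // 150
        }
    }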
@@ -404,6 +404,7 @@ private List<DataSplit> generateSplits(
String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
builder.withDataFiles(dataFiles)
.rawConvertible(splitGroup.rawConvertible)
+ .withRowCount(splitGroup.rowCount > 0 ? splitGroup.rowCount : -1)
.withBucketPath(bucketPath);
if (deletionVectors && deletionFilesMap != null) {
builder.withDataDeletionFiles(
@@ -33,6 +33,7 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.ReadBuilder;
+ import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.Test;
@@ -179,6 +180,19 @@ public InternalRow next() {
assertThat(i.get()).isEqualTo(1);
}

+ @Test
+ public void testSnapshotRowCount() throws Exception {
+ createTableDefault();
+ commitDefault(writeDataDefault(100, 1));
+ long rowCount =
+ getTableDefault().newScan().plan().splits().stream()
+ .mapToLong(Split::rowCount)
+ .sum();
+ assertThat(rowCount).isEqualTo(100);
+ assertThat(getTableDefault().snapshotManager().latestSnapshot().totalRecordCount())
+ .isEqualTo(100);
+ }

protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
@@ -600,6 +600,36 @@ public void testWithRowIds() throws Exception {
assertThat(i.get()).isEqualTo(2);
}

+ @Test
+ public void testSplit() throws Exception {
+ createTableDefault();
+ Schema schema = schemaDefault();
+ BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+ try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
+ write.write(
+ GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> committables = write.prepareCommit();
+ commit.commit(committables);
+ }

+ RowType writeType1 = schema.rowType().project(Collections.singletonList("f2"));
+ try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
+ write1.write(GenericRow.of(BinaryString.fromString("c")));

+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> committables = write1.prepareCommit();
+ setFirstRowId(committables, 0L);
+ commit.commit(committables);
+ }

+ ReadBuilder readBuilder = getTableDefault().newReadBuilder();

+ long rowCount =
+ readBuilder.newScan().plan().splits().stream().mapToLong(Split::rowCount).sum();
+ assertThat(rowCount).isEqualTo(1L);
+ }

protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
@@ -765,6 +765,78 @@ public void testSerializerCompatibleV8() throws Exception {
assertThat(actual).isEqualTo(split);
}

+ @Test
+ public void testSerializerCompatibleV9() throws Exception {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));

+ DataFileMeta dataFile =
+ DataFileMeta.create(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+ Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs:///path/to/warehouse",
+ 12L,
+ Arrays.asList("a", "b", "c", "f"));
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);

+ DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L);
+ List<DeletionFile> deletionFiles = Collections.singletonList(deletionFile);

+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+ binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+ binaryRowWriter.complete();

+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(18)
+ .withPartition(partition)
+ .withBucket(20)
+ .withTotalBuckets(32)
+ .withDataFiles(dataFiles)
+ .withDataDeletionFiles(deletionFiles)
+ .withBucketPath("my path")
+ .withRowCount(1024L)
+ .build();

+ assertThat(InstantiationUtil.clone(InstantiationUtil.clone(split))).isEqualTo(split);

+ byte[] v9Bytes =
+ IOUtils.readFully(
+ DataSplitCompatibleTest.class
+ .getClassLoader()
+ .getResourceAsStream("compatibility/datasplit-v9"),
+ true);

+ DataSplit actual =
+ InstantiationUtil.deserializeObject(v9Bytes, DataSplit.class.getClassLoader());
+ assertThat(actual).isEqualTo(split);
+ assertThat(actual.rowCount()).isEqualTo(1024L);
+ }

private DataFileMeta newDataFile(long rowCount) {
return newDataFile(rowCount, null, null);
}
Binary file not shown (new test resource: compatibility/datasplit-v9).
12 changes: 11 additions & 1 deletion paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -435,13 +435,23 @@ def weight_func(file_list: List[DataFileMeta]) -> int:
packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func,
self.target_split_size)

+ row_counts: List[int] = [
+ sum(m[0].row_count for m in bunch)
+ for bunch in packed_files
+ ]

# Flatten the packed files and build splits
flatten_packed_files: List[List[DataFileMeta]] = [
[file for sub_pack in pack for file in sub_pack]
for pack in packed_files
]

- splits += self._build_split_from_pack(flatten_packed_files, file_entries, False)
+ splits_temp = self._build_split_from_pack(flatten_packed_files, file_entries, False)

+ for i in range(len(splits_temp)):
+ splits_temp[i].set_row_count(row_counts[i])

+ splits += splits_temp

if self.idx_of_this_subtask is not None:
self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
3 changes: 3 additions & 0 deletions paimon-python/pypaimon/read/split.py
@@ -47,3 +47,6 @@ def file_size(self) -> int:
@property
def file_paths(self) -> List[str]:
return self._file_paths

+ def set_row_count(self, row_count: int) -> None:
+ self._row_count = row_count
5 changes: 4 additions & 1 deletion paimon-python/pypaimon/tests/blob_table_test.py
@@ -1334,7 +1334,10 @@ def test_blob_write_read_large_data_end_to_end_with_rolling(self):
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
- result = table_read.to_arrow(table_scan.plan().splits())
+ splits = table_scan.plan().splits()
+ result = table_read.to_arrow(splits)

+ self.assertEqual(sum(s._row_count for s in splits), 40)

# Verify the data
self.assertEqual(result.num_rows, 40, "Should have 40 rows")