
Conversation

@Guosmilesmile (Contributor)

This PR adds readers in Flink for _row_id and _last_updated_sequence_number to support row lineage in Flink.
This change mainly aligns with / references #12836.
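
For illustration, a rough sketch of how the lineage columns could be requested once these readers are in place. This assumes Iceberg core exposes them as MetadataColumns.ROW_ID and MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER; the table and the "id" data column are placeholders, not part of this PR.

```java
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

class LineageProjectionSketch {
  // Hypothetical projection: one data column plus the two row-lineage
  // metadata columns that the new Flink readers populate.
  static Schema lineageProjection(Table table) {
    return new Schema(
        table.schema().findField("id"),                 // placeholder data column
        MetadataColumns.ROW_ID,                         // _row_id
        MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);  // _last_updated_sequence_number
  }
}
```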

@Guosmilesmile (Contributor Author) commented Sep 22, 2025

Marking CI failure: https://github.com/apache/iceberg/actions/runs/17906247731/job/50907838966?pr=14148

```
Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0.
Execution failed for task ':iceberg-aws:checkClassUniqueness'.
> Could not resolve all artifacts for configuration ':iceberg-aws:runtimeClasspath'.
   > Could not resolve dev.failsafe:failsafe:3.3.2.
     Required by:
         project :iceberg-aws
      > Could not resolve dev.failsafe:failsafe:3.3.2.
         > Could not get resource 'https://repo.maven.apache.org/maven2/dev/failsafe/failsafe/3.3.2/failsafe-3.3.2.pom'.
            > Could not GET 'https://repo.maven.apache.org/maven2/dev/failsafe/failsafe/3.3.2/failsafe-3.3.2.pom'.
               > Got socket exception during request. It might be caused by SSL misconfiguration
                  > Connection reset
```

@pvary (Contributor) commented Sep 22, 2025

Only some minor comments.

@mxm: Could you please take a look?


```java
OutputFile output = new InMemoryOutputFile();
try (FileAppender<Record> writer =
    Parquet.write(Files.localOutput(testFile))
```
Contributor

What's the reason for moving from a local file to in-memory?

Contributor Author

In #12836, Spark and Core changed this behavior, so I aligned this part with them.
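
For reference, a minimal sketch of the in-memory pattern the test now follows, assuming Iceberg's InMemoryOutputFile and the Parquet.write builder; the schema, writer function, and records are placeholders rather than the actual test fixtures.

```java
import org.apache.iceberg.data.Record;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;

// Write the test data file to memory instead of a local temp file.
OutputFile output = new InMemoryOutputFile();
try (FileAppender<Record> writer =
    Parquet.write(output)
        .schema(SCHEMA)                    // placeholder test schema
        .createWriterFunc(writerFunction)  // placeholder writer function
        .build()) {
  writer.addAll(records);                  // placeholder test records
}
// Nothing touches the local filesystem; the written bytes can be read back
// via output.toInputFile() for the read-path assertions
// (previously the test used Parquet.write(Files.localOutput(testFile))).
```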

Comment on lines -121 to +120
```diff
-ParquetValueReader<?> reader = readersById.get(id);
-if (idToConstant.containsKey(id)) {
-  // containsKey is used because the constant may be null
-  int fieldMaxDefinitionLevel =
-      maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
-  reorderedFields.add(
-      ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
-} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
-  reorderedFields.add(ParquetValueReaders.position());
-} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
-  reorderedFields.add(ParquetValueReaders.constant(false));
-} else if (reader != null) {
-  reorderedFields.add(reader);
-} else if (field.initialDefault() != null) {
-  reorderedFields.add(
-      ParquetValueReaders.constant(
-          RowDataUtil.convertConstant(field.type(), field.initialDefault()),
-          maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
-} else if (field.isOptional()) {
-  reorderedFields.add(ParquetValueReaders.nulls());
-} else {
-  throw new IllegalArgumentException(
-      String.format("Missing required field: %s", field.name()));
-}
+ParquetValueReader<?> reader =
+    ParquetValueReaders.replaceWithMetadataReader(
+        id, readersById.get(id), idToConstant, constantDefinitionLevel);
+reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
```
Contributor

Just curious, how did you decide on this refactoring? It consolidates several branches of the if statement.

Contributor Author

In #12836, the core layer consolidated the code that Spark and Core shared. I noticed the Flink code was the same, so I reused it.
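
Roughly, the consolidated core helpers cover the same cases as the branches removed above. A hedged sketch of that behavior, mirroring the old Flink code (the real implementations are ParquetValueReaders.replaceWithMetadataReader in iceberg-parquet and the local defaultReader helper; the signatures here are illustrative only).

```java
// Sketch of the metadata/constant handling folded into the core helper
// (mirrors the removed branches; not the actual implementation).
static ParquetValueReader<?> replaceWithMetadataReaderSketch(
    int id, ParquetValueReader<?> reader, Map<Integer, ?> idToConstant, int constantDL) {
  if (idToConstant.containsKey(id)) {
    // containsKey is used because the constant may be null
    return ParquetValueReaders.constant(idToConstant.get(id), constantDL);
  } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
    return ParquetValueReaders.position();
  } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
    return ParquetValueReaders.constant(false);
  }
  return reader;
}

// Sketch of the per-field fallback handling (existing reader, initial default,
// optional null, or a missing required field).
static ParquetValueReader<?> defaultReaderSketch(
    Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
  if (reader != null) {
    return reader;
  } else if (field.initialDefault() != null) {
    return ParquetValueReaders.constant(
        RowDataUtil.convertConstant(field.type(), field.initialDefault()), constantDL);
  } else if (field.isOptional()) {
    return ParquetValueReaders.nulls();
  }
  throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
}
```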

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.RowData;
```
Contributor

The PR needs to go against the 2.1 target (currently: 2.0), as #13714 has been merged.

Contributor Author

Can we focus on the 2.0.0 version for this PR first, and then backport it to 2.1.0 and 1.20 later?

Contributor

I'm fine with both.

@Guosmilesmile (Contributor Author)

Marking CI failure: https://github.com/apache/iceberg/actions/runs/17911586761/job/50924149718?pr=14148
```
Error: Exception in thread "main" java.net.SocketException: Connection reset
    at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:337)
    at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:364)
    at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:845)
    at java.base/java.net.Socket$SocketInputStream.read(Socket.java:978)
```

@pvary (Contributor) commented Sep 23, 2025

@mxm: Any comments?

@mxm (Contributor) left a comment

LGTM!

@pvary merged commit 6829c3e into apache:main on Sep 23, 2025 (18 checks passed).
@pvary (Contributor) commented Sep 23, 2025

Merged to main.
Thanks for adding the columns, @Guosmilesmile, and thanks for the review, @mxm!
