Conversation

@stevenzwu (Contributor, PR author) commented on Feb 24, 2023:

This closes issue #1698.

There are two motivations, as described in issue #1698 (a brief usage sketch follows the list):

  1. provide a more stable serialization format (than Java serialization) for Flink checkpoints
  2. allow use by the REST catalog for scan planning or committing files
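A minimal sketch of the intended round trip, assuming the parser exposes static toJson(FileScanTask) and fromJson(String, boolean caseSensitive) entry points (the exact signatures are an assumption here):

  import org.apache.iceberg.FileScanTask;
  import org.apache.iceberg.FileScanTaskParser;

  class FileScanTaskJsonRoundTrip {
    // Serialize a planned task into JSON, e.g. for Flink checkpoint state
    // or for shipping scan-planning results through a REST catalog.
    static String serialize(FileScanTask task) {
      return FileScanTaskParser.toJson(task);
    }

    // Restore the task from JSON; 'true' requests case-sensitive column matching.
    static FileScanTask deserialize(String json) {
      return FileScanTaskParser.fromJson(json, true);
    }
  }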

  /**
   * Return the schema for this file scan task.
   */
  default Schema schema() {
@stevenzwu (author) commented on Feb 24, 2023:

This is needed so that FileScanTaskParser (added in this PR) can serialize the schema. During deserialization, the schema can then be passed into the constructor of BaseFileScanTask.

Keep it at this level (not the base ContentScanTask interface or lower) to limit the scope of the change.
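For illustration, a minimal sketch of that flow (the class name is hypothetical; this is not the parser's actual code):

  import org.apache.iceberg.FileScanTask;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.SchemaParser;

  class SchemaRoundTripSketch {
    // Serialization side: the new accessor makes the schema reachable from the task.
    static String writeSchema(FileScanTask task) {
      return SchemaParser.toJson(task.schema());
    }

    // Deserialization side: the parsed schema can then be handed to the
    // reconstructed BaseFileScanTask (constructor call omitted here).
    static Schema readSchema(String schemaJson) {
      return SchemaParser.fromJson(schemaJson);
    }
  }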

    return file;
  }

  protected Schema schema() {
@stevenzwu (author):

Exposed as protected so that BaseFileScanTask can use it to implement the FileScanTask#schema() method.

A project member:

It's a little odd that we reverse-engineer the schema from the string here, but it seems like the most backwards-compatible thing we can do.

@stevenzwu (author):

Agreed, it is a little odd. On the other hand, the partition spec follows the same model in this class. As you said, otherwise we would have to change the constructors of a bunch of classes. The current choice of passing the schema and spec as strings is what makes those scan tasks serializable.

  @Override
  public PartitionSpec spec() {
    if (spec == null) {
      synchronized (this) {
        if (spec == null) {
          this.spec = PartitionSpecParser.fromJson(schema(), specString);
        }
      }
    }
    return spec;
  }

cc @nastra
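For reference, a sketch of the analogous lazy schema parsing, mirroring the spec() pattern above (the class and field names here are illustrative, not necessarily the exact ones in this PR):

  import org.apache.iceberg.Schema;
  import org.apache.iceberg.SchemaParser;

  class LazySchemaSketch {
    private final String schemaString;               // carried in serialized form, like specString
    private transient volatile Schema schema = null;

    LazySchemaSketch(String schemaString) {
      this.schemaString = schemaString;
    }

    // Same double-checked-locking shape as spec() above; the volatile field keeps it safe.
    protected Schema schema() {
      if (schema == null) {
        synchronized (this) {
          if (schema == null) {
            this.schema = SchemaParser.fromJson(schemaString);
          }
        }
      }
      return schema;
    }
  }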

import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.JsonUtil;

class ContentFileParser {
@stevenzwu (author):

Since DataFile and DeleteFile have the same structure, this is called ContentFileParser, without any generic type parameter.
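A small sketch of why no type parameter is needed: both file types implement ContentFile, so a single wildcard signature covers both (the describe method below is purely illustrative):

  import org.apache.iceberg.ContentFile;
  import org.apache.iceberg.DataFile;
  import org.apache.iceberg.DeleteFile;

  class ContentFileWildcardSketch {
    // One non-generic entry point serves data files and delete files alike.
    static String describe(ContentFile<?> file) {
      return file.content() + ": " + file.path();
    }

    static void example(DataFile dataFile, DeleteFile deleteFile) {
      describe(dataFile);    // works for data files
      describe(deleteFile);  // and for delete files
    }
  }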

private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
private List<Integer> equalityFieldIds = null;
private Integer sortOrderId = SortOrder.unsorted().orderId();
@stevenzwu (author):

Relocated this line here to follow the same order as the field definitions.

private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
private List<Integer> equalityFieldIds = null;
@stevenzwu (author):

Added a setter for equalityFieldIds so that the parser unit test can cover this field too.


private final PartitionSpec spec;

ContentFileParser(PartitionSpec spec) {
@stevenzwu (author):

Unlike the other JSON parsers, which follow a static singleton pattern, ContentFileParser depends on the partition spec. Hence it is a regular class with a constructor.
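A minimal usage sketch under that design, assuming a toJson instance method (the method name is an assumption for illustration):

  // Sketch placed in the parser's package, since ContentFileParser is package-private.
  package org.apache.iceberg;

  class PerSpecParserSketch {
    // One parser instance per partition spec, instead of a shared static singleton.
    static String serialize(PartitionSpec spec, DataFile dataFile) {
      ContentFileParser parser = new ContentFileParser(spec);
      return parser.toJson(dataFile);    // toJson is an assumed method name
    }
  }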

@nastra (Contributor) left a comment:

I did a high-level pass over the parsers themselves and left a few comments. I haven't had a chance to look more closely at the tests yet.

@stevenzwu (author) left a comment:

@nastra thanks a lot for the initial review. I addressed the comments in the latest commit.

@stevenzwu force-pushed the issue-1698-split-json branch from a8062a7 to 4d57100 on April 5, 2023, 02:41
@nastra (Contributor) left a comment:

Sorry for the late re-review, @stevenzwu. I've left a few more comments.

@nastra (Contributor) left a comment:

I've been mainly focusing on the JSON parsers and left a few comments, but overall this looks almost ready. It would be great to get some additional input from another reviewer.

import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestContentFileParser {
A reviewer (Contributor):

I think it would be good to also add a test with a plain JSON string, to see what the full JSON looks like. And then maybe also another test with a plain JSON string where all the optional fields (metrics, equality field ids, sort order id, split offsets, ...) are missing.
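A sketch in the spirit of that suggestion, assuming toJson/fromJson methods on the parser (the method names are assumptions; the eventual test would also pin an explicit JSON literal once the wire format is settled):

  // Placed in org.apache.iceberg, next to the package-private parser.
  package org.apache.iceberg;

  import static org.assertj.core.api.Assertions.assertThat;

  import org.junit.jupiter.api.Test;

  public class TestContentFileParserPlainJson {

    @Test
    public void testDataFileWithRequiredFieldsOnly() {
      PartitionSpec spec = PartitionSpec.unpartitioned();
      DataFile dataFile =
          DataFiles.builder(spec)
              .withPath("/path/to/data-file.parquet") // format inferred from the extension
              .withFileSizeInBytes(10L)
              .withRecordCount(1L)
              .build();

      ContentFileParser parser = new ContentFileParser(spec);
      String json = parser.toJson(dataFile);               // assumed method name

      // With only the required fields set, the optional ones (metrics, equality
      // field ids, sort order id, split offsets, ...) should be absent from json.
      DataFile parsed = (DataFile) parser.fromJson(json);  // assumed method name
      assertThat(parsed.path()).isEqualTo(dataFile.path());
      assertThat(parsed.fileSizeInBytes()).isEqualTo(dataFile.fileSizeInBytes());
      assertThat(parsed.recordCount()).isEqualTo(dataFile.recordCount());
    }
  }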


JsonNode pNode = node.get(property);
Preconditions.checkArgument(
    pNode.isTextual(), "Cannot parse from non-text value: %s: %s", property, pNode);
A reviewer (Contributor):

nit: maybe we should mention that we're trying to parse this from text to a binary representation

@stevenzwu (author):

I also fixed a couple of other error messages with the same problem.

@stevenzwu (author) commented:

The Spark CI build failed with what appears to be an environment problem:

        Caused by:
        java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwxr-xr-x
            at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:724)
            at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:654)
            at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:586)
            at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:548)
            at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:174)
            at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:129)
            at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
            at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:293)
            at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:492)
            at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:352)
            at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:71)
            at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:70)
            at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:224)
            at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
            at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)

@stevenzwu force-pushed the issue-1698-split-json branch from 61e40a7 to 0016f36 on June 24, 2023, 03:01
@stevenzwu force-pushed the issue-1698-split-json branch from a465d34 to 8105811 on June 25, 2023, 14:35
@stevenzwu (author) commented:

Merging after rebase.

@stevenzwu merged commit b8db3f0 into apache:master on Jun 26, 2023.
@puchengy (Contributor) commented on Aug 22, 2025:

@stevenzwu we are seeing a Trino OOM issue during scan planning, and it might be because this PR introduced the table schema into each file scan task. The issue happens in conjunction with a very wide table schema and ParallelIterable usage. I wonder what your thoughts are on this? One possible way is to store the schema id instead of the actual schema to save memory.

Let me know if this is the right place to discuss, or if we should move somewhere else.

@stevenzwu (author) commented:

@puchengy please create a new issue to track and discuss this problem.

Agreed on the overhead of serializing the schema for every scan task. If we were to serialize just the schema id, the serializer would need to get hold of the schemas, which would require major refactoring of the call stack to pass them in. At the time, we opted for the simpler approach, but we can discuss the alternative.

Does Trino use the JSON parser for file scan tasks?
