
[Bug]: Managed IO for Iceberg cannot query by partition #33497

Closed
2 of 17 tasks
saathwik-tk opened this issue Jan 5, 2025 · 13 comments · Fixed by #33549
Comments

@saathwik-tk

saathwik-tk commented Jan 5, 2025

What happened?

Let's say I have field1 (string type) as my partition field, along with many other fields: field2, field3, field4, ...
Querying the data with " select * from iceberg_table where field1 = 'field1_example_value'; " returns no data,
but querying with " select * from iceberg_table where field2 = 'field2_example_value'; " returns the data as expected.
I ingested the data through Managed IO for Iceberg:
pipeline.apply(Managed.write(Managed.ICEBERG).withConfig(config_map));
Please have a look into this issue.

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@damccorm
Contributor

damccorm commented Jan 6, 2025

@ahmedabu98 any ideas here?

@damccorm damccorm added P2 and removed P1 labels Jan 6, 2025
@ahmedabu98
Contributor

Hey @saathwik-tk, can you provide a little more context?

  • which catalog are you using?
  • what does your write configuration look like?
  • did you create the Iceberg table beforehand?

Also, a reproducible example would be helpful for finding the root cause.

@saathwik-tk
Author

saathwik-tk commented Jan 8, 2025

Hi @ahmedabu98 ,
I am using the Hive catalog, with the Hive metastore running at port 9083.

Yes, I created the Iceberg table beforehand with the query:
CREATE TABLE catalog_name.schema_name.table_name (field1 varchar, field2 varchar, ...)
WITH (format = 'PARQUET', format_version = 2, location = 's3a://bucket_name/storage_path', partitioning = ARRAY['field1'])
I created it using the Trino Iceberg connector, referring to https://trino.io/docs/current/connector/iceberg.html

and my configuration map looks like this:

Map<String,Object> configProperties = ImmutableMap.<String,Object>builder()
.put("io-impl","org.apache.iceberg.aws.s3.S3FileIO")
.put("catalog-impl","org.apache.iceberg.hive.HiveCatalog")
// few s3 configurations as required
.build();
Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
.put("uri","my-thrift-uri:my-port-no")
.put("warehouse","my-s3-warehouse-path")
.put("type", "hive")
.build();
ImmutableMap<String, Object> config_map = ImmutableMap.<String, Object>builder()
.put("table", "iceberg.iceberg_table")
.put("catalog_name", "iceberg_catalog")
.put("catalog_properties", catalogConfig)
.put("config_properties", configProperties)
.build();

@ahmedabu98
Contributor

I've been trying to reproduce this but haven't managed to yet.
Are you running this on a specific Beam version, or against HEAD?

@ahmedabu98
Contributor

ahmedabu98 commented Jan 9, 2025

Never mind, I got a reproduction:

Code snippet
public static void main(String[] args) throws IOException {
  String project = "apache-beam-testing";
  String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
  String warehouse = "gs://ahmedabualsaud-apache-beam-testing/iceberg-testing";
  Catalog catalog =
      CatalogUtil.loadCatalog(
          BQMS_CATALOG,
          "bqms_catalog",
          ImmutableMap.<String, String>builder()
              .put("gcp_project", project)
              .put("gcp_location", "us-central1")
              .put("warehouse", warehouse)
              .build(),
          new Configuration());

  Schema schema = Schema.builder().addInt64Field("long").addStringField("str").build();
  org.apache.iceberg.Schema iceSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
  PartitionSpec spec = PartitionSpec.builderFor(iceSchema).identity("str").build();
  String tableId = "ahmedabualsaud.my_table_with_spec";
  TableIdentifier identifier = TableIdentifier.parse(tableId);
  Table table = catalog.createTable(identifier, iceSchema, spec);
  List<Row> rows =
      LongStream.range(0, 1000)
          .mapToObj(l -> Row.withSchema(schema).addValues(l, "value_" + l).build())
          .collect(Collectors.toList());
  
  Pipeline p = Pipeline.create();
  p.apply(Create.of(rows))
      .setRowSchema(schema)
      .apply(
          Managed.write(Managed.ICEBERG)
              .withConfig(
                  ImmutableMap.of(
                      "table",
                      tableId,
                      "catalog_properties",
                      ImmutableMap.of(
                          "catalog-impl", BQMS_CATALOG,
                          "gcp_project", project,
                          "gcp_location", "us-central1",
                          "warehouse", warehouse))));
  p.run().waitUntilFinish();
}

I get no results when executing this query in BigQuery:

SELECT * FROM `apache-beam-testing.ahmedabualsaud.my_table_with_spec` where str = "value_811"

This works however:

SELECT * FROM `apache-beam-testing.ahmedabualsaud.my_table_with_spec` where str like "value_811"

@ahmedabu98
Contributor

ahmedabu98 commented Jan 9, 2025

But while investigating, I found a bigger problem. Most records contain wrong/duplicate values for the partitioned column. For example, the above query gives me this result:
[screenshot: multiple rows returned, each with str=value_811 but differing long values]
When actually I would only expect one result: {long=811, str=value_811}

I checked the warehouse and the str=value_811 partition has only one datafile. Directly reading this one datafile gives me the expected record {long=811, str=value_811}:

Code snippet
InputFile inputFile =
    table
        .io()
        .newInputFile(
            "gs://ahmedabualsaud-apache-beam-testing/iceberg-testing/ahmedabualsaud.db/my_table_spec_9/data/str=value_811/b1f74ef8-958e-4289-9cb6-39c060ec0bc9_b4d6a26f-f2f8-4bd6-ab62-2b7d4a9db22f_1.parquet");

CloseableIterable<Record> iterable =
    Parquet.read(inputFile)
        .project(table.schema())
        .createReaderFunc(
            fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
        .build();
for (Record rec : iterable) {
  System.out.println("xxx rec: " + rec);
}

Another case:
The following query gives me no results:

SELECT * FROM `apache-beam-testing.ahmedabualsaud.my_table_with_spec` where str like "value_812"

But when I manually read the datafile under partition str=value_812, I find the correct record: {long=812, str=value_812}

@ahmedabu98
Contributor

So it looks like we're writing the correct records to the correct partitions, but we get the wrong results when reading it back...

I can see the issue even when reading it with the Beam Iceberg connector:

Code snippet
p
    .apply(
        Managed.read(Managed.ICEBERG)
            .withConfig(
                ImmutableMap.of(
                    "table",
                    tableId,
                    "catalog_properties",
                    ImmutableMap.of(
                        "catalog-impl", BQMS_CATALOG,
                        "gcp_project", project,
                        "gcp_location", "us-central1",
                        "warehouse", warehouse))))
    .getSinglePCollection()
    .apply(
        MapElements.into(TypeDescriptors.nulls())
            .via(
                r -> {
                  long l = r.getInt64("long");
                  String s = r.getString("str");
                  if (!s.contains(String.valueOf(l))) {
                    System.out.println("WEIRD! " + r);
                  }
                  return null;
                }));
p.run().waitUntilFinish();
WEIRD! Row: 
long:751
str:value_811

@kennknowles
Member

I may have missed it, but what is the result of SELECT * FROM `apache-beam-testing.ahmedabualsaud.my_table_with_spec` ?

I'm trying to distinguish which of these is true, and maybe I missed it but I don't see the answer here:

  • the expected data is returned only in some circumstances, and is not returned when filtered to the expected partition
  • the expected data is never returned even though it is in the data file

@ahmedabu98
Contributor

It's the latter of those

@kennknowles
Member

OK that's probably the simpler case. It sounds like that data file isn't "part of the table".

@kennknowles kennknowles added this to the 2.62.0 Release milestone Jan 9, 2025
@ahmedabu98
Contributor

ahmedabu98 commented Jan 9, 2025

Okay when I read datafiles like this, I do see the bad data:

Code snippet
FileIO io = table.io();
EncryptionManager encryptionManager = table.encryption();
TableScan scan = table.newScan();
for (CombinedScanTask combinedScanTask : scan.planTasks()) {
  InputFilesDecryptor decryptor =
      new InputFilesDecryptor(combinedScanTask, io, encryptionManager);
  for (FileScanTask fileScanTask : combinedScanTask.tasks()) {
    Map<Integer, ?> idToConstants =
        constantsMap(fileScanTask, IdentityPartitionConverters::convertConstant, iceSchema);
    InputFile inputFile = decryptor.getInputFile(fileScanTask);
    CloseableIterable<Record> iterable =
        Parquet.read(inputFile)
            .split(fileScanTask.start(), fileScanTask.length())
            .project(table.schema())
            .createReaderFunc(
                fileSchema ->
                    GenericParquetReaders.buildReader(
                        table.schema(), fileSchema, idToConstants))
            .filter(fileScanTask.residual())
            .build();
    for (Record rec : iterable) {
      System.out.println("xxx " + rec);
    }
  }
}

...
private static Map<Integer, ?> constantsMap(
    FileScanTask task,
    BiFunction<Type, Object, Object> converter,
    org.apache.iceberg.Schema schema) {
  PartitionSpec spec = task.spec();
  Set<Integer> idColumns = spec.identitySourceIds();
  org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns);
  boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();

  if (projectsIdentityPartitionColumns) {
    return PartitionUtil.constantsMap(task, converter);
  } else {
    return Collections.emptyMap();
  }
}

The key difference is the additional idToConstants map passed to the reader. I noticed this was added to our Iceberg source recently in #33332, and AFAICT it replaces the record's column value with the stored partition value.

So the datafile is indeed part of the table but the partition value takes precedence. Seems that we are probably setting the wrong partition value.
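To illustrate the precedence behavior described above, here is a simplified, hypothetical model (invented names, not the actual GenericParquetReaders internals): when an identity-partition column's field id appears in the constants map, the stored partition value shadows whatever is actually in the data file, so a wrongly recorded partition value silently overrides correct file contents.

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of how identity-partition constants can shadow the
// values actually stored in a data file. All names here are invented
// for illustration; this is not the real Iceberg reader code.
class ConstantsOverrideSketch {
    // Reads one field: the constants map wins over the file contents,
    // mirroring the role idToConstants plays in the reader above.
    static Object readField(int fieldId, Object fileValue, Map<Integer, Object> idToConstants) {
        return idToConstants.containsKey(fieldId) ? idToConstants.get(fieldId) : fileValue;
    }

    public static void main(String[] args) {
        // Suppose field id 2 is the identity-partitioned column "str".
        // The file really contains str=value_751, but the (wrongly
        // recorded) partition constant says value_811.
        Map<Integer, Object> idToConstants = new HashMap<>();
        idToConstants.put(2, "value_811");

        Object longVal = readField(1, 751L, idToConstants);       // not partitioned: file value survives
        Object strVal = readField(2, "value_751", idToConstants); // partitioned: constant wins
        System.out.println("long=" + longVal + ", str=" + strVal);
        // prints: long=751, str=value_811
    }
}
```

This reproduces exactly the kind of mismatched row seen later in the thread (long=751 paired with str=value_811).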

@ahmedabu98
Contributor

ahmedabu98 commented Jan 9, 2025

Ahh, the problem is that we're using a single PartitionKey object to resolve partitions, and reusing that same object when creating writers.

What ends up happening is a race condition on which partition the key resolves to by the end of the bundle. That partition value gets recorded for all writers in that bundle.

Instead, we should make one copy of the PartitionKey for each new writer.
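The aliasing bug described above can be sketched in plain Java (hypothetical, simplified names; not the actual IcebergIO writer code): if every writer holds a reference to the same mutable key, all writers end up tagged with whichever partition was resolved last, whereas copying the key per writer preserves the correct value.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of the shared-mutable-key bug. MutableKey stands in
// for Iceberg's PartitionKey; openWriters* stands in for per-partition
// writer creation within one bundle. Names are invented.
class PartitionKeySketch {
    static class MutableKey {
        String value;
        void partition(String record) { this.value = record; }          // resolve in place
        MutableKey copy() { MutableKey k = new MutableKey(); k.value = this.value; return k; }
    }

    // Buggy: every writer stores the SAME key object, so all entries
    // alias whatever the key was last mutated to.
    static Map<String, MutableKey> openWritersBuggy(List<String> records) {
        Map<String, MutableKey> writers = new HashMap<>();
        MutableKey shared = new MutableKey();
        for (String r : records) {
            shared.partition(r);
            writers.putIfAbsent(r, shared);
        }
        return writers;
    }

    // Fixed: each new writer gets its own snapshot of the key.
    static Map<String, MutableKey> openWritersFixed(List<String> records) {
        Map<String, MutableKey> writers = new HashMap<>();
        MutableKey shared = new MutableKey();
        for (String r : records) {
            shared.partition(r);
            writers.putIfAbsent(r, shared.copy());
        }
        return writers;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("value_811", "value_812");
        System.out.println(openWritersBuggy(records).get("value_811").value); // prints value_812 (wrong)
        System.out.println(openWritersFixed(records).get("value_811").value); // prints value_811 (correct)
    }
}
```

The one-copy-per-writer change in the fixed variant is the shape of the fix proposed here.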

@ahmedabu98
Contributor

@saathwik-tk I opened a PR to fix this in #33549. I ran another set of tests, and from what I can tell it also fixes your original issue.
