Conversation

@manishmalhotrawork
Contributor

For #280.

@rdblue can you please review?
Raising this as a WIP PR, as it might need some changes.

Summary: parse the manifest-schema JSON to find the partition field IDs, and initialize the PartitionSpec starting from the last ID (if available), otherwise 1000.
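A minimal, hypothetical sketch of the parsing described above, using Jackson's JsonNode (the method name comes from this WIP PR, not the final API; the traversal assumes the manifest-entry schema layout shown later in this thread):

import com.fasterxml.jackson.databind.JsonNode;

class ManifestSchemaFieldIds {
  // Returns the highest partition "field-id" found in the manifest-entry schema,
  // or 999 so that the first assigned partition field ID becomes 1000.
  static int getLastPartitionField(JsonNode manifestSchemaJson) {
    int lastId = 999;
    for (JsonNode field : manifestSchemaJson.path("fields")) {
      if ("data_file".equals(field.path("name").asText())) {
        for (JsonNode dataFileField : field.path("type").path("fields")) {
          if ("partition".equals(dataFileField.path("name").asText())) {
            for (JsonNode partitionField : dataFileField.path("type").path("fields")) {
              lastId = Math.max(lastId, partitionField.path("field-id").asInt());
            }
          }
        }
      }
    }
    return lastId;
  }
}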

@manishmalhotrawork
Contributor Author

The build failed because of the Hive-related tests in iceberg-spark:test.
Is there a way to find detailed logs of the failed tests?
All tests failed with this error:
Caused by: org.apache.thrift.transport.TTransportException at TestIcebergSourceHiveTables.java:152

So this is possibly an intermittent error, because the Metastore didn't start.
Can we just re-run the PR build?

Locally, all tests ran fine.

@manishmalhotrawork
Contributor Author

Finally got a good PR build.

@manishmalhotrawork
Contributor Author

@rdblue can you please check this one? Thanks!

private static final String FIELD_ID = "field-id";

private PartitionSpecParser() {
}
Contributor

Please remove any non-functional changes, like this one that moves the private constructor.

return builder.build();
}


Contributor

Please avoid adding extra newlines. These can cause avoidable commit conflicts.

@rdblue
Contributor

rdblue commented Oct 9, 2019

It looks like this is trying to assign the same IDs for a spec each time it is created, but I think the approach should be to assign IDs to each field in a spec. The JSON serialization should be updated to parse an ID for each field. That's a good place to start, just adding the ability to track an ID for each partition field.

@manishmalhotrawork
Contributor Author

manishmalhotrawork commented Oct 10, 2019

@rdblue thanks.

It looks like this is trying to assign the same IDs for a spec each time it is created, but I think the approach should be to assign IDs to each field in a spec.

PartitionSpecParser.getLastPartitionField(JsonNode manifestSchemaJson) parses the old PartitionSpec manifest schema to find the max ID from the field list, and this ID will be used as the init ID if a new PartitionSpec is added. My understanding was to use the persisted partition field ID and then use +1 of it for the next PartitionSpec, so that two PartitionSpecs will not have the same partition field ID.

The JSON serialization should be updated to parse an ID for each field. That's a good place to start, just adding the ability to track an ID for each partition field.

I believe you meant when the Avro file is created using AvroFileAppender?

@rdblue
Contributor

rdblue commented Oct 10, 2019

this ID will be used as the init ID if a new PartitionSpec is added

This is assigning an ID. Those IDs should be statically assigned when the partition spec is created the first time and stored with the field information when it is serialized. The highest assigned ID in any spec should be kept in table-level metadata, like the lastColumnId property.

I believe you meant when Avro file is created using AvroFileAppender?

No; JSON serialization of a partition spec should encode the IDs. That gets put into file metadata, but the main thing is to add field IDs to partition fields.
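For illustration, a serialized spec with a per-field ID might look roughly like this (key names follow the constants discussed later in this review, including the reviewer's suggestion to use field-id; the values are made up):

{
  "spec-id": 1,
  "fields": [ {
    "name": "data_bucket",
    "transform": "bucket[16]",
    "source-id": 2,
    "field-id": 1000
  } ]
}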

@manishmalhotrawork
Contributor Author

this ID will be used as the init ID if a new PartitionSpec is added

This is assigning an ID. Those IDs should be statically assigned when the partition spec is created the first time and stored with the field information when it is serialized. The highest assigned ID in any spec should be kept in table-level metadata, like the lastColumnId property.

OK, I started that way, but currently PartitionSpec doesn't store the partition field ID (1000+); it stores the schema field ID (1+), which is the real schema ID. The partition field ID is only stored in the Avro schema of the file. Also, the schema field IDs start from 0 when converted from, say, a Spark schema, or are assigned manually when the NestedFields are created as the columns of the schema.

So, for the partition field ID as well, we can assign it the first time new metadata is created, but afterwards it should be part of the PartitionSpec object.

Just to verify the partition field IDs, this is the manifest file schema (partial, up to the partition field-id):

  "type": "record",
  "name": "manifest_entry",
  "fields": [
    {
      "name": "status",
      "type": "int",
      "field-id": 0
    },
    {
      "name": "snapshot_id",
      "type": "long",
      "field-id": 1
    },
    {
      "name": "data_file",
      "type": {
        "type": "record",
        "name": "r2",
        "fields": [
          {
            "name": "file_path",
            "type": "string",
            "field-id": 100
          },
          {
            "name": "file_format",
            "type": "string",
            "field-id": 101
          },
          {
            "name": "partition",
            "type": {
              "type": "record",
              "name": "r102",
              "fields": [
                {
                  "name": "data_bucket",
                  "type": [
                    "null",
                    "int"
                  ],
                  "default": null,
                  "field-id": 1000
                }
              ]
            },
            "field-id": 102
          }

@rdblue
Contributor

rdblue commented Oct 11, 2019

Yeah, the first step is to add a partition field ID in addition to the existing source field ID.

@manishmalhotrawork
Contributor Author

manishmalhotrawork commented Oct 11, 2019

Yeah, the first step is to add a partition field ID in addition to the existing source field ID.

Cool. Then we also have to handle cases where the table was created the old way (without stored partition field IDs) and no new PartitionSpec is added to that table?

@rdblue
Contributor

rdblue commented Oct 11, 2019

then we also have to handle cases where the table was created the old way (without stored partition field IDs) and no new PartitionSpec is added to that table?

Yes. In that case, we can get IDs by assigning the same way they would be in the method that returns the partition schema.

@manishmalhotrawork
Contributor Author

Thanks @rdblue for giving more details!

Please see the last few commits; I'm trying to add the partition field ID to the table metadata.
Please review whether this approach makes sense to you, and then I will add more test cases as well.

  1. Add a partition field ID to PartitionField, so it becomes part of the table metadata.
  2. Assign that value statically, and keep lastPartitionFieldId at the TableMetadata level like lastColumnId (see the sketch after this list).
  3. Also tried to handle the case where pre-existing tables will not have the partition field ID in the schema, keeping the same logic of assigning values starting at 1000.
  4. All test cases pass locally.
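A rough sketch of item 2, tracking the highest assigned partition field ID on the table metadata the same way lastColumnId is tracked (the class name and constructor here are illustrative, not the actual diff):

public class TableMetadataSketch {
  private final int lastColumnId;          // highest assigned schema column ID
  private final int lastPartitionFieldId;  // highest assigned partition field ID across all specs

  TableMetadataSketch(int lastColumnId, int lastPartitionFieldId) {
    this.lastColumnId = lastColumnId;
    this.lastPartitionFieldId = lastPartitionFieldId;
  }

  public int lastColumnId() {
    return lastColumnId;
  }

  public int lastPartitionFieldId() {
    return lastPartitionFieldId;
  }
}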

Type sourceType = schema.findType(field.sourceId());
Type resultType = field.transform().getResultType(sourceType);
// assign ids for partition fields starting at PARTITION_DATA_ID_START to leave room for data file's other fields
// assign ids for partition fields starting at 1000 to leave room for data file's other fields
Contributor

This is no longer assigning IDs, so the comment can be removed.

Contributor Author

Let me take care of it.

private final Transform<?, ?> transform;

PartitionField(int sourceId, String name, Transform<?, ?> transform) {
PartitionField(int sourceId, int partitionFieldId, String name, Transform<?, ?> transform) {
Contributor

The name partitionFieldId is redundant because the class is PartitionField.

Let's use the same convention that is used in types. The PartitionField should have an id instance variable that is accessed by a fieldId method.
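A minimal sketch of that convention (an id instance variable exposed through a fieldId() accessor, like Types.NestedField); the transform member is omitted to keep the snippet self-contained, and the class name is only illustrative:

public class PartitionFieldSketch {
  private final int sourceId;  // ID of the source column in the table schema
  private final int id;        // partition field ID, e.g. 1000, 1001, ...
  private final String name;

  PartitionFieldSketch(int sourceId, int id, String name) {
    this.sourceId = sourceId;
    this.id = id;
    this.name = name;
  }

  public int sourceId() {
    return sourceId;
  }

  public int fieldId() {
    return id;
  }

  public String name() {
    return name;
  }
}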

Contributor Author

sure.

.add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds")
.build();
private static final String PARTITION_SPEC = "partition-spec";
private static final String SCHEMA = "schema";
Contributor

These changes aren't functional. Can you please remove them?

private static final String SPEC_ID = "spec-id";
private static final String FIELDS = "fields";
private static final String SOURCE_ID = "source-id";
private static final String PARTITION_FIELD_ID = "partition-field-id";
Contributor

I think "field-id" is fine here. The "partition" part is clear from context.

@Override
public void commit() {
TableMetadata update = applyChangesToMapping(base.updateSchema(apply(), lastColumnId));
TableMetadata update = applyChangesToMapping(base.updateSchema(apply(), lastColumnId, lastPartitionFieldId));
Contributor

Why is the last field ID passed here?

Contributor Author

Yeah, it's not required.
Since it's a schema update, ideally we don't need to keep lastPartitionFieldId at the schema-update level.

}

public int lastPartitionFieldId() {
return lastPartitionFieldId;
Contributor

These names are good.

@manishmalhotrawork
Contributor Author

Thanks @rdblue.
I realized one more thing: I believe we also need to reuse the partition field ID?
That is, check whether the column already exists in the old PartitionSpecs, so the same partition field ID can be reused?
For that, we have to maintain a Map<String, Integer> partitionFieldIdByColumnName in the TableMetadata so that, when a new PartitionSpec is added, it can find the existing entry and reuse the field ID.
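A hypothetical sketch of building that index from the existing specs, assuming the fieldId() accessor added in this PR (and Maps.newHashMap, as requested elsewhere in this review):

import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;

class PartitionFieldIndex {
  // Maps each partition field name to the ID it was first assigned, so later specs can reuse it.
  static Map<String, Integer> indexPartitionFieldIdByColumnName(List<PartitionSpec> specs) {
    Map<String, Integer> byName = Maps.newHashMap();
    for (PartitionSpec spec : specs) {
      for (PartitionField field : spec.fields()) {
        byName.putIfAbsent(field.name(), field.fieldId());
      }
    }
    return byName;
  }
}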

@rdblue
Contributor

rdblue commented Oct 18, 2019

Yes, we do want to reuse the fields across specs. We might want to make equality ignore the field ID for this purpose.

@manishmalhotrawork
Contributor Author

manishmalhotrawork commented Oct 19, 2019

@rdblue please see the updated PR for reusing the field ID.
One downside I see is that we have to iterate over the PartitionSpecs twice; otherwise we have to initialize two maps from the same method.
TableMetadata.indexSpecs and TableMetadata.indexPartitionFieldIdByColumnName are the ones. Though this will happen only at the time of creating a new TableMetadata.
Please share your thoughts.

@manishmalhotrawork
Contributor Author

@rdblue it would be helpful if you could check this. Thanks!

@manishmalhotrawork
Contributor Author

@rdblue it would be helpful if you could review this. Thanks!

public class PartitionSpec implements Serializable {
// start assigning IDs for partition fields at 1000
private static final int PARTITION_DATA_ID_START = 1000;
public static final int PARTITION_DATA_ID_START = 1000;
Contributor

Does this need to be public or can it be package-private?

Contributor

Also, why remove the comment?

return false;
}

// not considering field id, as field-id will be reused.
Contributor

ID will be reused, but assignment is consistent because we assume that partition specs are not modified before the addition of partition field IDs. That means that tables start with only one spec that might not have IDs. Because we assign incrementally, IDs will always match when assigned using the default (1000, 1001, etc.).

Because we do have consistent IDs, I think this should check field ID here.

return false;
}

// not considering field id, as field-id will be reused.
Contributor

Nit: adding this comment removed a spacing line. Could you add it back?

this.schema = schema;
}

private int incrementAndGetPartitionFieldId() {
Contributor

How about nextFieldId? That's a much shorter name, but is still descriptive.

} else {
partitionFieldId = partitionFieldId + 1;
}
builder.add(sourceId, partitionFieldId, name, transform);
Contributor

It seems odd that partitionFieldId is incremented from the last value when missing. If a spec has one missing field ID, then it will be assigned based on the previous field's ID. I don't think this would cause problems because we expect either all fields to have assigned IDs, or no fields to have them.

I'd prefer to keep the logic for those cases separate to make this easier to follow. It isn't a good practice to rely on a hidden assumption that either all fields have ids or none do.
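A hedged sketch of keeping the two cases separate when parsing an old spec, assuming the parser collects one entry per field with null where the JSON had no field ID (the helper and variable names are hypothetical):

import java.util.ArrayList;
import java.util.List;

class SpecFieldIdResolution {
  static final int PARTITION_DATA_ID_START = 1000;

  // Either every field already has an ID (spec written after this change) or none do
  // (pre-existing table); the two branches are kept separate instead of deriving a
  // missing ID from the previous field's value.
  static List<Integer> resolveFieldIds(List<Integer> parsedIds) {
    if (!parsedIds.contains(null)) {
      return parsedIds; // all IDs were present in the JSON: use them as written
    }
    List<Integer> assigned = new ArrayList<>();
    for (int i = 0; i < parsedIds.size(); i += 1) {
      assigned.add(PARTITION_DATA_ID_START + i); // no IDs: assign 1000, 1001, ... in order
    }
    return assigned;
  }
}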

}

private static Map<String, Integer> indexPartitionFieldIdByColumnName(List<PartitionSpec> specs) {
Map<String, Integer> result = new HashMap<>();
Contributor

Please use Maps.newHashMap instead of instantiating one directly.


executorService.shutdown();
Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
Assert.assertTrue("Timeout", executorService.awaitTermination(5, TimeUnit.MINUTES));
Contributor

I don't think these changes are still needed.

static final String SNAPSHOT_ID = "snapshot-id";
static final String TIMESTAMP_MS = "timestamp-ms";
static final String SNAPSHOT_LOG = "snapshot-log";
static final String FIELDS = "fields";
Contributor

Is this used?

List<PartitionField> fields = specs.get(specs.size() - 1).fields();
if (fields.size() > 0) {
// get the last lastPartitionFieldId
lastAssignedPartitionFieldId = fields.get(fields.size() - 1).fieldId();
Contributor

Instead of getting the last ID, I think this should just keep track of the last assigned partition field ID, like the last-column-id. How about storing it as last-partition-id?
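For illustration, the relevant table-metadata keys under that naming might look like this (other metadata fields omitted; the values are made up):

{
  "last-column-id": 3,
  "last-partition-id": 1001
}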

// increment and assign new id, if this column_transform has not used in partition yet.
(partitionFieldIdByColumnName == null) ? nextPartitionFieldId.incrementAndGet()
: ((partitionFieldIdByColumnName.containsKey(field.name())) ? partitionFieldIdByColumnName.get(field.name())
: nextPartitionFieldId.incrementAndGet()),
Contributor

It is difficult to read nested ternary expressions. I recommend avoiding that pattern.
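One way to unnest it is to pull the expression into a small helper written as plain if/else, for example (equivalent logic to the quoted code; the class and method names are hypothetical):

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class FieldIdLookup {
  // Reuse the ID recorded for this field name, or assign the next one when the map is
  // missing or has no entry for the name.
  static int fieldIdFor(String fieldName,
                        Map<String, Integer> partitionFieldIdByColumnName,
                        AtomicInteger nextPartitionFieldId) {
    if (partitionFieldIdByColumnName == null) {
      return nextPartitionFieldId.incrementAndGet();
    }
    Integer existingId = partitionFieldIdByColumnName.get(fieldName);
    if (existingId != null) {
      return existingId;
    }
    return nextPartitionFieldId.incrementAndGet();
  }
}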


return specBuilder.build();
PartitionSpec freshSpec = specBuilder.build();
return freshSpec;
Contributor

Looks like this change is unnecessary.

// get a fresh spec to ensure the spec ID is set to the new default
builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec));
PartitionSpec freshSpec = freshSpecWithAssignIds(newDefaultSpecId, schema, schema, newPartitionSpec,
nextPartitionFieldId, partitionFieldIdByColumnName);
Contributor

I think this should work like updateSchema, where a different class is responsible for reassigning IDs. The TableMetadata class should validate consistency and help with tracking (like the snapshot log) but it shouldn't modify other objects that are passed in, like schemas, snapshots, and partition specs.
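A hedged sketch of that separation: a helper outside TableMetadata that returns a new spec with IDs assigned, analogous to TypeUtil.assignFreshIds for schemas. The class name is hypothetical, and the four-argument builder.add call assumes the overload with an explicit field ID that this PR introduces:

package org.apache.iceberg;

import java.util.function.IntSupplier;

class PartitionSpecUtil {
  private PartitionSpecUtil() {
  }

  // Builds a copy of the spec bound to the given schema, assigning an ID to every field.
  static PartitionSpec assignFieldIds(Schema schema, PartitionSpec spec, IntSupplier nextFieldId) {
    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
    for (PartitionField field : spec.fields()) {
      builder.add(field.sourceId(), nextFieldId.getAsInt(), field.name(), field.transform().toString());
    }
    return builder.build();
  }
}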

Contributor Author

@rdblue
Are we referring to this:
Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);

since TypeUtil.assignFreshIds assigns IDs to the schema?

PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
private static PartitionSpec freshSpecWithAssignIds(int specId, Schema newSchema, Schema oldSchema,
PartitionSpec partitionSpec, AtomicInteger nextPartitionFieldId,
Map<String, Integer> partitionFieldIdByColumnName) {
Contributor

I don't think that it is necessary to have this method here. The freshSpec method just ensures that all specs have the correct schema associated.

PartitionSpec freshSpec = specBuilder.build();
AtomicInteger lastPartitionFieldId = new AtomicInteger(PartitionSpec.PARTITION_DATA_ID_START - 1);
PartitionSpec freshSpec = freshSpecWithAssignIds(INITIAL_SPEC_ID, freshSchema, schema, spec, lastPartitionFieldId,
null);
Contributor

I think that this should assign fresh IDs to the partition spec fields, but you can just add the ID to the existing code.

Also, if you are using an AtomicInteger, you can use getAndIncrement to avoid needing to subtract 1 from the ID starting point.
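A tiny illustration of the getAndIncrement suggestion:

import java.util.concurrent.atomic.AtomicInteger;

class NextFieldIdExample {
  public static void main(String[] args) {
    // Starting the counter at 1000 and using getAndIncrement hands out 1000 first,
    // so there is no need to initialize it to PARTITION_DATA_ID_START - 1.
    AtomicInteger nextFieldId = new AtomicInteger(1000);
    System.out.println(nextFieldId.getAndIncrement()); // prints 1000
    System.out.println(nextFieldId.getAndIncrement()); // prints 1001
  }
}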

@rdblue
Contributor

rdblue commented Dec 23, 2019

@manishmalhotrawork, I've added review comments. Sorry I wasn't able to get back to this sooner!

@manishmalhotrawork
Contributor Author

@rdblue no problem, thanks for reviewing!

Maybe I'll raise a new PR with the required changes; it would be cleaner.

@rdblue
Contributor

rdblue commented Jan 7, 2020

Thanks for working on it, @manishmalhotrawork. If you do open a new PR, please remember to close this one. Up to you which one you want to do.

@jun-he
Collaborator

jun-he commented Mar 3, 2020

@manishmalhotrawork
Can you let me know if you are still working on it? If you are busy, I will continue this work and finish it. Thanks.

@rdblue
Contributor

rdblue commented Apr 6, 2020

I'm closing this because it has been picked up as #845.

@rdblue rdblue closed this Apr 6, 2020