
Depend on Spark-3.0.0-palantir.18 (cont'd) #1

Merged: yifeih merged 18 commits into master from yh/use-palantir-spark on Mar 13, 2019

Conversation

@yifeih (Owner) commented Mar 8, 2019

Working off Matt's work here: mccheah#5 (created a new PR since I can't push to Matt's fork)

@vinooganesh @mccheah @rdblue for SA

mccheah and others added 10 commits February 28, 2019 14:39
* add write encryption codepath

* add reader side

* remove unnecessary field

* add log warn

* fix check

* try a single iterator

* new reader

* addressing comments

* remove unused struct
}

@Override
public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options, StructType schema) {

Reviewer:

There should be no need to implement this method. Iceberg tables always have a schema, so it makes no sense to supply one. That's for cases like CSV, where a schema is required to interpret the data (column names and types). Otherwise, normal projection works just fine.

yifeih (Owner Author):

Don't you still need it for write? Sort of analogous to the previous getWriter() method? Or does DSv2 expect that the table will already be created, since the datasource layer is separate from the catalog layer, which includes table and schema information?

Reviewer:

In DSv2, the writer is passed the physical plan and schema, then calls TableCatalog.createTable with that schema. After it creates the table, it uses the instance returned by create.

These methods are called by the DataFrameReader and DataFrameWriter and assume that the table already exists.
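
(A rough sketch of the flow described above, for readers following along. The class and method names follow the DSv2 catalog API being referenced; the exact signatures in this Spark fork may differ.)

    // Illustrative only: a CTAS-style write derives the schema from the physical plan,
    // creates the table through the catalog, then writes through the returned instance.
    Table createThenWrite(TableCatalog catalog, Identifier ident, StructType planSchema,
                          Map<String, String> properties) {
      Table table = catalog.createTable(ident, planSchema, new Transform[0], properties);
      return table;  // the writer uses this instance, not a user-supplied schema
    }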

Reviewer:

Throwing UnsupportedOperationException is correct then, so we're good here.
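
(A minimal sketch of that resolution, using the method signature shown in this diff; the message text is illustrative.)

    @Override
    public org.apache.spark.sql.sources.v2.Table getTable(DataSourceOptions options, StructType schema) {
      // Iceberg tables always carry their own schema, so a user-supplied schema is rejected.
      throw new UnsupportedOperationException(
          "Iceberg does not support a user-specified schema; use getTable(options) instead.");
    }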

@yifeih (Owner Author) commented Mar 9, 2019

I'm not familiar with Parquet, so I'm struggling to debug the failing TestParquetAvroReader.testCorrectness test. Here's what I've narrowed it down to:

  • It happens on record 77001. In that record, it fails at this assertion: https://github.com/palantir/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java#L87
  • The assertion failed on the base writer for the winter.list.element.wheeze column.
  • Looking at the stacktrace, it fails while calling MessageColumnIO.startGroup() on the MessageGroupIO object associated with winter.list.element. At that point, the groupNullCache is set to 1 for winter.list.element. It's indeed true that record 77000 had a null {} inside the winter.list array as its last element, so this is expected.
  • In the previous record (number 77000), the ColumnWriterBase.writePage() method was called on the writer for winter.list.element.wheeze. That was probably what set the pageRowCount field to 0, causing the assertion to fail.

Currently, I think my confusion is caused by my lack of understanding of what repetitionLevel means and how it's set. I'll pick this up again next week.

@mccheah commented Mar 11, 2019

Let's make the pull from master a separate PR, so that this diff doesn't include things like integrating encryption everywhere.

@yifeih (Owner Author) commented Mar 11, 2019

Oh actually, I think this was just a bad merge with some code that we ended up deleting before the final merge of the encryption code. Let me clean that up.

public void testCorrectness() throws IOException {
Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34139);
// TODO (yifeih): Change the seed back to 34139 after merging https://github.com/apache/parquet-mr/pull/620
Iterable<Record> records = RandomData.generate(COMPLEX_SCHEMA, 250_000, 34138);

Reviewer:

@rdblue for SA - we hit another Parquet bug, I think, which is also fixed by apache/parquet-java#620. We don't think the actual upgrades we're doing in this patch are related to the bug. It's very specific: an array is written, the last value in the array is null, the page is written (without the cached null), and then an assertion in Parquet fails with pageRowCount being 0 but repetitionLevel being 1 when the nulls from the previous page are flushed.

We don't understand the root cause here, but since the problem goes away after applying the above patch, which flushes nulls eagerly, I think we're fine here.
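
(To make the failing check concrete, here is a rough paraphrase of the invariant described above; this is not the actual parquet-mr code.)

    // Paraphrase of the failing invariant -- not the real ColumnWriterBase implementation.
    private int pageRowCount;  // reset to 0 whenever a page is flushed

    void writeRepetitionLevel(int repetitionLevel) {
      // repetitionLevel == 0 marks the start of a new record, and every page is expected
      // to begin at the start of a record. A cached null from the *previous* record
      // arrives with repetitionLevel == 1 right after the page was flushed
      // (pageRowCount == 0), which is exactly the state that trips the assertion.
      assert pageRowCount > 0 || repetitionLevel == 0
          : "a page must start at repetition level 0";
      if (repetitionLevel == 0) {
        pageRowCount++;
      }
    }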

@mccheah left a comment:

Overall looks great; this is about what we would expect from migrating from the old DSv2 APIs to the newer ones. We'll have some interesting discussions when logical plan and catalog changes come down the line.

The merge conflicts we could foreseeably get don't look as intimidating as I thought they might be. The most code we moved was porting the Reader setup to the ScanBuilder side, so any changes made to that part of the Reader upstream would have to be mirrored in our ScanBuilder. Shouldn't be too problematic.

Schema readSchema) {
return Avro.read(location)
private CloseableIterable<InternalRow> newAvroIterable(InputFile inputFile,
FileScanTask task,

Reviewer:

The indentation doesn't have to change.

CloseableIterable<InternalRow> iter;
InputFile location = inputFiles.get(task.file().path().toString());
Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
CloseableIterable<InternalRow> iter;

Reviewer:

There was no need to move this.

import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
import org.apache.spark.sql.sources.v2.reader.*;

Reviewer:

Don't wildcard import here.

private final FileIO fileIo;
private final Map<String, InputFile> inputFiles;

private final Iterator<FileScanTask> tasks;

Reviewer:

No need to move this. In general, let's try to avoid moving fields around here: since this is a fork of the upstream codebase, we really want to avoid merge conflicts as much as we can.


public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister {
public class IcebergSource implements
TableProvider,

Reviewer:

These can be on the same line as public class IcebergSource implements

}

@Override
public WriteBuilder mode(SaveMode mode) {

Reviewer:

I think we should just say that we don't support SaveMode here, since we're specifically checking that only appending is allowed anyways. We'll discuss how to get Iceberg connected to table catalogs and the V2 logical plans when the time comes.

yifeih (Owner Author):

Ok yup, the reason I did that was because some tests relied on it, but I can update those tests too.
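
(For reference, the append-only check being relied on looks roughly like this; illustrative, not the exact code in the diff.)

    @Override
    public WriteBuilder mode(SaveMode mode) {
      // Only appends are supported; every other save mode is rejected up front.
      Preconditions.checkArgument(mode == SaveMode.Append,
          "Save mode %s is not supported", mode);
      return this;
    }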



private static class IcebergWriterBuilder implements WriteBuilder,
SupportsSaveMode {

Reviewer:

I think we should just say that we don't support SaveMode here, since we're specifically checking that only appending is allowed anyways. We'll discuss how to get Iceberg connected to table catalogs and the V2 logical plans when the time comes.

.toUpperCase(Locale.ENGLISH));
}

public void setFileFormat(String format) {

Reviewer:

Let's just make FileFormat final, and in the caller of the constructor, check the option and default to Parquet if it is not present.
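
(A minimal sketch of that suggestion; the "write-format" option key and the constructor shape are assumptions for illustration.)

    private final FileFormat format;

    Writer(Table table, DataSourceOptions options) {
      // Keep the format final; default to Parquet when no format option is supplied.
      this.format = options.get("write-format")
          .map(s -> FileFormat.valueOf(s.toUpperCase(Locale.ENGLISH)))
          .orElse(FileFormat.PARQUET);
    }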

import com.netflix.iceberg.types.TypeUtil;
import com.netflix.iceberg.types.Types;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.lang3.SerializationUtils;

Reviewer:

Did we need to use commons-lang3? Not entirely opposed to it, just wondering if we can minimize the diff just a tiny bit.

yifeih (Owner Author):

Hmm the original import doesn't seem to be part of the natural dependency tree anymore :/

Reviewer:

That's fine then, actually we probably want to be exclusively using lang3 in upstream as well. We'll catch this if we introduce Baseline and linting has a rule for it.

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import javax.annotation.processing.SupportedOptions;

Reviewer:

We don't use this import

import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.SupportsBatchRead;
import org.apache.spark.sql.sources.v2.TableProvider;
import org.apache.spark.sql.sources.v2.reader.*;

Reviewer:

Don't use wildcard import

@mccheah left a comment:

This is fine despite the SaveMode comment elsewhere - we can figure out how we deal with SaveMode when upstream does.

@mccheah commented Mar 12, 2019

But I can't merge PRs on your fork so feel free to merge yourself =P

@yifeih merged commit 71ead9d into master on Mar 13, 2019
yifeih pushed a commit that referenced this pull request on Apr 16, 2019
* Integrate encryption into datasource

* add write encryption codepath

* add reader side

* remove unnecessary field

* add log warn

* fix check

* try a single iterator

* new reader

* addressing comments

* remove unused struct

* Begin upgrading Spark

* Revert "Begin upgrading Spark"

This reverts commit f8ee9cd.

* Revert "Revert "Begin upgrading Spark""

This reverts commit 0cbf39b.

* writer implementation migrated

* read side

* simplify writer builder

* oops

* welp everything works except parquet avro correctness

* delete vestigial encryption code

* change seed

* delete vestigial encryption code

* try again

* some cleanups

* address comments and eliminate some diffs

* delete some more stuff