Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ public static <D> FileReader<D> openReader(SeekableInput in, DatumReader<D> read
length -= bytesRead;
offset += bytesRead;
}
in.seek(0);

if (Arrays.equals(MAGIC, magic)) // current format
return new DataFileReader<>(in, reader);
return new DataFileReader<>(in, reader, magic);
if (Arrays.equals(DataFileReader12.MAGIC, magic)) // 1.2 format
return new DataFileReader12<>(in, reader);

Expand Down Expand Up @@ -111,7 +110,7 @@ public static <D> DataFileReader<D> openReader(SeekableInput in, DatumReader<D>
* <pre/>
*/
public DataFileReader(File file, DatumReader<D> reader) throws IOException {
this(new SeekableFileInput(file), reader, true);
this(new SeekableFileInput(file), reader, true, null);
}

/**
Expand All @@ -128,15 +127,20 @@ public DataFileReader(File file, DatumReader<D> reader) throws IOException {
* <pre/>
*/
public DataFileReader(SeekableInput sin, DatumReader<D> reader) throws IOException {
this(sin, reader, false);
this(sin, reader, false, null);
}

/**
 * Construct a reader from a seekable input, passing along a magic byte array
 * supplied by the caller. Delegates to the four-argument constructor with
 * {@code closeOnError} set to {@code false}.
 *
 * @param sin    the seekable input to read from
 * @param reader the datum reader handed to the delegated constructor
 * @param magic  magic bytes forwarded unchanged to the delegated constructor
 * @throws IOException if the delegated constructor fails
 */
private DataFileReader(SeekableInput sin, DatumReader<D> reader, byte[] magic) throws IOException {
this(sin, reader, false, magic);
}

/** Construct a reader for a file. Please close resource files yourself. */
protected DataFileReader(SeekableInput sin, DatumReader<D> reader, boolean closeOnError) throws IOException {
protected DataFileReader(SeekableInput sin, DatumReader<D> reader, boolean closeOnError, byte[] magic)
throws IOException {
super(reader);
try {
this.sin = new SeekableInputStream(sin);
initialize(this.sin);
initialize(this.sin, magic);
blockFinished();
} catch (final Throwable e) {
if (closeOnError) {
Expand All @@ -153,7 +157,7 @@ protected DataFileReader(SeekableInput sin, DatumReader<D> reader, boolean close
protected DataFileReader(SeekableInput sin, DatumReader<D> reader, Header header) throws IOException {
super(reader);
this.sin = new SeekableInputStream(sin);
initialize(this.sin, header);
initialize(header);
}

/**
Expand All @@ -180,7 +184,7 @@ public void sync(final long position) throws IOException {
seek(position);
// work around an issue where 1.5.4 C stored sync in metadata
if ((position == 0L) && (getMeta("avro.sync") != null)) {
initialize(sin); // re-init to skip header
initialize(sin, null); // re-init to skip header
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public DataFileReader12(SeekableInput sin, DatumReader<D> reader) throws IOExcep
this.in = new DataFileReader.SeekableInputStream(sin);

byte[] magic = new byte[4];
in.seek(0); // seek to 0 to read magic header
in.read(magic);
if (!Arrays.equals(MAGIC, magic))
throw new InvalidAvroMagicException("Not a data file.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private Header() {
*/
public DataFileStream(InputStream in, DatumReader<D> reader) throws IOException {
this.reader = reader;
initialize(in);
initialize(in, null);
}

/**
Expand All @@ -97,18 +97,30 @@ protected DataFileStream(DatumReader<D> reader) throws IOException {
this.reader = reader;
}

/** Initialize the stream by reading from its head. */
void initialize(InputStream in) throws IOException {
this.header = new Header();
this.vin = DecoderFactory.get().binaryDecoder(in, vin);
/**
 * Reads the magic bytes through the current decoder ({@code vin}).
 *
 * @return a newly allocated array of {@code DataFileConstants.MAGIC.length}
 *         bytes filled by the decoder
 * @throws IOException if the decoder has not been set yet, or if reading the
 *                     bytes fails (the original failure is kept as the cause)
 */
byte[] readMagic() throws IOException {
  if (null == this.vin) {
    throw new IOException("InputStream is not initialized");
  }
  final int magicLength = DataFileConstants.MAGIC.length;
  final byte[] header = new byte[magicLength];
  try {
    this.vin.readFixed(header);
  } catch (IOException cause) {
    // Report any read failure as "not an Avro file" while preserving the cause.
    throw new IOException("Not an Avro data file.", cause);
  }
  return header;
}

/**
 * Checks that the given bytes equal {@code DataFileConstants.MAGIC}.
 *
 * @param magic the bytes to compare against the expected magic
 * @throws InvalidAvroMagicException if the bytes do not match
 */
void validateMagic(byte[] magic) throws InvalidAvroMagicException {
  final boolean matchesExpected = Arrays.equals(DataFileConstants.MAGIC, magic);
  if (matchesExpected) {
    return;
  }
  throw new InvalidAvroMagicException("Not an Avro data file.");
}

/** Initialize the stream by reading from its head. */
void initialize(InputStream in, byte[] magic) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry, I should have been more complete last time. Can we make these two initialize functions private, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataFileReader extends this class, so making the method "private" would prevent DataFileReader from calling it.

this.header = new Header();
this.vin = DecoderFactory.get().binaryDecoder(in, vin);
magic = (magic == null) ? readMagic() : magic;
validateMagic(magic);

long l = vin.readMapStart(); // read meta data
if (l > 0) {
Expand All @@ -134,7 +146,7 @@ void initialize(InputStream in) throws IOException {
}

/** Initialize the stream without reading from it. */
void initialize(InputStream in, Header header) throws IOException {
void initialize(Header header) throws IOException {
this.header = header;
this.codec = resolveCodec();
reader.setSchema(header.schema);
Expand Down