Skip to content

Conversation

@nsivabalan
Copy link
Contributor

@nsivabalan nsivabalan commented Jan 28, 2021

What is the purpose of the pull request

*Fixing detection of GCS FileSystem

Brief change log

  • Added SchemeAwareFSDataInputStream to assist in special handling of seek for GCSFileSystem

Verify this pull request

Fix is applicable only to GCS File System. Going to rely on customer to test it out.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@vinothchandar
Copy link
Member

cc @vburenin could you please review this as well?

@vinothchandar vinothchandar self-assigned this Jan 29, 2021
Copy link
Member

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to rethink this. THis opens doors, that we dont want to open :)

@vburenin
Copy link
Contributor

This gives me a big pain right now. Will try to monitor this PR as close as possible.

@nsivabalan nsivabalan force-pushed the GCSFileSystemFix branch 2 times, most recently from 3111964 to 992c72a Compare February 10, 2021 18:35
@nsivabalan
Copy link
Contributor Author

@bvaradar @vinothchandar : you can review the patch.

@codecov-io
Copy link

codecov-io commented Feb 10, 2021

Codecov Report

Merging #2500 (5af6449) into master (a2f85d9) will increase coverage by 18.57%.
The diff coverage is n/a.

Impacted file tree graph

@@              Coverage Diff              @@
##             master    #2500       +/-   ##
=============================================
+ Coverage     50.90%   69.48%   +18.57%     
+ Complexity     3167      364     -2803     
=============================================
  Files           433       53      -380     
  Lines         19806     1963    -17843     
  Branches       2032      235     -1797     
=============================================
- Hits          10083     1364     -8719     
+ Misses         8904      466     -8438     
+ Partials        819      133      -686     
Flag Coverage Δ Complexity Δ
hudicli ? ?
hudiclient ? ?
hudicommon ? ?
hudiflink ? ?
hudihadoopmr ? ?
hudisparkdatasource ? ?
hudisync ? ?
huditimelineservice ? ?
hudiutilities 69.48% <ø> (+0.01%) 0.00 <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ Complexity Δ
...hudi/utilities/sources/helpers/KafkaOffsetGen.java 85.84% <0.00%> (-2.94%) 20.00% <0.00%> (+4.00%) ⬇️
...i/utilities/deltastreamer/HoodieDeltaStreamer.java 68.72% <0.00%> (-0.26%) 18.00% <0.00%> (ø%)
...apache/hudi/utilities/deltastreamer/DeltaSync.java 70.34% <0.00%> (-0.16%) 53.00% <0.00%> (+3.00%) ⬇️
...ies/sources/helpers/DatePartitionPathSelector.java 54.68% <0.00%> (-0.16%) 13.00% <0.00%> (ø%)
...s/deltastreamer/HoodieMultiTableDeltaStreamer.java 78.39% <0.00%> (ø) 18.00% <0.00%> (ø%)
.../java/org/apache/hudi/HoodieDataSourceHelpers.java
...ache/hudi/common/table/timeline/TimelineUtils.java
.../apache/hudi/common/util/DefaultSizeEstimator.java
...g/apache/hudi/common/util/RocksDBSchemaHelper.java
...rc/main/scala/org/apache/hudi/cli/DeDupeType.scala
... and 373 more

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a failure here running a job with a previous GCS detector, it would determine it to be GCS based on FSDataInputStream, but when it calls getWrappedStream().getWrappedStream() it ends up with GoogleHadoopFSDataInputStream that could not be cast to FSInputStream. So as a quick fix IF to this checking for FSInputStream first:

    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    } else if (FSUtils.isGCSInputStream(fsDataInputStream)) {
      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) ((
              (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
    } else {
      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
      // need to wrap in another BufferedFSInputStream the make bufferSize work?
      this.inputStream = fsDataInputStream;
    }

I would suggest you rearrange it here too as there is no need to do anything special if fsDataInputStream.getWrappedStream() is instanceof FSInputStream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will fix it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems very weird to me from GCS behavior stand point. Is it always -1 difference from HDFS? Is it only applicable to the last byte position that is technically a length of file? I only saw a failure when it tried to jump to the last byte of the file. How does it behave if it wrapped by BufferedReader? Is it the same behavior if data is buffered?

Copy link
Contributor Author

@nsivabalan nsivabalan Feb 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I wasn't aware of this context/history. Now I realize why it was designed the way it was before. I have fixed it. You can check it out.

@nsivabalan nsivabalan added the priority:critical Production degraded; pipelines stalled label Feb 11, 2021
Copy link
Contributor Author

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vburenin : thanks and have addressed your feedback. you can check it out.

Copy link
Contributor Author

@nsivabalan nsivabalan Feb 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I wasn't aware of this context/history. Now I realize why it was designed the way it was before. I have fixed it. You can check it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will fix it.

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else if (FSUtils.isGCSFileSystem(fs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece of logic still slightly worries me a bit.

I would also check fsDataInputStream.getWrappedStream().getWrappedStream() could be cast to FSInputStream and fallback to 'this.inputStream = fsDataInputStream' if it can't be. But I also would log a warning in this case as it would be unexpected situation that we may want to dig deeper in the future.

thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsivabalan @bvaradar could you please clarify why we need this casting business here in the first place. With @vburenin . If we can simplify all this, we should

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had two options. either check two conditions like

if(fsDataInputStream.getWrappedStream() instance of FSDataInputStream && ((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream() instance of FSInputStream)

or resort to casting and if exception if thrown fallback to original dataInputStream. I have gone w/ 2nd approach.

@nsivabalan
Copy link
Contributor Author

yeah, that's what I initially thought. but wasn't sure if we need to do two checks

if(fsDataInputStream.getWrappedStream() instance of FSDataInputStream && ((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream() instance of FSInputStream)

hence, thought will use catch block to fallback which might be easier for readability.

@vburenin
Copy link
Contributor

yeah, that's what I initially thought. but wasn't sure if we need to do two checks

if(fsDataInputStream.getWrappedStream() instance of FSDataInputStream && ((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream() instance of FSInputStream)

hence, thought will use catch block to fallback which might be easier for readability.

Agreed. LGTM.
However, I have no way to approve it. My permissions for the projects are still almost R/O ;)

@nsivabalan
Copy link
Contributor Author

@bvaradar : this patch might be of interest to you.

* @param pos Position to seek
* @throws IOException -
*/
private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this method then? just call inputStream.seek()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we could remove this. will fix it along with any other feedback which could be given.

(FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please avoid flipping the if-elses, unless we need to. makes reading the changes pretty hard. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since I couldn't verify GCS locally, I was relying on feedback given by @vburenin

this is an excerpt from his previous comment.

I had a failure here running a job with a previous GCS detector, it would determine it to be GCS based on FSDataInputStream, but when it calls getWrappedStream().getWrappedStream() it ends up with GoogleHadoopFSDataInputStream that could not be cast to FSInputStream. So as a quick fix IF to this checking for FSInputStream first:

    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    } else if (FSUtils.isGCSInputStream(fsDataInputStream)) {
      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) ((
              (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
    } else {
      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
      // need to wrap in another BufferedFSInputStream the make bufferSize work?
      this.inputStream = fsDataInputStream;
    }
I would suggest you rearrange it here too as there is no need to do anything special if fsDataInputStream.getWrappedStream() is instanceof FSInputStream

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else if (FSUtils.isGCSFileSystem(fs)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsivabalan @bvaradar could you please clarify why we need this casting business here in the first place. With @vburenin . If we can simplify all this, we should

@nsivabalan
Copy link
Contributor Author

to your comment here, again I evolved it based on feedback given by @vburenin . Do you mind explaining the need.

@vburenin
Copy link
Contributor

@nsivabalan I stand by my comment. The problem with all this only arise when fsDataInputStream.getWrappedStream() is not FSInputStream, so only then we start juggling trying to figure out what the type is and what to look for.

@vinothchandar
Copy link
Member

@vburenin do you have any alternative suggestions for how we go about this? trying to understand what it takes to land this.

@vburenin
Copy link
Contributor

vburenin commented Mar 1, 2021

@vinothchandar I am fine with this PR at this state it almost resembles what I currently run on production with some small implementation differences.

Copy link
Contributor Author

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinothchandar : let's sync up today/tmrw and close this out. or call in volodymyr for a quick sync up. would like to get this in by upcoming release.

@nsivabalan
Copy link
Contributor Author

nsivabalan commented Mar 6, 2021

Finally able to get hold of it. I set up a cluster w/ gcloud dataproc and tested it out. Even existing PR didn't work(before my latest commit).

fsDataInputStream.getWrappedStream() refers to com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. This is an instance of FSInputStream. And so, we may not get into the 2nd else if block at all(if first if condition is wrapped stream instance of FSInputStream).

So, the actual fix is something like this.

FSDataInputStream inputStreamLocal;
if(wrapped stream instance of FSInputStream) {
     inputStreamLocal = new Timed...(new Buffered ... ((FSInputStream) fsDataInputStream.getWrappedStream())....);
    if( GCS ) { // wrap with SchemeAwareFSDataInputStream to intercept seek calls. 
         inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal);
    }
} else {
   inputStreamLocal = fsDataInputStream;
}

Have updated the PR with the fix. I also tested with GCS FS and it works.
@vburenin @vinothchandar

new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this PR right now is that I had a case when fsDataInputStream.getWrappedStream() instanceof FSInputStream was not true because fsDataInputStream.getWrappedStream() was returning FSDataInputStream type when it was running as a spark job, however it was true when I was running deltastreamer as it is locally. (maybe I had different GCS connector versions?, this is likely)
So, that is why I was asking for multilayered IF.
I would also suggest to have this IF outside the first IF scope:

 if (FSUtils.isGCSFileSystem(fs)) {
        inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true);
      }

since in case of the fallback 'else' scenario we can potentially miss GCS filesystem and crash on incorrect SEEK scenario.

@nsivabalan
Copy link
Contributor Author

nsivabalan commented Mar 7, 2021

If I am not wrong, there was a bug in the code version that you ran. I tested 4 different variants of code to arrive at the latest proposal. Let me walk through them :) Sorry about the lengthy response. Hopefully we get a closure.

1st variant. Current master branch:

FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
    LOG.warn("HoodieLogFileReader :: canonical name :: " + fsDataInputStream.getClass().getCanonicalName() + ", name "
            + fsDataInputStream.getClass().getName());
    if (FSUtils.isGCSInputStream(fsDataInputStream)) {
      LOG.warn("HoodieLogFileReader :: 111 start GCSFileSystem " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) ((
              (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
      LOG.warn("HoodieLogFileReader :: 111 completed ");
    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      LOG.warn("HoodieLogFileReader :: 222 start " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
      LOG.warn("HoodieLogFileReader :: 222 complete");
    } else {
      LOG.warn("HoodieLogFileReader :: 333 ");
      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
      // need to wrap in another BufferedFSInputStream the make bufferSize work?
      this.inputStream = fsDataInputStream;
    }

Output from my run:

"HoodieLogFileReader :: canonical name :: org.apache.hadoop.fs.FSDataInputStream, name org.apache.hadoop.fs.FSDataInputStream" 

"HoodieLogFileReader :: 111 start GCSFileSystem com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream" 

 Caused by: java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop
.fs.FSDataInputStream
        at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:84)
        at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:131)
        ... 24 more

2nd variant:
This PR just before my last commit.

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      LOG.warn("HoodieLogFileReader 1111 " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
      inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    } else if (FSUtils.isGCSFileSystem(fs)) {
      LOG.warn("HoodieLogFileReader 2222 aaa " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
      try {
        FSInputStream localFSInputStream = (FSInputStream)(((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream());
        inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
            new BufferedFSInputStream(localFSInputStream,bufferSize))), true);
        LOG.warn("HoodieLogFileReader 2222 aaa succeeded " + logFile.getFileName());
      } catch (ClassCastException e) {
        Log.warn("HoodieLogFileReader 2222 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause()
                + ", msg " + e.getMessage());
        // if we cannot cast  fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream, fallback to using as is
        LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original "
            + "fsDataInputStream");
        inputStreamLocal = fsDataInputStream;
      }
    } else {
      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
      // need to wrap in another BufferedFSInputStream the make bufferSize work?
      LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
      inputStreamLocal = fsDataInputStream;
    }

Output from the run:

"HoodieLogFileReader 1111 .0d7ba334-2847-4b24-997e-1dbecfd12e3b-0_20210306132835.log.1_0-55-75 com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream" 

So, what this essentially means is that fsDataInputStream.getWrappedStream() instanceof FSInputStream for GCSFileSystem. And the execution don't even go into the else block here which is our intention actually. 

3rd variant: just to check if fsDataInputStream.getWrappedStream() is an instance of FSDataInputStream or FSInputStream

if (FSUtils.isGCSFileSystem(fs)) {
      LOG.warn("HoodieLogFileReader 111 aaa " + logFile.getFileName() + " can_name: " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()
              + ". Is wrappedStream instance of fsDataInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream)
              + " , is wrappedSTream instance of fsInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
      try {
        FSInputStream localFSInputStream = (FSInputStream)(((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream());
        inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
                new BufferedFSInputStream(localFSInputStream,bufferSize))), true);
        LOG.warn("HoodieLogFileReader 111 aaa succeeded " + logFile.getFileName());
      } catch (ClassCastException e) {
        LOG.warn("HoodieLogFileReader 111 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause()
                + ", msg " + e.getMessage());
        // if we cannot cast  fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream, fallback to using as is
        LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original "
                + "fsDataInputStream");
        inputStreamLocal = fsDataInputStream;
      }
    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      LOG.warn("HoodieLogFileReader 222 " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
      inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
      LOG.warn("HoodieLogFileReader 222 completed ");
    } else {
      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
      // need to wrap in another BufferedFSInputStream the make bufferSize work?
      LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
      inputStreamLocal = fsDataInputStream;
    }

Output from the run:

"HoodieLogFileReader 111 aaa .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 can_name: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. Is wrappedStream instance of fsDataInputStream false , is wrappedSTream instance of fsInputStream true" 

"HoodieLogFileReader 111 bbb (aaa failed) .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 null, msg com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream" 

"Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original fsDataInputStream" 
.
.
After this, the seek ran into issue. 
Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us-
.
.
.

So, since we encountered class cast exception, we don't leverage the SchemeAwareFSDataInputStream class at all and hence ran into seek issue.

4th variant:
my latest commit w/ the proposed fix.

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      LOG.warn("HoodieLogFileReader 111 start " + logFile.getFileName());
      inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
              new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
      LOG.warn("HoodieLogFileReader 111 completed ");
      if (FSUtils.isGCSFileSystem(fs)) {
        LOG.warn("HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream");
        inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true);
      }
    } else {
      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
      // need to wrap in another BufferedFSInputStream the make bufferSize work?
      LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
      inputStreamLocal = fsDataInputStream;
    }

Output from the run:
"HoodieLogFileReader 111 start .7a1a0684-b710-4a44-97c4-4c98b75db8a2-0_20210306142209.log.1_2-55-76"
"HoodieLogFileReader 111 completed "
"HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream"

// No exceptions. all good.

Summary: at some point, we came up w/ two conditions, where condition 1 refers to fsDataInputStream.getWrappedStream() instanceof FSInputStream. and condition2 caters to GCSFileSystem. But in reality, GCSFileSystem also falls into first condition i.e. fsDataInputStream.getWrappedStream() instanceof FSInputStream. Hence the proposed fix.

@vburenin
Copy link
Contributor

vburenin commented Mar 8, 2021

Wow, this is super long reply. I truly appreciate the details.
I am going to do the following: I will apply this fix on my end today and will try two run it through both scenarios. As I mentioned I had two different behaviors, first one running it as a k8s spark app and the second one running it locally as a deltastreamer app. As of this moment I confirmed that I used two different GCS connectors, the latest version -1 was used locally and 2019 version was used on k8s. I think, the confusion is actually coming from GCS connector.

One more separate thing, we may want to dig into later. I use the pre-latest GCS-connection library version due to some libraries conflict that prevents me from running Hudi at all, did you use the latest version by chance?

@vburenin
Copy link
Contributor

vburenin commented Mar 8, 2021

I can clearly reproduce the issue with this code:

    LOG.info("fsDataInputStream.getWrappedStream: " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
    LOG.info("fsDataInputStream.getWrappedStream: instanceof FSInputStream" + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      LOG.info("fsDataInputStream.getWrappedStream: instanceof FSInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
      inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
      if (FSUtils.isGCSFileSystem(fs)) {
        inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true);
      }
    } else {
      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
      // need to wrap in another BufferedFSInputStream the make bufferSize work?
      inputStreamLocal = fsDataInputStream;
    }
    LOG.info("inputStreamLocal: " + inputStreamLocal.getClass().getCanonicalName());

225852 [Executor task launch worker for task 174] INFO org.apache.hudi.common.table.log.HoodieLogFileReader - fsDataInputStream.getWrappedStream: org.apache.hadoop.fs.FSDataInputStream
225852 [Executor task launch worker for task 174] INFO org.apache.hudi.common.table.log.HoodieLogFileReader - fsDataInputStream.getWrappedStream: instanceof FSInputStream false
225852 [Executor task launch worker for task 174] INFO org.apache.hudi.common.table.log.HoodieLogFileReader - inputStreamLocal: org.apache.hudi.common.fs.TimedFSDataInputStream

So, clearly, there is a bug.

It actually got even slightly worse as it is no longer handles GCS last byte seek issue anymore, not counting the absence of use of buffered reader.

@vburenin
Copy link
Contributor

vburenin commented Mar 8, 2021

Here is working option:

  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                             boolean readBlockLazily, boolean reverseReader) throws IOException {
    FSDataInputStream inputStreamLocal;
    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
    LOG.info("fsDataInputStream.getWrappedStream: " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
    LOG.info("fsDataInputStream.getWrappedStream: instanceof FSInputStream" + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      LOG.info("fsDataInputStream.getWrappedStream: instanceof FSInputStream" + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
      inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    } else if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
        && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
      FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
      LOG.info("getWrappedStream.getWrappedStream: " + inputStream.getClass().getCanonicalName());
      inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream(inputStream, bufferSize)));
    } else {
      inputStreamLocal = fsDataInputStream;
    }
    if (FSUtils.isGCSFileSystem(fs)) {
      inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true);
    }
    LOG.info("inputStreamLocal: " + inputStreamLocal.getClass().getCanonicalName());

    this.inputStream = inputStreamLocal;
    this.logFile = logFile;
    this.readerSchema = readerSchema;
    this.readBlockLazily = readBlockLazily;
    this.reverseReader = reverseReader;
    if (this.reverseReader) {
      this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
    }
    addShutDownHook();
  }

223711 [Executor task launch worker for task 174] INFO org.apache.hudi.common.table.log.HoodieLogFileReader - fsDataInputStream.getWrappedStream: org.apache.hadoop.fs.FSDataInputStream
223711 [Executor task launch worker for task 174] INFO org.apache.hudi.common.table.log.HoodieLogFileReader - fsDataInputStream.getWrappedStream: instanceof FSInputStreamfalse
223711 [Executor task launch worker for task 174] INFO org.apache.hudi.common.table.log.HoodieLogFileReader - getWrappedStream.getWrappedStream: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream
223713 [Executor task launch worker for task 174] INFO org.apache.hudi.common.fs.FSUtils - FS Scheme:gs
223713 [Executor task launch worker for task 174] INFO org.apache.hudi.common.fs.FSUtils - GSC Scheme:gs
223713 [Executor task launch worker for task 174] INFO org.apache.hudi.common.table.log.HoodieLogFileReader - inputStreamLocal: org.apache.hudi.common.fs.SchemeAwareFSDataInputStream

@nsivabalan
Copy link
Contributor Author

ok, got it. my test env probably was hitting just one code path, but looks like there is another path as well.

@nsivabalan
Copy link
Contributor Author

@vinothchandar : I feel we have sufficient data from my local env and from Vlad's run as well. I will go ahead w/ what Vlad has proposed. Do check it out when you get time. Will update the PR by tonight or tomorrow.

@nsivabalan
Copy link
Contributor Author

nsivabalan commented Mar 9, 2021

I made one minor change to your proposal @vburenin . with the else condition for wrapped over wrapped, added isGCSFileSystem check as well. Don't want to change the flow for other FileSystems. Have updated the PR. please check it out.

@vburenin
Copy link
Contributor

vburenin commented Mar 9, 2021

@nsivabalan Well, I am fine with that little change for us, but it makes it kinda questionable for other file systems if they run into the same case, so if I were you I would not add isGCSFileSystem check as it is only for GCS and if there is something similar in the future we will hit it again. Other than that LGTM.

@nsivabalan
Copy link
Contributor Author

@vburenin : yeah, but we have not seen this reported by anyone else in the community so far and so we would like to think it happens only for GCS FS for now. don't want to over-optimize w/ an assumption. Also, I have done some minor refactoring to move code to a separate method and kept GCS handling to a separate method as well so that general users don't need to wrap their head around the special handling of GCS.

@stackfun
Copy link

stackfun commented Mar 9, 2021

I tested this pull request and crashed - here's the stack trace
Edit - I pulled the latest code and updated the stack trace.

21/03/09 22:28:08 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 28.0 in stage 14.0 (TID 686, dataproc-w-22.us-east1-b.c.my-project.internal, executor 44): org.apache.hudi.exception.HoodieException: Exception when reading log file 
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:261)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:327)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:209)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:199)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:74)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStreamForGCSFs(HoodieLogFileReader.java:137)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStream(HoodieLogFileReader.java:109)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:79)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:134)
	... 42 more

21/03/09 22:28:15 ERROR org.apache.spark.scheduler.TaskSetManager: Task 17 in stage 14.0 failed 4 times; aborting job
21/03/09 22:28:15 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: Aborting job f426c7a0-b351-4bac-bc87-170784b48e0c.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 14.0 failed 4 times, most recent failure: Lost task 17.3 in stage 14.0 (TID 1021, dataproc-w-23.us-east1-b.c.my-project.internal, executor 40): org.apache.hudi.exception.HoodieException: Exception when reading log file 
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:261)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:327)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:209)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:199)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:74)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStreamForGCSFs(HoodieLogFileReader.java:137)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStream(HoodieLogFileReader.java:109)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:79)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:134)
	... 42 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1892)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1880)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2113)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2062)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2051)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:261)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:327)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:209)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:199)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:74)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.NullPointerException
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStreamForGCSFs(HoodieLogFileReader.java:137)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.getFSDataInputStream(HoodieLogFileReader.java:109)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:79)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:134)

* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather rewrite it like this reducing cyclomatic complexity, but I am also fine with what is here originally:

  private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
    if (FSUtils.isGCSFileSystem(fs)) {
      return new SchemeAwareFSDataInputStream(
         getFSDataInputStreamForGCSFs(fsDataInputStream, fs, bufferSize), true);
    }
    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
        return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
            new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    }
    return fsDataInputStream;
  }
 
   private FSDataInputStream getFSDataInputStreamForGCSFs(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
    }
    if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
        && ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
      FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
      return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
          new BufferedFSInputStream(inputStream, bufferSize)));
    }
    return fsDataInputSt
 

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also agree with you. will restructure this a bit.

@nsivabalan
Copy link
Contributor Author

@stackfun : sorry, missed to assign logfile to instance var. have fixed it. Can you try it out.

@stackfun
Copy link

Thanks @nsivabalan - no crash on the same test

@nsivabalan
Copy link
Contributor Author

@vinothchandar : Vlad and stackfun certified the fix works.

toReturnInputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else {
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these two lines are comment are copy-reused also down below?

@vinothchandar
Copy link
Member

Pushed small refactors. Will merge once CI passes

@vinothchandar vinothchandar merged commit e93c6a5 into apache:master Mar 14, 2021
prashantwason pushed a commit to prashantwason/incubator-hudi that referenced this pull request Aug 5, 2021
)

* Adding SchemeAwareFSDataInputStream for abstract out special handling for GCSFileSystem
* Moving wrapping of fsDataInputStream to separate method in HoodieLogFileReader

Co-authored-by: Vinoth Chandar <[email protected]>
prashantwason added a commit to prashantwason/incubator-hudi that referenced this pull request Aug 5, 2021
…OSS master

Summary:
[HUDI-1509]: Reverting LinkedHashSet changes to combine fields from oldSchema and newSchema in favor of using only new schema for record rewriting (apache#2424)
[MINOR] Bumping snapshot version to 0.7.0 (apache#2435)
[HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (apache#2453)
[HUDI-1529] Add block size to the FileStatus objects returned from metadata table to avoid too many file splits (apache#2451)
[HUDI-1532] Fixed suboptimal implementation of a magic sequence search  (apache#2440)
[HUDI-1535] Fix 0.7.0 snapshot (apache#2456)
[MINOR] Fixing setting defaults for index config (apache#2457)
[HUDI-1540] Fixing commons codec shading in spark bundle (apache#2460)
[HUDI 1308] Harden RFC-15 Implementation based on production testing (apache#2441)
[MINOR] Remove redundant judgments (apache#2466)
[MINOR] Fix dataSource cannot use hoodie.datasource.hive_sync.auto_create_database (apache#2444)
[MINOR] Disabling problematic tests temporarily to stabilize CI (apache#2468)
[MINOR] Make a separate travis CI job for hudi-utilities (apache#2469)
[HUDI-1512] Fix spark 2 unit tests failure with Spark 3 (apache#2412)
[HUDI-1511] InstantGenerateOperator support multiple parallelism (apache#2434)
[HUDI-1332] Introduce FlinkHoodieBloomIndex to hudi-flink-client (apache#2375)
[HUDI] Add bloom index for hudi-flink-client
[MINOR] Remove InstantGeneratorOperator parallelism limit in HoodieFlinkStreamer and update docs (apache#2471)
[MINOR] Improve code readability,remove the continue keyword (apache#2459)
[HOTFIX] Revert upgrade flink verison to 1.12.0 (apache#2473)
[HUDI-1453] Fix NPE using HoodieFlinkStreamer to etl data from kafka to hudi (apache#2474)
[MINOR] Use skipTests flag for skip.hudi-spark2.unit.tests property (apache#2477)
[HUDI-1476] Introduce unit test infra for java client (apache#2478)
[MINOR] Update doap with 0.7.0 release (apache#2491)
[MINOR]Fix NPE when using HoodieFlinkStreamer with multi parallelism (apache#2492)
[HUDI-1234] Insert new records to data files without merging for "Insert" operation.  (apache#2111)
[MINOR] Add Jira URL and Mailing List (apache#2404)
[HUDI-1522] Add a new pipeline for Flink writer (apache#2430)
[HUDI-1522] Add a new pipeline for Flink writer
[HUDI-623] Remove UpgradePayloadFromUberToApache (apache#2455)
[HUDI-1555] Remove isEmpty to improve clustering execution performance (apache#2502)
[HUDI-1266] Add unit test for validating replacecommit rollback (apache#2418)
[MINOR] Quickstart.generateUpdates method add check (apache#2505)
[HUDI-1519] Improve minKey/maxKey computation in HoodieHFileWriter (apache#2427)
[HUDI-1550] Honor ordering field for MOR Spark datasource reader (apache#2497)
[MINOR] Fix method comment typo (apache#2518)
[MINOR] Rename FileSystemViewHandler to RequestHandler and corrected the class comment (apache#2458)
[HUDI-1335] Introduce FlinkHoodieSimpleIndex to hudi-flink-client (apache#2271)
[HUDI-1523] Call mkdir(partition) only if not exists (apache#2501)
[HUDI-1538] Try to init class trying different signatures instead of checking its name (apache#2476)
[HUDI-1538] Try to init class trying different signatures instead of checking its name.
[HUDI-1547] CI intermittent failure: TestJsonStringToHoodieRecordMapF… (apache#2521)
[MINOR] Fixing the default value for source ordering field for payload config (apache#2516)
[HUDI-1420] HoodieTableMetaClient.getMarkerFolderPath works incorrectly on windows client with hdfs server for wrong file seperator (apache#2526)
[HUDI-1571] Adding commit_show_records_info to display record sizes for commit (apache#2514)
[HUDI-1589] Fix Rollback Metadata AVRO backwards incompatiblity (apache#2543)
[MINOR] Fix wrong logic for checking state condition (apache#2524)
[HUDI-1557] Make Flink write pipeline write task scalable (apache#2506)
[HUDI-1545] Add test cases for INSERT_OVERWRITE Operation (apache#2483)
[HUDI-1603] fix DefaultHoodieRecordPayload serialization failure (apache#2556)
[MINOR] Fix the wrong comment for HoodieJavaWriteClientExample (apache#2559)
[HUDI-1526] Translate the api partitionBy in spark datasource to hoodie.datasource.write.partitionpath.field (apache#2431)
[HUDI-1612] Fix write test flakiness in StreamWriteITCase (apache#2567)
[HUDI-1612] Fix write test flakiness in StreamWriteITCase
[MINOR] Default to empty list for unset datadog tags property (apache#2574)
[MINOR] Add clustering to feature list (apache#2568)
[HUDI-1598] Write as minor batches during one checkpoint interval for the new writer (apache#2553)
[HUDI-1109] Support Spark Structured Streaming read from Hudi table (apache#2485)
[HUDI-1621] Gets the parallelism from context when init StreamWriteOperatorCoordinator (apache#2579)
[HUDI-1381] Schedule compaction based on time elapsed (apache#2260)
[HUDI-1582] Throw an exception when syncHoodieTable() fails, with RuntimeException (apache#2536)
[HUDI-1539] Fix bug in HoodieCombineRealtimeRecordReader with reading empty iterators (apache#2583)
[HUDI-1315] Adding builder for HoodieTableMetaClient initialization (apache#2534)
[HUDI-1486] Remove inline inflight rollback in hoodie writer (apache#2359)
[HUDI-1586] [Common Core] [Flink Integration] Reduce the coupling of hadoop. (apache#2540)
[HUDI-1624] The state based index should bootstrap from existing base files (apache#2581)
[HUDI-1477] Support copyOnWriteTable in java client (apache#2382)
[MINOR] Ensure directory exists before listing all marker files. (apache#2594)
[MINOR] hive sync checks for table after creating db if auto create is true (apache#2591)
[HUDI-1620] Add azure pipelines configs (apache#2582)
[HUDI-1347] Fix Hbase index to make rollback synchronous (via config) (apache#2188)
[HUDI-1637] Avoid to rename for bucket update when there is only one flush action during a checkpoint (apache#2599)
[HUDI-1638] Some improvements to BucketAssignFunction (apache#2600)
[HUDI-1367] Make deltaStreamer transition from dfsSouce to kafkasouce (apache#2227)
[HUDI-1269] Make whether the failure of connect hive affects hudi ingest process configurable (apache#2443)
[HUDI-1611] Added a configuration to allow specific directories to be filtered out during Metadata Table bootstrap. (apache#2565)
[Hudi-1583]: Fix bug that Hudi will skip remaining log files if there is logFile with zero size in logFileList when merge on read. (apache#2584)
[HUDI-1632] Supports merge on read write mode for Flink writer (apache#2593)
[HUDI-1540] Fixing commons codec dependency in bundle jars (apache#2562)
[HUDI-1644] Do not delete older rollback instants as part of rollback. Archival can take care of removing old instants cleanly (apache#2610)
[HUDI-1634] Re-bootstrap metadata table when un-synced instants have been archived. (apache#2595)
[HUDI-1584] Modify maker file path, which should start with the target base path. (apache#2539)
[MINOR] Fix default value for hoodie.deltastreamer.source.kafka.auto.reset.offsets (apache#2617)
[HUDI-1553] Configuration and metrics for the TimelineService. (apache#2495)
[HUDI-1587] Add latency and freshness support (apache#2541)
[HUDI-1647] Supports snapshot read for Flink (apache#2613)
[HUDI-1646] Provide mechanism to read uncommitted data through InputFormat (apache#2611)
[HUDI-1655] Support custom date format and fix unsupported exception in DatePartitionPathSelector (apache#2621)
[HUDI-1636] Support Builder Pattern To Build Table Properties For HoodieTableConfig (apache#2596)
[HUDI-1660] Excluding compaction and clustering instants from inflight rollback (apache#2631)
[HUDI-1661] Exclude clustering commits from getExtraMetadataFromLatest API (apache#2632)
[MINOR] Fix import in StreamerUtil.java (apache#2638)
[HUDI-1618] Fixing NPE with Parquet src in multi table delta streamer (apache#2577)
[HUDI-1662] Fix hive date type conversion for mor table (apache#2634)
[HUDI-1673] Replace scala.Tule2 to Pair in FlinkHoodieBloomIndex (apache#2642)
[MINOR] HoodieClientTestHarness close resources in AfterAll phase (apache#2646)
[HUDI-1635] Improvements to Hudi Test Suite (apache#2628)
[HUDI-1651] Fix archival of requested replacecommit (apache#2622)
[HUDI-1663] Streaming read for Flink MOR table (apache#2640)
[HUDI-1678] Row level delete for Flink sink (apache#2659)
[HUDI-1664] Avro schema inference for Flink SQL table (apache#2658)
[HUDI-1681] Support object storage for Flink writer (apache#2662)
[HUDI-1685] keep updating current date for every batch (apache#2671)
[HUDI-1496] Fixing input stream detection of GCS FileSystem (apache#2500)
[HUDI-1684] Tweak hudi-flink-bundle module pom and reorganize the pacakges for hudi-flink module (apache#2669)
[HUDI-1692] Bounded source for stream writer (apache#2674)
[HUDI-1552] Improve performance of key lookups from base file in Metadata Table. (apache#2494)
[HUDI-1552] Improve performance of key lookups from base file in Metadata Table.
[HUDI-1695] Fixed the error messaging (apache#2679)
[HUDI 1615] Fixing null schema in bulk_insert row writer path  (apache#2653)
[HUDI-845] Added locking capability to allow multiple writers (apache#2374)
[HUDI-1701] Implement HoodieTableSource.explainSource for all kinds of pushing down (apache#2690)
[HUDI-1704] Use PRIMARY KEY syntax to define record keys for Flink Hudi table (apache#2694)
[HUDI-1688]hudi write should uncache rdd, when the write operation is finnished (apache#2673)
[MINOR] Remove unused var in AbstractHoodieWriteClient (apache#2693)
[HUDI-1653] Add support for composite keys in NonpartitionedKeyGenerator (apache#2627)
[HUDI-1705] Flush as per data bucket for mini-batch write (apache#2695)
[1568] Fixing spark3 bundles (apache#2625)
[HUDI-1650] Custom avro kafka deserializer. (apache#2619)
[HUDI-1667]: Fix a null value related bug for spark vectorized reader. (apache#2636)
[HUDI-1709] Improving config names and adding hive metastore uri config (apache#2699)
[MINOR][DOCUMENT] Update README doc for integ test (apache#2703)
[HUDI-1710] Read optimized query type for Flink batch reader (apache#2702)
[HUDI-1712] Rename & standardize config to match other configs (apache#2708)
[hotfix] Log the error message for creating table source first (apache#2711)
[HUDI-1495] Bump Flink version to 1.12.2 (apache#2718)
[HUDI-1728] Fix MethodNotFound for HiveMetastore Locks (apache#2731)
[HUDI-1478] Introduce HoodieBloomIndex to hudi-java-client (apache#2608)
[HUDI-1729] Asynchronous Hive sync and commits cleaning for Flink writer (apache#2732)
[HOTFIX] close spark session in functional test suite and disable spark3 test for spark2 (apache#2727)
[HOTFIX] Disable ITs for Spark3 and scala2.12 (apache#2733)
[HOTFIX] fix deploy staging jars script
[MINOR] Add Missing Apache License to test files (apache#2736)
[UBER] Fixed creation of HoodieMetadataClient which now uses a Builder pattern instead of a constructor.

Reviewers: balajee, O955 Project Hoodie Project Reviewer: Add blocking reviewers!, PHID-PROJ-pxfpotkfgkanblb3detq!

JIRA Issues: HUDI-593

Differential Revision: https://code.uberinternal.com/D5867129
prashantwason added a commit to prashantwason/incubator-hudi that referenced this pull request Aug 5, 2021
…OSS master

Summary:
[HUDI-1509]: Reverting LinkedHashSet changes to combine fields from oldSchema and newSchema in favor of using only new schema for record rewriting (apache#2424)
[MINOR] Bumping snapshot version to 0.7.0 (apache#2435)
[HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (apache#2453)
[HUDI-1529] Add block size to the FileStatus objects returned from metadata table to avoid too many file splits (apache#2451)
[HUDI-1532] Fixed suboptimal implementation of a magic sequence search  (apache#2440)
[HUDI-1535] Fix 0.7.0 snapshot (apache#2456)
[MINOR] Fixing setting defaults for index config (apache#2457)
[HUDI-1540] Fixing commons codec shading in spark bundle (apache#2460)
[HUDI 1308] Harden RFC-15 Implementation based on production testing (apache#2441)
[MINOR] Remove redundant judgments (apache#2466)
[MINOR] Fix dataSource cannot use hoodie.datasource.hive_sync.auto_create_database (apache#2444)
[MINOR] Disabling problematic tests temporarily to stabilize CI (apache#2468)
[MINOR] Make a separate travis CI job for hudi-utilities (apache#2469)
[HUDI-1512] Fix spark 2 unit tests failure with Spark 3 (apache#2412)
[HUDI-1511] InstantGenerateOperator support multiple parallelism (apache#2434)
[HUDI-1332] Introduce FlinkHoodieBloomIndex to hudi-flink-client (apache#2375)
[HUDI] Add bloom index for hudi-flink-client
[MINOR] Remove InstantGeneratorOperator parallelism limit in HoodieFlinkStreamer and update docs (apache#2471)
[MINOR] Improve code readability,remove the continue keyword (apache#2459)
[HOTFIX] Revert upgrade flink verison to 1.12.0 (apache#2473)
[HUDI-1453] Fix NPE using HoodieFlinkStreamer to etl data from kafka to hudi (apache#2474)
[MINOR] Use skipTests flag for skip.hudi-spark2.unit.tests property (apache#2477)
[HUDI-1476] Introduce unit test infra for java client (apache#2478)
[MINOR] Update doap with 0.7.0 release (apache#2491)
[MINOR]Fix NPE when using HoodieFlinkStreamer with multi parallelism (apache#2492)
[HUDI-1234] Insert new records to data files without merging for "Insert" operation.  (apache#2111)
[MINOR] Add Jira URL and Mailing List (apache#2404)
[HUDI-1522] Add a new pipeline for Flink writer (apache#2430)
[HUDI-1522] Add a new pipeline for Flink writer
[HUDI-623] Remove UpgradePayloadFromUberToApache (apache#2455)
[HUDI-1555] Remove isEmpty to improve clustering execution performance (apache#2502)
[HUDI-1266] Add unit test for validating replacecommit rollback (apache#2418)
[MINOR] Quickstart.generateUpdates method add check (apache#2505)
[HUDI-1519] Improve minKey/maxKey computation in HoodieHFileWriter (apache#2427)
[HUDI-1550] Honor ordering field for MOR Spark datasource reader (apache#2497)
[MINOR] Fix method comment typo (apache#2518)
[MINOR] Rename FileSystemViewHandler to RequestHandler and corrected the class comment (apache#2458)
[HUDI-1335] Introduce FlinkHoodieSimpleIndex to hudi-flink-client (apache#2271)
[HUDI-1523] Call mkdir(partition) only if not exists (apache#2501)
[HUDI-1538] Try to init class trying different signatures instead of checking its name (apache#2476)
[HUDI-1538] Try to init class trying different signatures instead of checking its name.
[HUDI-1547] CI intermittent failure: TestJsonStringToHoodieRecordMapF… (apache#2521)
[MINOR] Fixing the default value for source ordering field for payload config (apache#2516)
[HUDI-1420] HoodieTableMetaClient.getMarkerFolderPath works incorrectly on windows client with hdfs server for wrong file seperator (apache#2526)
[HUDI-1571] Adding commit_show_records_info to display record sizes for commit (apache#2514)
[HUDI-1589] Fix Rollback Metadata AVRO backwards incompatiblity (apache#2543)
[MINOR] Fix wrong logic for checking state condition (apache#2524)
[HUDI-1557] Make Flink write pipeline write task scalable (apache#2506)
[HUDI-1545] Add test cases for INSERT_OVERWRITE Operation (apache#2483)
[HUDI-1603] fix DefaultHoodieRecordPayload serialization failure (apache#2556)
[MINOR] Fix the wrong comment for HoodieJavaWriteClientExample (apache#2559)
[HUDI-1526] Translate the api partitionBy in spark datasource to hoodie.datasource.write.partitionpath.field (apache#2431)
[HUDI-1612] Fix write test flakiness in StreamWriteITCase (apache#2567)
[HUDI-1612] Fix write test flakiness in StreamWriteITCase
[MINOR] Default to empty list for unset datadog tags property (apache#2574)
[MINOR] Add clustering to feature list (apache#2568)
[HUDI-1598] Write as minor batches during one checkpoint interval for the new writer (apache#2553)
[HUDI-1109] Support Spark Structured Streaming read from Hudi table (apache#2485)
[HUDI-1621] Gets the parallelism from context when init StreamWriteOperatorCoordinator (apache#2579)
[HUDI-1381] Schedule compaction based on time elapsed (apache#2260)
[HUDI-1582] Throw an exception when syncHoodieTable() fails, with RuntimeException (apache#2536)
[HUDI-1539] Fix bug in HoodieCombineRealtimeRecordReader with reading empty iterators (apache#2583)
[HUDI-1315] Adding builder for HoodieTableMetaClient initialization (apache#2534)
[HUDI-1486] Remove inline inflight rollback in hoodie writer (apache#2359)
[HUDI-1586] [Common Core] [Flink Integration] Reduce the coupling of hadoop. (apache#2540)
[HUDI-1624] The state based index should bootstrap from existing base files (apache#2581)
[HUDI-1477] Support copyOnWriteTable in java client (apache#2382)
[MINOR] Ensure directory exists before listing all marker files. (apache#2594)
[MINOR] hive sync checks for table after creating db if auto create is true (apache#2591)
[HUDI-1620] Add azure pipelines configs (apache#2582)
[HUDI-1347] Fix Hbase index to make rollback synchronous (via config) (apache#2188)
[HUDI-1637] Avoid to rename for bucket update when there is only one flush action during a checkpoint (apache#2599)
[HUDI-1638] Some improvements to BucketAssignFunction (apache#2600)
[HUDI-1367] Make deltaStreamer transition from dfsSouce to kafkasouce (apache#2227)
[HUDI-1269] Make whether the failure of connect hive affects hudi ingest process configurable (apache#2443)
[HUDI-1611] Added a configuration to allow specific directories to be filtered out during Metadata Table bootstrap. (apache#2565)
[Hudi-1583]: Fix bug that Hudi will skip remaining log files if there is logFile with zero size in logFileList when merge on read. (apache#2584)
[HUDI-1632] Supports merge on read write mode for Flink writer (apache#2593)
[HUDI-1540] Fixing commons codec dependency in bundle jars (apache#2562)
[HUDI-1644] Do not delete older rollback instants as part of rollback. Archival can take care of removing old instants cleanly (apache#2610)
[HUDI-1634] Re-bootstrap metadata table when un-synced instants have been archived. (apache#2595)
[HUDI-1584] Modify maker file path, which should start with the target base path. (apache#2539)
[MINOR] Fix default value for hoodie.deltastreamer.source.kafka.auto.reset.offsets (apache#2617)
[HUDI-1553] Configuration and metrics for the TimelineService. (apache#2495)
[HUDI-1587] Add latency and freshness support (apache#2541)
[HUDI-1647] Supports snapshot read for Flink (apache#2613)
[HUDI-1646] Provide mechanism to read uncommitted data through InputFormat (apache#2611)
[HUDI-1655] Support custom date format and fix unsupported exception in DatePartitionPathSelector (apache#2621)
[HUDI-1636] Support Builder Pattern To Build Table Properties For HoodieTableConfig (apache#2596)
[HUDI-1660] Excluding compaction and clustering instants from inflight rollback (apache#2631)
[HUDI-1661] Exclude clustering commits from getExtraMetadataFromLatest API (apache#2632)
[MINOR] Fix import in StreamerUtil.java (apache#2638)
[HUDI-1618] Fixing NPE with Parquet src in multi table delta streamer (apache#2577)
[HUDI-1662] Fix hive date type conversion for mor table (apache#2634)
[HUDI-1673] Replace scala.Tule2 to Pair in FlinkHoodieBloomIndex (apache#2642)
[MINOR] HoodieClientTestHarness close resources in AfterAll phase (apache#2646)
[HUDI-1635] Improvements to Hudi Test Suite (apache#2628)
[HUDI-1651] Fix archival of requested replacecommit (apache#2622)
[HUDI-1663] Streaming read for Flink MOR table (apache#2640)
[HUDI-1678] Row level delete for Flink sink (apache#2659)
[HUDI-1664] Avro schema inference for Flink SQL table (apache#2658)
[HUDI-1681] Support object storage for Flink writer (apache#2662)
[HUDI-1685] keep updating current date for every batch (apache#2671)
[HUDI-1496] Fixing input stream detection of GCS FileSystem (apache#2500)
[HUDI-1684] Tweak hudi-flink-bundle module pom and reorganize the pacakges for hudi-flink module (apache#2669)
[HUDI-1692] Bounded source for stream writer (apache#2674)
[HUDI-1552] Improve performance of key lookups from base file in Metadata Table. (apache#2494)
[HUDI-1552] Improve performance of key lookups from base file in Metadata Table.
[HUDI-1695] Fixed the error messaging (apache#2679)
[HUDI 1615] Fixing null schema in bulk_insert row writer path  (apache#2653)
[HUDI-845] Added locking capability to allow multiple writers (apache#2374)
[HUDI-1701] Implement HoodieTableSource.explainSource for all kinds of pushing down (apache#2690)
[HUDI-1704] Use PRIMARY KEY syntax to define record keys for Flink Hudi table (apache#2694)
[HUDI-1688]hudi write should uncache rdd, when the write operation is finnished (apache#2673)
[MINOR] Remove unused var in AbstractHoodieWriteClient (apache#2693)
[HUDI-1653] Add support for composite keys in NonpartitionedKeyGenerator (apache#2627)
[HUDI-1705] Flush as per data bucket for mini-batch write (apache#2695)
[1568] Fixing spark3 bundles (apache#2625)
[HUDI-1650] Custom avro kafka deserializer. (apache#2619)
[HUDI-1667]: Fix a null value related bug for spark vectorized reader. (apache#2636)
[HUDI-1709] Improving config names and adding hive metastore uri config (apache#2699)
[MINOR][DOCUMENT] Update README doc for integ test (apache#2703)
[HUDI-1710] Read optimized query type for Flink batch reader (apache#2702)
[HUDI-1712] Rename & standardize config to match other configs (apache#2708)
[hotfix] Log the error message for creating table source first (apache#2711)
[HUDI-1495] Bump Flink version to 1.12.2 (apache#2718)
[HUDI-1728] Fix MethodNotFound for HiveMetastore Locks (apache#2731)
[HUDI-1478] Introduce HoodieBloomIndex to hudi-java-client (apache#2608)
[HUDI-1729] Asynchronous Hive sync and commits cleaning for Flink writer (apache#2732)
[HOTFIX] close spark session in functional test suite and disable spark3 test for spark2 (apache#2727)
[HOTFIX] Disable ITs for Spark3 and scala2.12 (apache#2733)
[HOTFIX] fix deploy staging jars script
[MINOR] Add Missing Apache License to test files (apache#2736)
[UBER] Fixed creation of HoodieMetadataClient which now uses a Builder pattern instead of a constructor.

Reviewers: balajee

Reviewed By: balajee

JIRA Issues: HUDI-593

Differential Revision: https://code.uberinternal.com/D5867129
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

priority:critical Production degraded; pipelines stalled

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants