Conversation

@HeartSaVioR (Contributor) commented Dec 12, 2019

What changes were proposed in this pull request?

This patch adds a close() method to the DataWriter interface, which will become the place to clean up resources.

Why are the changes needed?

The lifecycle of a DataWriter instance ends at either commit() or abort(). That leads data source implementors to feel they can place resource cleanup on either side, but abort() can be called when commit() fails, so they have to ensure they don't clean up twice if the cleanup is not idempotent.

Does this PR introduce any user-facing change?

It depends on the definition of user: developers of a custom DSv2 source have to add close() to their DataWriter implementations. It's OK to add close() with an empty body, since they should already handle resource cleanup in commit/abort, but they may want to migrate the cleanup logic to close(), as it avoids double cleanup. For end users of a provided DSv2 source (built-in or third party), there is no change.
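
To illustrate, a minimal sketch of a migrated writer (a hypothetical SketchFileDataWriter, not part of this patch; package paths as in Spark 3.0):

```scala
import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// Hypothetical writer: commit() and abort() no longer close the stream,
// because close() is now guaranteed to run exactly once afterwards.
class SketchFileDataWriter(path: String) extends DataWriter[InternalRow] {
  private val out: OutputStream =
    new BufferedOutputStream(new FileOutputStream(path))

  override def write(record: InternalRow): Unit = {
    // serialize the row and write it to `out` (format details omitted)
  }

  override def commit(): WriterCommitMessage = {
    out.flush()
    new WriterCommitMessage {} // placeholder message for the sketch
  }

  override def abort(): Unit = {
    // discard partial output if needed; no stream cleanup here anymore
  }

  // The single cleanup point, called after either commit() or abort().
  override def close(): Unit = out.close()
}
```

With cleanup living only in close(), commit() and abort() stay trivially idempotent.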

How was this patch tested?

Existing tests.

Review thread on the diff:

    CachedKafkaProducer.close(producerParams)
    }
    }
    def close(): Unit = {}
@HeartSaVioR (Contributor, Author):
This is safe; the previous implementation cleaned up the instance from the cache immediately, so it actually helps a bit, but it's no big deal even if we don't do it.

@cloud-fan (Contributor):
Is this related to adding the close() API?

@HeartSaVioR (Contributor, Author) Dec 12, 2019:
So there's a conflict on naming; the test code calls it close(). If we'd like to keep the code as it is, I can rename the previous method to invalidateProducer() and leave it at that.

@cloud-fan (Contributor):
What's the life cycle of Kafka producers? IIRC they were cached before, but that patch got reverted.

@HeartSaVioR (Contributor, Author):
So there's no "return" in the current Kafka producer cache; the cache evicts expired producers according to its policy. Previously we force-invalidated the Kafka producer when close() was explicitly called, since callers of close() were only using the producer temporarily (instead of running some query); the current code just lets the cache expire producers on policy in all cases.
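
A rough sketch of that eviction model, assuming Guava's cache (the names and the 10-minute expiry are hypothetical, not Spark's actual CachedKafkaProducer code):

```scala
import java.util.Properties
import java.util.concurrent.{Callable, TimeUnit}

import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.kafka.clients.producer.KafkaProducer

// Rough illustration of policy-driven eviction: callers never "return"
// a producer; the removal listener closes it once the cache expires it.
object ProducerCacheSketch {
  type Producer = KafkaProducer[Array[Byte], Array[Byte]]

  private val cache = CacheBuilder.newBuilder()
    .expireAfterAccess(10, TimeUnit.MINUTES) // hypothetical expiry policy
    .removalListener(new RemovalListener[String, Producer] {
      override def onRemoval(n: RemovalNotification[String, Producer]): Unit =
        n.getValue.close()
    })
    .build[String, Producer]()

  // Fetch a cached producer for this config, creating it on first use
  // (assumes serializers are already set in `params`).
  def acquire(key: String, params: Properties): Producer =
    cache.get(key, new Callable[Producer] {
      override def call(): Producer = new Producer(params)
    })
}
```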

@cloud-fan (Contributor) Dec 12, 2019:
SGTM then, if the life cycle of producers is controlled by the cache policy.

@HeartSaVioR (Contributor, Author):
Btw, I just renamed the original close() method to invalidateProducer() to avoid affecting the Kafka side.

Review thread on the diff:

    closeCalled = true
    writer.close(errorOrNull)
    }
    override def close(): Unit = {
@HeartSaVioR (Contributor, Author):
This change clearly shows the difference: DataWriter implementations no longer need to deal with possible double resource cleanup.
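
As a sketch of the pattern on the caller side (a simplified, hypothetical helper, not the exact Spark task code), close() becomes the single unconditional cleanup point:

```scala
import org.apache.spark.sql.connector.write.DataWriter

object WriteTaskSketch {
  // commit/abort decide the outcome; close() runs exactly once in finally,
  // so writer implementations never see double cleanup.
  def runWriteTask[T](writer: DataWriter[T], rows: Iterator[T]): Unit = {
    try {
      rows.foreach(writer.write)
      writer.commit()
    } catch {
      case t: Throwable =>
        writer.abort()
        throw t
    } finally {
      writer.close()
    }
  }
}
```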

@dongjoon-hyun (Member):

cc @cloud-fan

@HeartSaVioR (Contributor, Author):

cc. @rdblue as well

Btw, I was planning to cc after the build passed, but Jenkins hasn't come in for 20 minutes. Might there be an issue with the AMPLab build?

@SparkQA commented Dec 12, 2019

Test build #115203 has finished for PR 26855 at commit 26b0e25.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member):

Yes, @HeartSaVioR. Jenkins has been very slow today.

@SparkQA commented Dec 12, 2019

Test build #115212 has finished for PR 26855 at commit 8058dbf.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author):

retest this, please

@cloud-fan (Contributor):

LGTM except one comment. We should probably open another PR for that change and ask people who are familiar with Kafka to take a look (I am not).

@cloud-fan (Contributor):

retest this please

@SparkQA commented Dec 12, 2019

Test build #115226 has finished for PR 26855 at commit 8058dbf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 12, 2019

Test build #115240 has finished for PR 26855 at commit 21d03e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

@HeartSaVioR (Contributor, Author):

Thanks all for reviewing and merging!

Btw, it looks like the merge script somehow missed updating the JIRA issue. Could you please take care of this? Thanks again!

@HeartSaVioR deleted the SPARK-30227 branch December 13, 2019 10:16