
Conversation

@tdas
Contributor

@tdas tdas commented Jun 30, 2016

What changes were proposed in this pull request?

This refactoring makes the public API simpler. The current version has the following problems:

  1. Too many separate public classes for different location and consumer strategies.
  2. Each strategy has separate apply and create methods for Scala and Java.
  3. ConsumerStrategy is an interface, so methods cannot be added in the future without breaking compatibility.
  4. Using JavaConverters to wrap Scala maps as Java maps sometimes leads to the final map not being serializable. This broke serializability of strategies that hold maps.

To fix these, I have refactored the classes. Now LocationStrategy has static methods to create the appropriate strategy. E.g.

LocationStrategy.PreferBrokers()   // returns a LocationStrategy object, same API in scala and java

Similarly,

ConsumerStrategy.Subscribe(topics, params)  // returns a ConsumerStrategy object, same API in scala and java

1+2. This allows all the concrete strategy classes to be hidden from the public API, leaving less surface for incompatibility in the future, and gives the same API for Java and Scala.
3. In addition, both strategies have been implemented as abstract classes in Java.
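As a rough sketch of that shape (hypothetical names, not the actual Spark classes), the pattern looks roughly like this in Scala: one public abstract type whose concrete strategies stay hidden, plus factory methods that read the same from Scala and Java.

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical sketch of the proposed shape, not the real Spark API.
abstract class MyLocationStrategy

object MyLocationStrategy {
  // Concrete strategies stay private, so only the abstract type is public surface.
  private case object PreferBrokers extends MyLocationStrategy
  private case class PreferFixed(hostMap: java.util.Map[TopicPartition, String])
    extends MyLocationStrategy

  // Factory methods: Scala calls MyLocationStrategy.preferBrokers(), and Java can
  // make the same call because the companion's methods get static forwarders.
  def preferBrokers(): MyLocationStrategy = PreferBrokers
  def preferFixed(hostMap: java.util.Map[TopicPartition, String]): MyLocationStrategy =
    PreferFixed(hostMap)
}
```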

How was this patch tested?

Modified unit tests.

TODO: More unit tests to

  • Check serializability of each strategy
  • Call the Scala API of each strategy

@tdas
Contributor Author

tdas commented Jun 30, 2016

@koeninger @zsxwing

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61540 has finished for PR 13996 at commit 39a26f3.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

import org.apache.spark.annotation.Experimental;

/**
* :: Experimental :: Choice of how to create and configure underlying Kafka Consumers on driver and
Contributor Author

fix docs

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61541 has finished for PR 13996 at commit 345e1c3.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public abstract class ConsumerStrategy<K, V>
    • public abstract class LocationStrategy

@koeninger
Contributor

I've got concerns about this; please don't merge these refactorings until I get a chance to look at it today.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61543 has finished for PR 13996 at commit e5026f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

I'll leave line comments about specific things, but my major overarching concern is about moving the interface to a Java abstract class.

As far as I can tell, nothing being done in that class couldn't be done in Scala. If this is just a reaction to not wanting autogenerated apply methods, nothing's stopping us from making a standalone non-companion object with the exact same interface you're proposing.

More important in my opinion is the reasoning behind moving to an abstract class. Nothing about the definition of that interface requires constructor state, so a class doesn't make sense.

If your thinking here is that you want to be able to later add methods with a default implementation without relying on Java 8 features... I'm really opposed to this line of reasoning. This is prioritizing binary compatibility at the cost of silently breaking people's code when APIs really do need to change.

If the interface needs to change, and a bunch of people have already implemented that interface in their own code, which would you rather have as a user? Know at compile time that this interface (which is marked as experimental) needs to change, figure out how it changed, and write the right thing for your use case? Or silently have it "work"... until you find out at runtime (or in a business meeting about why metrics are now wrong) that the default implementation the Spark project added does totally the wrong thing for your use case?
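As a hypothetical illustration of that failure mode, sketched in Scala (illustrative names, not anything from Spark): a user subclasses the v1 abstract class, the library later adds a method with a default body, and the user's code keeps compiling while quietly doing the wrong thing.

```scala
// Version 1 of a library abstract class, and a user's implementation of it.
abstract class MetricsReporter {
  def record(value: Long): Unit
  // Added later in "version 2" with a default body: binary compatible, so the
  // user's subclass below still compiles and links without any change...
  def flush(): Unit = ()
}

class UserReporter extends MetricsReporter {
  private var total = 0L
  override def record(value: Long): Unit = total += value
  // ...but nothing forces the user to override flush(), so if the framework now
  // relies on flush() to push totals somewhere, their metrics silently go missing.
}
```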

* NOT zookeeper servers, specified in host1:port1,host2:port2 form.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* see [[ConsumerStrategy]] for more details
* @param consumerStrategy In most cases, pass in [[SubscribeStrategy]],
Contributor

You can't hide PreferConsistent and SubscribeStrategy from users and then tell them to pass it in...

@koeninger
Contributor

So another concrete reason I'm against moving the strategy interfaces to java is that LocationStrategy is no longer sealed, so we lose exhaustivity checking on the cases.

LocationStrategy is a tagged union / sum type / whatever you want to call it. It's definitely not an OO hierarchy, and users shouldn't be extending it.
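A small sketch of what that exhaustivity checking buys (illustrative names, not the actual Spark definitions):

```scala
// With a sealed hierarchy, every match over the type is checked for completeness.
sealed trait Strategy
case object PreferBrokers extends Strategy
case object PreferConsistent extends Strategy
final case class PreferFixed(hostMap: Map[String, String]) extends Strategy

def describe(s: Strategy): String = s match {
  case PreferBrokers      => "schedule consumers on the Kafka brokers"
  case PreferConsistent   => "spread partitions evenly across executors"
  case PreferFixed(hosts) => s"pin partitions to ${hosts.size} fixed hosts"
  // Remove any case above, or add a new subtype, and the compiler emits a
  // "match may not be exhaustive" warning -- checking that disappears once the
  // type becomes an open (non-sealed) Java abstract class.
}
```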

@koeninger
Contributor

koeninger commented Jun 30, 2016

See the linked pr #13998 for an example of what I'm proposing. I'll make sure the same thing works for consumer strategy and update that pr.
Edit - works for consumer strategy as well, both scala 2.10 and 2.11

If you let me know where you were actually running into serialization issues I can address those too - see note above.

}


public static LocationStrategy PreferFixed(Map<TopicPartition, String> hostMap) {
Member

nit: docs?

@tdas
Contributor Author

tdas commented Jun 30, 2016

Let me note down all the concerns clearly.

  1. Compatibility with abstract class vs trait: Let me clarify. With an abstract class, you can later add defined methods which users can override if needed, without breaking compatibility of existing implementations. For example, if there is an abstract class A { public void func(); }, you can change it to abstract class A { public void func(); public void newFunc() { } }. Existing implementations of A will not break at compile time, and if the default implementation of newFunc() is done correctly, the earlier behavior is maintained at runtime. No runtime failures. This is not possible with a Scala trait: the moment you add a defined method to a trait, it compiles down to a Java interface AND a similarly named Java class, which becomes painful for Java users, with no compatibility whatsoever.
  2. Language (Scala vs Java): I did the refactoring in Java because I knew it would obviously work in Java, as there is no Scala magic anywhere. But I just did a quick test with Scala:
abstract class A {
  def func()
  def newFunc() { }   // class B below compiles fine with and without this method.
}

object A {
  def staticFunc() { }
}

class B(val x: String) extends A {
  override def func() { }
}

The resultant public interfaces are as expected.

$ javap A.class
Compiled from "MapConverter.scala"
public abstract class org.apache.spark.streaming.kafka010.A {
  public static void staticFunc();
  public abstract void func();
  public void newFunc();
  public org.apache.spark.streaming.kafka010.A();
}

So this should be fine if written in Scala.

  3. Style: This is a fuzzier thing. I agree that this breaks the style guide a little, but we are doing so in specific cases to ensure that the public API looks a certain way. Case in point: OutputMode in sql.streaming. This unusual style makes the API look enum-ish but allows parameterized enum-ish values.

Your PR #13998 is going in the right direction. I am stuck in meetings for the next couple of hours, so if you don't mind, could you update your PR?

In addition, for testing, I noticed that the JavaConsumerStrategySuite used ju.HashMap[TopicPartition, Object] for offsets, and not ju.HashMap[TopicPartition, java.lang.Long]. Better to test with the latter, as that is what we would expect the user to have as a parameter. You can pull in my changes from this PR as much or as little as you want.
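For reference, a minimal sketch (in Scala, with illustrative topic names) of building the offsets map the way a Java caller actually would, i.e. with boxed java.lang.Long values rather than Object:

```scala
import java.{lang => jl, util => ju}
import org.apache.kafka.common.TopicPartition

// Offsets keyed by TopicPartition, with the value type a Java caller would use.
val offsets = new ju.HashMap[TopicPartition, jl.Long]()
offsets.put(new TopicPartition("topicA", 0), jl.Long.valueOf(23L))
offsets.put(new TopicPartition("topicA", 1), jl.Long.valueOf(42L))
```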

@koeninger
Contributor

koeninger commented Jun 30, 2016

"When abstract class, you can later add defined methods which the users can override if needed, but does not break compatibility of existing implementations." This is what I'm taking issue with. It's confusing binary compatibility with actual api compatibility. Changing the class and silently slipping in a new default method into someone's code may preserve binary compatibility, at the cost of actually breaking their code at runtime and losing them their job.

Clearly it's not a big change from a lines-of-code perspective to move a trait to an abstract class, so I can do it quickly, but I'd like to at least hear your response to that concern.

Other changes: no concerns, I can add them to my PR.

@tdas
Contributor Author

tdas commented Jun 30, 2016

I am closing this PR. It is superseded by #13998.

@tdas tdas closed this Jun 30, 2016
asfgit pushed a commit that referenced this pull request Jul 1, 2016
## What changes were proposed in this pull request?
This is an alternative to the refactoring proposed by #13996

## How was this patch tested?

unit tests
also tested under scala 2.10 via
mvn -Dscala-2.10

Author: cody koeninger <[email protected]>

Closes #13998 from koeninger/kafka-0-10-refactor.
asfgit pushed a commit that referenced this pull request Jul 1, 2016
## What changes were proposed in this pull request?
This is an alternative to the refactoring proposed by #13996

## How was this patch tested?

unit tests
also tested under scala 2.10 via
mvn -Dscala-2.10

Author: cody koeninger <[email protected]>

Closes #13998 from koeninger/kafka-0-10-refactor.

(cherry picked from commit fbfd0ab)
Signed-off-by: Tathagata Das <[email protected]>