
Conversation

@zjffdu (Contributor) commented on Sep 6, 2016

What is this PR for?

The root cause is that SQLContext's constructor signature changed in Spark 2.0.
Spark 1.6

def __init__(self, sparkContext, sqlContext=None):

Spark 2.0

def __init__(self, sparkContext, sparkSession=None, jsqlContext=None):

So we need to create the SQLContext using named parameters; otherwise the positional argument intp.getSQLContext() is taken as sparkSession, which causes the issue.
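
As a rough illustration of the idea (not the exact patch; sc and intp stand for the SparkContext and the py4j interpreter handle that Zeppelin's pyspark bootstrap already provides):

```
from pyspark.sql import SQLContext

# Positional call: on Spark 2.0 the JVM SQLContext handle returned by
# intp.getSQLContext() gets bound to the new second parameter `sparkSession`,
# which later breaks UDF registration (ZEPPELIN-1411):
#   sqlContext = SQLContext(sc, intp.getSQLContext())

# Named call binds the handle to the intended parameter explicitly.
# (The keyword is `sqlContext` on Spark 1.6 and `jsqlContext` on Spark 2.0.)
sqlContext = SQLContext(sc, jsqlContext=intp.getSQLContext())
```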

What type of PR is it?

[Bug Fix]

Todos

  • - Task

What is the Jira issue?

  • https://issues.apache.org/jira/browse/ZEPPELIN-1411

How should this be tested?

Tested using the example code in ZEPPELIN-1411.
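
The JIRA example is not reproduced here, but an illustrative pyspark paragraph of the kind that triggers the bug (assuming the sc and sqlContext objects that Zeppelin injects; names and data are made up) looks roughly like this:

```
%pyspark
from pyspark.sql.types import StringType

# Registering and using a Python UDF through the injected sqlContext; on
# Spark 2.0 this kind of UDF usage failed with
# "... object has no attribute 'parseDataType'" before the fix.
sqlContext.registerFunction("to_upper", lambda s: s.upper(), StringType())
df = sqlContext.createDataFrame([("zeppelin",)], ["name"])
df.registerTempTable("names")
sqlContext.sql("SELECT to_upper(name) FROM names").show()
```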

Screenshots (if appropriate)

image

Questions:

  • Do the license files need an update? No
  • Are there breaking changes for older versions? No
  • Does this need documentation? No

@zjffdu (Contributor, Author) commented on Sep 6, 2016

\cc @Leemoonsoo Please help review, thanks

@felixcheung (Member) commented:

LGTM.
It would be great to add some tests for the pyspark interpreter?

@zjffdu (Contributor, Author) commented on Sep 6, 2016

I guess it is because PythonInterpreter depends on the python environment, so there's no test for it yet.

@felixcheung (Member) commented:

Right. Probably another PR, but I think we could use Travis's addons support to install python via apt-get (https://docs.travis-ci.com/user/installing-dependencies/), as we have for R.

@felixcheung (Member) commented:

CI failed because of selenium, I think

@felixcheung (Member) commented:

could you kick off CI again? Let's merge this after

@Leemoonsoo (Member) commented:

Thanks @zjffdu for the contribution. Actually, we do have some tests for pyspark already.

Please see https://github.com/apache/zeppelin/blob/master/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java#L125

If it's not too difficult, adding a unit test for this case would be really beneficial.

@Leemoonsoo (Member) commented:

I tested this branch with the given example, but it doesn't work for me.
On my machine, it hangs on sqlContext.createDataFrame() and ends up with errors like

kqueue: Too many open files in system

I'm not sure whether it's a problem with this patch or not.
Could someone else test this patch, too?

@zjffdu (Contributor, Author) commented on Sep 9, 2016

@Leemoonsoo Can you guide me on how to run this test? I tried to run it using maven, but it fails; it seems to depend on something.

@Leemoonsoo (Member) commented:

@zjffdu

Once you build zeppelin,

mvn package [your profiles] -DskipTests

Then you can run this test, like

mvn package -pl 'zeppelin-interpreter,zeppelin-zengine,zeppelin-server' -Dtest=ZeppelinSparkClusterTest -DfailIfNoTests=false -Drat.skip=true

Let me know if it does not work.

zjffdu closed this on Sep 12, 2016
zjffdu reopened this on Sep 12, 2016
@zjffdu (Contributor, Author) commented on Sep 12, 2016

@Leemoonsoo, I followed the above commands, but it doesn't seem to work. I checked AbstractTestRestApi; it seems the pyspark-related tests only run either in Travis CI or against a Spark standalone cluster with SPARK_HOME set up (pySpark needs to be set to true). Do I understand correctly?

 // ci environment runs spark cluster for testing
      // so configure zeppelin use spark cluster
      if ("true".equals(System.getenv("CI"))) {
        // assume first one is spark
        InterpreterSetting sparkIntpSetting = null;
        for(InterpreterSetting intpSetting : ZeppelinServer.notebook.getInterpreterFactory().get()) {
          if (intpSetting.getName().equals("spark")) {
            sparkIntpSetting = intpSetting;
          }
        }

        // set spark master and other properties
        sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
        sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");

        // set spark home for pyspark
        sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
        pySpark = true;
        sparkR = true;
        ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.getId());
      } else {
        // assume first one is spark
        InterpreterSetting sparkIntpSetting = null;
        for(InterpreterSetting intpSetting : ZeppelinServer.notebook.getInterpreterFactory().get()) {
          if (intpSetting.getName().equals("spark")) {
            sparkIntpSetting = intpSetting;
          }
        }

        String sparkHome = getSparkHome();
        if (sparkHome != null) {
          sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
          sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
          // set spark home for pyspark
          sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
          pySpark = true;
          sparkR = true;
        }

zjffdu closed this on Sep 12, 2016
zjffdu reopened this on Sep 12, 2016
@Leemoonsoo (Member) commented:

@zjffdu Right, it looks like AbstractTestRestApi needs to be improved when CI is not defined.
So far, I think you can try downloading and running a Spark standalone cluster in this way:

./testing/downloadSpark.sh 1.6.2 2.6
./testing/startSparkCluster.sh 1.6.2 2.6

And then try running the test cases, so getSparkHome() can find sparkHome.

@zjffdu (Contributor, Author) commented on Sep 13, 2016

@Leemoonsoo, I updated the unit test and also made a small change to AbstractTestRestApi, since the standalone way doesn't work for me. So I allow the user to export SPARK_MASTER to run it in other modes. The failed CI seems irrelevant.

@Leemoonsoo (Member) commented on Sep 13, 2016

Thanks @zjffdu. I think the second CI test profile failure is relevant.

Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 120.231 sec <<< FAILURE! - in org.apache.zeppelin.rest.ZeppelinSparkClusterTest
pySparkTest(org.apache.zeppelin.rest.ZeppelinSparkClusterTest)  Time elapsed: 3.842 sec  <<< FAILURE!
java.lang.AssertionError: expected:<FINISHED> but was:<ERROR>
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:743)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:144)
    at org.apache.zeppelin.rest.ZeppelinSparkClusterTest.pySparkTest(ZeppelinSparkClusterTest.java:150)

Could you check?

zjffdu closed this on Sep 14, 2016
zjffdu reopened this on Sep 14, 2016
zjffdu closed this on Sep 14, 2016
zjffdu reopened this on Sep 14, 2016
zjffdu force-pushed the ZEPPELIN-1411 branch 2 times, most recently from f3db4f2 to a142d45 on September 17, 2016 03:58
zjffdu closed this on Sep 17, 2016
zjffdu reopened this on Sep 17, 2016
zjffdu closed this on Sep 17, 2016
zjffdu reopened this on Sep 17, 2016
zjffdu force-pushed the ZEPPELIN-1411 branch 5 times, most recently from 32cbff6 to a4fda47 on September 17, 2016 15:41
zjffdu closed this on Sep 18, 2016
zjffdu reopened this on Sep 18, 2016
zjffdu force-pushed the ZEPPELIN-1411 branch 5 times, most recently from 4384346 to 1ac1233 on September 18, 2016 08:10
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
pySpark = true;
@zjffdu (Contributor, Author) commented:

Disable HiveContext; otherwise we will hit the issue of multiple Derby instances.

} else {
sparkIntpSetting.getProperties()
.setProperty("master", "spark://" + getHostname() + ":7071");
}
@zjffdu (Contributor, Author) commented:

Allow the user to specify SPARK_MASTER, so that the test can run in other modes (like yarn-client).

Reviewer (Member) commented:

This is testing code only, but it doesn't seem like we are using this in tests?

@zjffdu (Contributor, Author) commented:

It is for local system testing, when the user wants to run it in other modes (e.g. yarn-client).

return sparkHome;
}
sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
System.out.println("SPARK HOME detected " + sparkHome);
@zjffdu (Contributor, Author) commented:

Allow the user to specify SPARK_HOME, so that an existing Spark cluster can be used.

sc.stop();
sc = null;
sparkSession = null;
if (classServer != null) {
@zjffdu (Contributor, Author) commented:

Set sparkSession to null, so that it will be created again if the interpreter is scoped.

Reviewer (Member) commented:

stop() should be called on sparkSession before sc.stop():
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SparkSession

(As of now this is OK since sparkSession.stop() simply calls sc.stop(), but this could change.)

@zjffdu (Contributor, Author) commented:

Good catch. When sparkSession is not null (Spark 2.0), sparkSession.stop() should be called first.
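
For illustration only, a pyspark-flavoured sketch of the agreed shutdown order (the actual change is on the JVM side of SparkInterpreter; the names here are placeholders):

```
# Stop the session before the context when it exists (Spark 2.x); today
# sparkSession.stop() just delegates to sc.stop(), but honoring the documented
# order is safer. Clearing both lets a scoped interpreter recreate them.
if sparkSession is not None:
    sparkSession.stop()
sc.stop()
sparkSession = None
sc = None
```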

@zjffdu (Contributor, Author) commented on Sep 18, 2016

@Leemoonsoo Finally got the unit test to pass (the one remaining failure is irrelevant).

Actually, the test failure was caused by several bugs in SparkInterpreter. Here are the steps to reproduce the first issue:

  1. Use Spark 2.0 and make SparkInterpreter scoped.
  2. Open note1 and run sample code to start a SparkInterpreter, then open note2 and run sample code to start another SparkInterpreter; you will then hit the following issue.

image

The root cause of this issue is that the outputDir should be unique; otherwise the second SparkInterpreter instance cannot find the classes in the outputDir of the previous SparkInterpreter.

The second bug is that we should also set sparkSession to null. Otherwise it won't be recreated in the next SparkInterpreter.

The third bug is that we should disable HiveContext in AbstractTestRestApi; otherwise we will hit the issue of multiple Derby instances running.

@Leemoonsoo (Member) commented:

@zjffdu Thanks for the great work!
The CI failure looks irrelevant. Will merge into master and branch-0.6 if there are no more discussions.

asfgit closed this in c61f1fb on Sep 21, 2016
asfgit pushed a commit that referenced this pull request Sep 21, 2016
ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 'parseDataType'

The root cause is that SQLContext's signature changes in spark 2.0.
Spark 1.6
```
def __init__(self, sparkContext, sqlContext=None):
```
Spark 2.0
```
def __init__(self, sparkContext, sparkSession=None, jsqlContext=None):
```
So we need to create SQLContext using named parameters, otherwise it would take intp.getSQLContext() as sparkSession which cause the issue.

[Bug Fix]

* [ ] - Task

* https://issues.apache.org/jira/browse/ZEPPELIN-1411

Tested using the example code in ZEPPELIN-1411.

![image](https://cloud.githubusercontent.com/assets/164491/18260139/9bd702c0-741d-11e6-8b23-946c38a794c3.png)

* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <[email protected]>

Closes #1404 from zjffdu/ZEPPELIN-1411 and squashes the following commits:

40b080a [Jeff Zhang] retry
4922de1 [Jeff Zhang] log more logging for travis CI diangnose
4fe033d [Jeff Zhang] add unit test
296c63f [Jeff Zhang] ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 'parseDataType'

(cherry picked from commit c61f1fb)
Signed-off-by: Lee moon soo <[email protected]>
pedrozatta pushed a commit to pedrozatta/zeppelin that referenced this pull request Oct 27, 2016