[Feat][Spark] Run tests against multiple spark version && relax spark version requirements #320

Closed
SemyonSinchenko opened this issue Jan 10, 2024 · 11 comments

@SemyonSinchenko
Member

Is your feature request related to a problem? Please describe.
Currently only Spark 3.2 + Hadoop 3.2 + Scala 2.12 is supported. At first glance, however, the spark/pyspark code looks like it should work with all versions of Spark starting from 3.0 and with both Hadoop 2 and Hadoop 3. I also do not see any blockers for supporting Scala 2.13.

Describe the solution you'd like

  • Provide multiple builds of graphar-spark, like graphar-spark-1.0-spark-3.1, graphar-spark-1.0-spark-3.2, etc. For me a nice example is aws deequ
  • Test pyspark bindings against multiple versions of pyspark and python. A good example of workflow is pydeequ
  • Relax spark requirements for pyspark package
  • Relax hadoop-3.2 requirements for spark/pyspark packages

Describe alternatives you've considered
I do not know of any alternative solutions.

Additional context
In my experience, Spark packages are mostly used on pre-configured clusters with a fixed environment (like Databricks Runtimes). That is why it is important to relax the Spark version requirements as much as possible. Otherwise, the package will be impossible to install simply because of dependency conflicts.

@SemyonSinchenko SemyonSinchenko added the enhancement New feature or request label Jan 10, 2024
@acezen
Contributor

acezen commented Jan 12, 2024

Good suggestion. I think we can make this a tracking issue that keeps track of the list of tasks needed to support this feature.

@SemyonSinchenko
Member Author

Ok. It is not as easy as I thought. The problem is mostly in datasources; a lot of things are changing here from version to version. What would be a better solution? For example, in 3.2 ParquetPartitionReaderFactory does not have an aggregation argument. But in 3.3 it does.

Option one would be something like this:

// Look up the primary constructor reflectively, because its parameter list
// differs between Spark versions.
val parquetPartitionReaderFactoryClassConstructor = Class
  .forName(
    "org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory"
  )
  .getConstructors
  .head

sparkSession.version match {
  case v if v.startsWith("3.2") =>
    parquetPartitionReaderFactoryClassConstructor
      .newInstance(
        sqlConf,
        broadcastedConf,
        dataSchema,
        readDataSchema,
        readPartitionSchema,
        pushedFilters,
        new ParquetOptions(
          options.asCaseSensitiveMap.asScala.toMap,
          sqlConf
        )
      )
      .asInstanceOf[ParquetPartitionReaderFactory]
  case v if v.startsWith("3.3") =>
    parquetPartitionReaderFactoryClassConstructor
      .newInstance(
        sqlConf,
        broadcastedConf,
        dataSchema,
        readDataSchema,
        readPartitionSchema,
        pushedFilters,
        None, // The newly added argument that is Option
        new ParquetOptions(
          options.asCaseSensitiveMap.asScala.toMap,
          sqlConf
        )
      )
      .asInstanceOf[ParquetPartitionReaderFactory]
  case v =>
    // Fail with a clear message instead of a MatchError on other versions.
    throw new UnsupportedOperationException(s"Unsupported Spark version: $v")
}

(looks terrible)

Option two would be to have per-version bundle subdirectories and subprojects (possibly only for the datasource), or to have branch tags like spark3.2, spark3.3, etc.

@acezen What do you think about it? To be honest, I have not faced such a case before and I do not know which option would be better.

@SemyonSinchenko
Member Author

UPD: I talked with a couple of experienced Scala developers about it, and it looks like reflection is a valid solution. Even Spark itself uses reflection to work with different versions of Hive. So I suggest going with reflection. I can do it at least for versions 3.1.x to 3.4.x.
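
As a rough illustration of the reflection approach, the per-version branches could be collapsed by choosing the argument list whose length matches the constructor found at runtime. This is only a sketch under stated assumptions: `FactoryV32`, `FactoryV33`, and `ReflectiveFactory` are hypothetical stand-ins invented for this example, not Spark or GraphAr classes, and matching on arity alone is a simplification that would break if two versions had constructors of the same length but different types.

```scala
// Hypothetical stand-ins that mimic a constructor gaining one argument
// between two library versions. They are not Spark classes.
final case class FactoryV32(schema: String, filters: Seq[String])
final case class FactoryV33(
    schema: String,
    filters: Seq[String],
    aggregation: Option[String]
)

object ReflectiveFactory {

  /** Instantiates `className` through its first public constructor, using
    * the candidate argument list whose length equals the constructor's arity.
    */
  def instantiate(className: String, candidates: Seq[Seq[AnyRef]]): AnyRef = {
    val ctor = Class.forName(className).getConstructors.head
    val arity = ctor.getParameterCount
    val args = candidates
      .find(_.length == arity)
      .getOrElse(
        throw new IllegalArgumentException(
          s"No argument list of length $arity for $className"
        )
      )
    // Spread the chosen arguments into the Java varargs parameter.
    ctor.newInstance(args: _*).asInstanceOf[AnyRef]
  }
}
```

Calling `ReflectiveFactory.instantiate` with both a two-element and a three-element argument list builds whichever stand-in class is named, so the caller no longer needs one branch per version string.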

@acezen
Contributor

acezen commented Jan 30, 2024

> UPD: I talked with a couple of experienced Scala developers about it, and it looks like reflection is a valid solution. Even Spark itself uses reflection to work with different versions of Hive. So I suggest going with reflection. I can do it at least for versions 3.1.x to 3.4.x.

Reflection looks good to me. As you said, Spark uses reflection in the same way, so we can follow that strategy.
Hi @lixueclaire, do you have any insight into the solutions Sem listed above?

@lixueclaire
Contributor

Hi, @SemyonSinchenko, I'm impressed with your proposals; they seem quite solid. I did notice, though, that certain features, such as "ZSTD" compression, have only been supported since Spark v3.2. To maintain compatibility with Spark v3.1.x, could we look into making these particular features optional?

@SemyonSinchenko
Member Author

> Hi, @SemyonSinchenko, I'm impressed with your proposals; they seem quite solid. I did notice, though, that certain features, such as "ZSTD" compression, have only been supported since Spark v3.2. To maintain compatibility with Spark v3.1.x, could we look into making these particular features optional?

I would suggest just using another compression codec for older versions of Spark. Or we can start with something like 3.2.x - 3.4.x and extend the support only if users report issues.

@acezen
Contributor

acezen commented Jan 30, 2024

>> Hi, @SemyonSinchenko, I'm impressed with your proposals; they seem quite solid. I did notice, though, that certain features, such as "ZSTD" compression, have only been supported since Spark v3.2. To maintain compatibility with Spark v3.1.x, could we look into making these particular features optional?

> I would suggest just using another compression codec for older versions of Spark. Or we can start with something like 3.2.x - 3.4.x and extend the support only if users report issues.

We can just use snappy compression for older versions of Spark.
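
The fallback discussed here could look roughly like the sketch below. It assumes, as stated earlier in the thread, that ZSTD is available from Spark 3.2 onward. `CompressionFallback`, `majorMinor`, and `effectiveCodec` are hypothetical names made up for this example and are not part of GraphAr or Spark.

```scala
import scala.util.Try

object CompressionFallback {

  /** Parses "major.minor[.rest]" into (major, minor); None if malformed. */
  def majorMinor(version: String): Option[(Int, Int)] =
    version.split('.').toList match {
      case major :: minor :: _ => Try((major.toInt, minor.toInt)).toOption
      case _                   => None
    }

  /** Keeps the requested codec unless it is ZSTD on a Spark version older
    * than 3.2 (or on an unparseable version), in which case snappy is used.
    */
  def effectiveCodec(sparkVersion: String, requested: String): String = {
    val supportsZstd = majorMinor(sparkVersion).exists {
      case (major, minor) => major > 3 || (major == 3 && minor >= 2)
    }
    if (requested.equalsIgnoreCase("zstd") && !supportsZstd) "snappy"
    else requested
  }
}
```

Comparing the parsed numbers rather than string prefixes avoids treating a version such as 3.10 as older than 3.2.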

@SemyonSinchenko
Member Author

Bad news: I tried hard but failed to fix everything. There are a lot of changes from 3.2 to 3.3 in the FiltersPushDown part. What do you think about splitting datasources into datasources.common and datasources.spark32/datasources.spark33/etc.? @acezen @lixueclaire

For example, there are cases where the signature of the parent class changed and I got errors like error: method withFilters overrides nothing. I have no idea how to fix that with the reflection trick.
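
The proposed split might be sketched as follows. Reflection cannot help with the error above because an override is checked at compile time against one specific parent signature, so each version needs its own compiled source. Everything in this sketch is hypothetical: `DatasourceShim`, `Spark32Shim`, `Spark33Shim`, and `ShimLoader` are illustrative names, and in a real build each implementation would live in its own module compiled against the matching Spark version rather than in one file.

```scala
// Version-independent contract that the common code would depend on
// (hypothetical; standing in for something like datasources.common).
trait DatasourceShim {
  def describe: String
}

// Stand-ins for per-version implementations (datasources.spark32 and
// datasources.spark33 in the proposal). Each one would extend that Spark
// version's own parent classes, so its overrides compile cleanly.
final class Spark32Shim extends DatasourceShim {
  def describe: String = "datasource built for Spark 3.2"
}

final class Spark33Shim extends DatasourceShim {
  def describe: String = "datasource built for Spark 3.3"
}

object ShimLoader {

  /** Maps a Spark version string to the implementation class name. */
  def implementationFor(sparkVersion: String): String =
    sparkVersion.split('.').take(2).mkString(".") match {
      case "3.2" => classOf[Spark32Shim].getName
      case "3.3" => classOf[Spark33Shim].getName
      case other =>
        throw new UnsupportedOperationException(
          s"Unsupported Spark version: $other"
        )
    }

  /** Loads the implementation by name through its no-argument constructor. */
  def load(sparkVersion: String): DatasourceShim =
    Class
      .forName(implementationFor(sparkVersion))
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[DatasourceShim]
}
```

With a separate build per Spark version, only one implementation would be on the classpath and the runtime lookup could be dropped; it is shown here only to make the common/per-version boundary concrete.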

@acezen
Contributor

acezen commented Jan 30, 2024

> Bad news: I tried hard but failed to fix everything. There are a lot of changes from 3.2 to 3.3 in the FiltersPushDown part. What do you think about splitting datasources into datasources.common and datasources.spark32/datasources.spark33/etc.? @acezen @lixueclaire
>
> For example, there are cases where the signature of the parent class changed and I got errors like error: method withFilters overrides nothing. I have no idea how to fix that with the reflection trick.

Yes, the datasource part may change a lot between Spark versions. Feel free to change the solution and choose whichever is most appropriate at the moment.

@acezen
Contributor

acezen commented Feb 5, 2024

Hi @SemyonSinchenko, if you have made some progress on the issue, you can just create a work-in-progress PR. I think splitting the work into small commits will help with tracing changes back.

@SemyonSinchenko
Member Author

SemyonSinchenko commented Feb 23, 2024

  • Split datasources and core in GraphAr spark
  • Support spark 3.3.x as an additional Maven profile and a separate datasources bundle
  • Provide multiple spark JARs in Maven
  • Relax GraphAr PySpark requirements from pyspark 3.2 to ">=3.2,<=3.3"
  • Create a page in the documentation about version compatibility
