diff --git a/.travis.yml b/.travis.yml
index a6d4f63c..41c743a2 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,16 +13,16 @@ matrix:
   include:
-    # Spark 2.0.0, Scala 2.11, and Avro 1.7.x
+    # Spark 2.1.0, Scala 2.11, and Avro 1.7.x
     - jdk: openjdk7
-      scala: 2.11.7
-      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
+      scala: 2.11.8
+      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.1.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
-    # Spark 2.0.0, Scala 2.10, and Avro 1.7.x
+    # Spark 2.1.0, Scala 2.10, and Avro 1.7.x
     - jdk: openjdk7
       scala: 2.10.4
-      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
+      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.1.0" TEST_AVRO_VERSION="1.7.6" TEST_AVRO_MAPRED_VERSION="1.7.7"
-    # Spark 2.0.0, Scala 2.10, and Avro 1.8.x
+    # Spark 2.1.0, Scala 2.10, and Avro 1.8.x
     - jdk: openjdk7
       scala: 2.10.4
-      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0" TEST_AVRO_VERSION="1.8.0" TEST_AVRO_MAPRED_VERSION="1.8.0"
+      env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.1.0" TEST_AVRO_VERSION="1.8.0" TEST_AVRO_MAPRED_VERSION="1.8.0"
 script:
   - ./dev/run-tests-travis.sh
 after_success:
diff --git a/build.sbt b/build.sbt
index 4d3bc51c..ccecadfa 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,13 +2,13 @@ name := "spark-avro"
 
 organization := "com.databricks"
 
-scalaVersion := "2.11.7"
+scalaVersion := "2.11.8"
 
-crossScalaVersions := Seq("2.10.5", "2.11.7")
+crossScalaVersions := Seq("2.10.5", "2.11.8")
 
 spName := "databricks/spark-avro"
 
-sparkVersion := "2.0.0"
+sparkVersion := "2.1.0"
 
 val testSparkVersion = settingKey[String]("The version of Spark to test against.")
 
diff --git a/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala b/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
index 24d1b5a3..bc71564e 100644
--- a/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
+++ b/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
@@ -54,10 +54,7 @@ private[avro] class AvroOutputWriter(
     new AvroKeyOutputFormat[GenericRecord]() {
 
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
-        val taskAttemptId: TaskAttemptID = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        new Path(path)
       }
 
       @throws(classOf[IOException])
diff --git a/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala b/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala
index 339eb147..0b3fc648 100644
--- a/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala
+++ b/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala
@@ -26,11 +26,14 @@ private[avro] class AvroOutputWriterFactory(
     recordName: String,
     recordNamespace: String) extends OutputWriterFactory {
 
-  def newInstance(
+  override def newInstance(
       path: String,
-      bucketId: Option[Int],
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
     new AvroOutputWriter(path, context, schema, recordName, recordNamespace)
   }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+    ".avro"
+  }
 }
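
Note on the writer changes: in Spark 2.1, OutputWriterFactory.newInstance lost its bucketId parameter and gained a getFileExtension method, because Spark's write path now composes the unique part-file name (task split and job UUID included) itself and hands the writer the complete file path. That is why getDefaultWorkFile can return new Path(path) directly and the old UUID-based naming code is deleted. A rough sketch of the 2.1-era contract these overrides target, paraphrased from the Spark sources rather than quoted verbatim:

    import org.apache.hadoop.mapreduce.TaskAttemptContext
    import org.apache.spark.sql.execution.datasources.OutputWriter
    import org.apache.spark.sql.types.StructType

    // Illustrative shape of Spark 2.1's OutputWriterFactory (paraphrased, not the
    // verbatim Spark source):
    abstract class OutputWriterFactory extends Serializable {
      // `path` is now the full path of the file to write, extension included;
      // the Spark 2.0 `bucketId` parameter is gone.
      def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter

      // New in 2.1: the data source supplies only the extension, and Spark
      // builds the part-file name around it.
      def getFileExtension(context: TaskAttemptContext): String
    }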
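
With file naming delegated to Spark's commit protocol, a quick way to sanity-check the upgraded artifact is a read/write round trip: the output directory should contain Spark-named part files (e.g. part-00000-<uuid>.avro) that pick up the ".avro" extension from the factory. A minimal, illustrative sketch; the object name and paths below are placeholders, not part of this change:

    import org.apache.spark.sql.SparkSession

    object AvroRoundTrip {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("avro-round-trip").getOrCreate()
        // Read an existing Avro file and write it back out; the part files under
        // /tmp/episodes-out should carry the ".avro" extension supplied by
        // AvroOutputWriterFactory.getFileExtension.
        val df = spark.read.format("com.databricks.spark.avro").load("/tmp/episodes.avro")
        df.write.format("com.databricks.spark.avro").save("/tmp/episodes-out")
        spark.stop()
      }
    }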