 
 package org.apache.spark.sql.sources
 
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.{RegexParsers, PackratParsers}
+
 import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.util.Utils
-
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.lexical.StdLexical
-import scala.util.parsing.combinator.syntactical.StandardTokenParsers
-import scala.util.parsing.combinator.PackratParsers
-
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
 
@@ -49,6 +49,21 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   protected implicit def asParser(k: Keyword): Parser[String] =
     lexical.allCaseVersions(k.str).map(x => x: Parser[String]).reduce(_ | _)
 
+  // data types
+  protected val STRING = Keyword("STRING")
+  protected val DOUBLE = Keyword("DOUBLE")
+  protected val BOOLEAN = Keyword("BOOLEAN")
+  protected val FLOAT = Keyword("FLOAT")
+  protected val INT = Keyword("INT")
+  protected val TINYINT = Keyword("TINYINT")
+  protected val SMALLINT = Keyword("SMALLINT")
+  protected val BIGINT = Keyword("BIGINT")
+  protected val BINARY = Keyword("BINARY")
+  protected val DECIMAL = Keyword("DECIMAL")
+  protected val DATE = Keyword("DATE")
+  protected val TIMESTAMP = Keyword("TIMESTAMP")
+  protected val VARCHAR = Keyword("VARCHAR")
+
   protected val CREATE = Keyword("CREATE")
   protected val TEMPORARY = Keyword("TEMPORARY")
   protected val TABLE = Keyword("TABLE")
@@ -67,26 +82,129 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
   protected lazy val ddl: Parser[LogicalPlan] = createTable
 
   /**
-   * CREATE TEMPORARY TABLE avroTable
+   * `CREATE TEMPORARY TABLE avroTable
    * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * or
+   * `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
+   * USING org.apache.spark.sql.avro
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
    */
   protected lazy val createTable: Parser[LogicalPlan] =
-    CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
+    ( CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
       case tableName ~ provider ~ opts =>
-        CreateTableUsing(tableName, provider, opts)
+        CreateTableUsing(tableName, Seq.empty, provider, opts)
+      }
+      |
+      CREATE ~ TEMPORARY ~ TABLE ~> ident
+        ~ tableCols ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
+      case tableName ~ tableColumns ~ provider ~ opts =>
+        CreateTableUsing(tableName, tableColumns, provider, opts)
     }
+    )
+
+  protected lazy val metastoreTypes = new MetastoreTypes
+
+  protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
 
   protected lazy val options: Parser[Map[String, String]] =
     "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
 
   protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
 
   protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) }
+
+  protected lazy val column: Parser[StructField] =
+    ident ~ ident ^^ { case name ~ typ =>
+      StructField(name, metastoreTypes.toDataType(typ))
+    }
+}
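// Not part of the diff: a hedged usage sketch of the new column-list form, reusing the
// avroTable example from the doc comment above. It assumes an Avro relation provider is on
// the classpath and that it honors the user-specified schema handed to createRelation;
// sqlContext is any SQLContext, and the path is illustrative only.
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE avroTable(intField int, stringField string)
    |USING org.apache.spark.sql.avro
    |OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
  """.stripMargin)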
+
+/**
+ * :: DeveloperApi ::
+ * Provides a parser for data types.
+ */
+@DeveloperApi
+private[sql] class MetastoreTypes extends RegexParsers {
+  protected lazy val primitiveType: Parser[DataType] =
+    "string" ^^^ StringType |
+    "float" ^^^ FloatType |
+    "int" ^^^ IntegerType |
+    "tinyint" ^^^ ByteType |
+    "smallint" ^^^ ShortType |
+    "double" ^^^ DoubleType |
+    "bigint" ^^^ LongType |
+    "binary" ^^^ BinaryType |
+    "boolean" ^^^ BooleanType |
+    fixedDecimalType |                     // Hive 0.13+ decimal with precision/scale
+    "decimal" ^^^ DecimalType.Unlimited |  // Hive 0.12 decimal with no precision/scale
+    "date" ^^^ DateType |
+    "timestamp" ^^^ TimestampType |
+    "varchar\\((\\d+)\\)".r ^^^ StringType
+
+  protected lazy val fixedDecimalType: Parser[DataType] =
+    ("decimal" ~> "(" ~> "\\d+".r) ~ ("," ~> "\\d+".r <~ ")") ^^ {
+      case precision ~ scale =>
+        DecimalType(precision.toInt, scale.toInt)
+    }
+
+  protected lazy val arrayType: Parser[DataType] =
+    "array" ~> "<" ~> dataType <~ ">" ^^ {
+      case tpe => ArrayType(tpe)
+    }
+
+  protected lazy val mapType: Parser[DataType] =
+    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
+      case t1 ~ _ ~ t2 => MapType(t1, t2)
+    }
+
+  protected lazy val structField: Parser[StructField] =
+    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+    }
+
+  protected lazy val structType: Parser[DataType] =
+    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
+      case fields => new StructType(fields)
+    }
+
+  private[sql] lazy val dataType: Parser[DataType] =
+    arrayType |
+    mapType |
+    structType |
+    primitiveType
+
+  def toDataType(metastoreType: String): DataType = parseAll(dataType, metastoreType) match {
+    case Success(result, _) => result
+    case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
+  }
+
+  def toMetastoreType(dt: DataType): String = dt match {
+    case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>"
+    case StructType(fields) =>
+      s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>"
+    case MapType(keyType, valueType, _) =>
+      s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>"
+    case StringType => "string"
+    case FloatType => "float"
+    case IntegerType => "int"
+    case ByteType => "tinyint"
+    case ShortType => "smallint"
+    case DoubleType => "double"
+    case LongType => "bigint"
+    case BinaryType => "binary"
+    case BooleanType => "boolean"
+    case DateType => "date"
+    case d: DecimalType => "decimal"
+    case TimestampType => "timestamp"
+    case NullType => "void"
+    case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType)
+  }
 }
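// Not part of the diff: a small sketch of what the MetastoreTypes parser above accepts,
// derived only from its grammar rules; the example type strings are illustrative.
val parser = new MetastoreTypes
parser.toDataType("int")                               // IntegerType
parser.toDataType("decimal(10,2)")                     // DecimalType(10, 2)
parser.toDataType("array<struct<a:int,b:string>>")     // ArrayType(StructType(Seq(...)))
parser.toMetastoreType(MapType(StringType, LongType))  // "map<string,bigint>"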
 
 private[sql] case class CreateTableUsing(
     tableName: String,
+    tableCols: Seq[StructField],
     provider: String,
     options: Map[String, String]) extends RunnableCommand {
 
@@ -100,7 +218,8 @@ private[sql] case class CreateTableUsing(
       }
     }
     val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
-    val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
+    val relation = dataSource.createRelation(
+      sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols)))
 
     sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
     Seq.empty
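// Not part of the diff: a sketch of the net effect of run() for the column-list form,
// assuming the avroTable example above. The parsed columns become the schema option
// passed to the provider, and the relation is registered as a queryable temp table:
//   tableCols == Seq(StructField("intField", IntegerType), StructField("stringField", StringType))
//   schema    == Some(StructType(tableCols))
sqlContext.sql("SELECT stringField FROM avroTable WHERE intField = 1").collect()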