@@ -22,7 +22,10 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
2222import scala .util .parsing .combinator .PackratParsers
2323
2424import org .apache .spark .Logging
25+ import org .apache .spark .sql .SQLContext
2526import org .apache .spark .sql .catalyst .types ._
27+ import org .apache .spark .sql .execution .RunnableCommand
28+ import org .apache .spark .util .Utils
2629import org .apache .spark .sql .catalyst .plans .logical ._
2730import org .apache .spark .sql .catalyst .SqlLexical
2831
@@ -61,14 +64,14 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
6164
6265 // Data types.
6366 protected val STRING = Keyword (" STRING" )
64- protected val FLOAT = Keyword (" FLOAT " )
65- protected val INT = Keyword (" INT " )
67+ protected val BINARY = Keyword (" BINARY " )
68+ protected val BOOLEAN = Keyword (" BOOLEAN " )
6669 protected val TINYINT = Keyword (" TINYINT" )
6770 protected val SMALLINT = Keyword (" SMALLINT" )
68- protected val DOUBLE = Keyword (" DOUBLE " )
71+ protected val INT = Keyword (" INT " )
6972 protected val BIGINT = Keyword (" BIGINT" )
70- protected val BINARY = Keyword (" BINARY " )
71- protected val BOOLEAN = Keyword (" BOOLEAN " )
73+ protected val FLOAT = Keyword (" FLOAT " )
74+ protected val DOUBLE = Keyword (" DOUBLE " )
7275 protected val DECIMAL = Keyword (" DECIMAL" )
7376 protected val DATE = Keyword (" DATE" )
7477 protected val TIMESTAMP = Keyword (" TIMESTAMP" )
@@ -102,8 +105,8 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
102105 CREATE ~ TEMPORARY ~ TABLE ~> ident
103106 ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
104107 case tableName ~ columns ~ provider ~ opts =>
105- val tblColumns = if ( columns.isEmpty) Seq .empty else columns.get
106- CreateTableUsing (tableName, tblColumns , provider, opts)
108+ val userSpecifiedSchema = columns.flatMap(fields => Some ( StructType (fields)))
109+ CreateTableUsing (tableName, userSpecifiedSchema , provider, opts)
107110 }
108111 )
109112
@@ -179,6 +182,37 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
179182 }
180183}
181184
/**
 * Runnable command that registers a temporary table backed by an external
 * data source.
 *
 * @param tableName           name under which the relation is registered as a temp table
 * @param userSpecifiedSchema optional schema parsed from the DDL column list; forwarded
 *                            to schema-aware providers
 * @param provider            fully qualified class name of the data source, or a package
 *                            expected to contain a `DefaultSource` class
 * @param options             provider-specific options (looked up case-insensitively)
 */
private[sql] case class CreateTableUsing(
    tableName: String,
    userSpecifiedSchema: Option[StructType],
    provider: String,
    options: Map[String, String]) extends RunnableCommand {

  def run(sqlContext: SQLContext) = {
    val loader = Utils.getContextOrSparkClassLoader
    // Resolve the provider class: try the name as given, then fall back to the
    // `<provider>.DefaultSource` naming convention.
    val clazz: Class[_] = try loader.loadClass(provider) catch {
      case _: java.lang.ClassNotFoundException =>
        try loader.loadClass(provider + ".DefaultSource") catch {
          case _: java.lang.ClassNotFoundException =>
            sys.error(s"Failed to load class for data source: $provider")
        }
    }
    // The typed patterns already narrow the instance, so the previous
    // `asInstanceOf` casts were redundant. The default case turns an otherwise
    // opaque MatchError into an actionable error message.
    val relation = clazz.newInstance() match {
      case dataSource: org.apache.spark.sql.sources.RelationProvider =>
        dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
      case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
        dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), userSpecifiedSchema)
      case _ =>
        sys.error(
          s"Data source class $provider must implement RelationProvider or SchemaRelationProvider")
    }

    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
    Seq.empty
  }
}
215+
182216/**
183217 * Builds a map in which keys are case insensitive
184218 */
0 commit comments