@@ -22,7 +22,10 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
2222import scala .util .parsing .combinator .PackratParsers
2323
2424import org .apache .spark .Logging
25+ import org .apache .spark .sql .SQLContext
2526import org .apache .spark .sql .catalyst .types ._
27+ import org .apache .spark .sql .execution .RunnableCommand
28+ import org .apache .spark .util .Utils
2629import org .apache .spark .sql .catalyst .plans .logical ._
2730import org .apache .spark .sql .catalyst .SqlLexical
2831
@@ -179,6 +182,44 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
179182 }
180183}
181184
/**
 * Runnable command backing `CREATE TEMPORARY TABLE ... USING ...`.
 *
 * Resolves the data source class named by `provider`, asks it to build a
 * relation, and registers that relation as a temporary table.
 *
 * @param tableName name under which the relation is registered.
 * @param tableCols user-specified schema; may be empty, in which case the
 *                  data source infers its own schema.
 * @param provider  fully-qualified class name of the data source, or a
 *                  package name containing a `DefaultSource` class.
 * @param options   data source options (matched case-insensitively).
 */
private[sql] case class CreateTableUsing(
    tableName: String,
    tableCols: Seq[StructField],
    provider: String,
    options: Map[String, String]) extends RunnableCommand {

  def run(sqlContext: SQLContext) = {
    val loader = Utils.getContextOrSparkClassLoader
    // Try the provider name as given first; many sources instead expose
    // their implementation as a nested `DefaultSource` class, so fall back
    // to that before giving up.
    val clazz: Class[_] = try loader.loadClass(provider) catch {
      case _: ClassNotFoundException =>
        try loader.loadClass(provider + ".DefaultSource") catch {
          case _: ClassNotFoundException =>
            sys.error(s"Failed to load class for data source: $provider")
        }
    }
    val relation = clazz.newInstance() match {
      case dataSource: org.apache.spark.sql.sources.RelationProvider =>
        // NOTE(review): a non-empty user-specified schema (tableCols) is
        // silently ignored on this path — the provider infers the schema.
        dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
      case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
        if (tableCols.isEmpty) {
          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
        } else {
          dataSource.createRelation(
            sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols)))
        }
      case _ =>
        // Previously an unrecognized provider class surfaced as an opaque
        // scala.MatchError; fail with an actionable message instead.
        sys.error(s"$provider is not a RelationProvider.")
    }

    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
    Seq.empty
  }
}
222+
182223/**
183224 * Builds a map in which keys are case insensitive
184225 */
0 commit comments