
Commit 98c8a3b

Author: Andrew Or (committed)

    Merge in @yhuai's changes

1 parent 27e1f38, commit 98c8a3b

File tree: 1 file changed, +232 -0 lines changed

@@ -0,0 +1,232 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.catalog

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan


/**
 * An internal catalog that is used by a Spark Session. This internal catalog serves as a
 * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
 * tables and functions of the Spark Session that it belongs to.
 */
abstract class SessionCatalog(catalog: ExternalCatalog) {
  import ExternalCatalog._

  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]

  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]

  // --------------------------------------------------------------------------
  // Databases
  // All methods in this category interact directly with the underlying catalog.
  // --------------------------------------------------------------------------

  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit

  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

  def alterDatabase(dbDefinition: CatalogDatabase): Unit

  def getDatabase(db: String): CatalogDatabase

  def databaseExists(db: String): Boolean

  def listDatabases(): Seq[String]

  def listDatabases(pattern: String): Seq[String]

  // --------------------------------------------------------------------------
  // Tables
  // --------------------------------------------------------------------------

  // --------------------------------------------------------------------------
  // Tables: Methods for metastore tables.
  // Methods in this category are only used for metastore tables, which store
  // metadata in the underlying catalog.
  // --------------------------------------------------------------------------

  def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit

  /**
   * Alters a table whose name matches the one specified in `tableDefinition`,
   * assuming the table exists.
   *
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
  def alterTable(db: String, tableDefinition: CatalogTable): Unit

  /**
   * Retrieves the metadata of a table called `table` in the database `db`.
   */
  def getTable(db: String, table: String): CatalogTable

  // --------------------------------------------------------------------------
  // Tables: Methods for metastore tables or temp tables.
  // --------------------------------------------------------------------------

  /**
   * Creates a temporary table. If there is already a temporary table having the same name,
   * the table definition of that table will be updated to the new definition.
   */
  // TODO: Should we automatically overwrite the existing temp table?
  // Postgres and Hive will complain if a temp table is already defined.
  def createTempTable(tableIdent: TableIdentifier, tableDefinition: LogicalPlan): Unit

  def renameTable(
      specifiedDB: Option[String],
      currentDB: String,
      oldName: String,
      newName: String): Unit

  /**
   * Drops a table. If a database name is not provided, this method will drop the table with
   * the given name from the temporary table name space as well as the table
   * in the current database. If a database name is provided, this method only drops the table
   * with the given name from the given database.
   */
  // TODO: When a temp table and a table in the current db have the same name, should we
  // only drop the temp table when a database is not provided (Postgresql's semantic)?
  def dropTable(
      tableIdent: TableIdentifier,
      currentDB: String,
      ignoreIfNotExists: Boolean): Unit

  /**
   * Returns a [[LogicalPlan]] representing the requested table. This method is used
   * when we need to create a query plan for a given table.
   *
   * This method is different from `getTable`, which only returns the metadata of the table
   * in the form of [[CatalogTable]]. The [[LogicalPlan]] returned by this method contains
   * the metadata of the table in the form of [[CatalogTable]].
   */
  def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan

  def listTables(specifiedDB: Option[String], currentDB: String): Seq[String]

  def listTables(specifiedDB: Option[String], currentDB: String, pattern: String): Seq[String]

  // --------------------------------------------------------------------------
  // Partitions
  // All methods in this category interact directly with the underlying catalog.
  // --------------------------------------------------------------------------
  // TODO: We need to figure out how these methods interact with our data source tables.
  // For data source tables, we do not store values of partitioning columns in the metastore.
  // For now, partition values of a data source table will be automatically discovered
  // when we load the table.

  def createPartitions(
      db: String,
      table: String,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit

  def dropPartitions(
      db: String,
      table: String,
      parts: Seq[TablePartitionSpec],
      ignoreIfNotExists: Boolean): Unit

  /**
   * Override the specs of one or many existing table partitions, assuming they exist.
   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
   */
  def renamePartitions(
      db: String,
      table: String,
      specs: Seq[TablePartitionSpec],
      newSpecs: Seq[TablePartitionSpec]): Unit

  /**
   * Alters one or many table partitions whose specs match those specified in `parts`,
   * assuming the partitions exist.
   *
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
  def alterPartitions(
      db: String,
      table: String,
      parts: Seq[CatalogTablePartition]): Unit

  def getPartition(db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition

  def listPartitions(db: String, table: String): Seq[CatalogTablePartition]

  // --------------------------------------------------------------------------
  // Functions
  // --------------------------------------------------------------------------

  // --------------------------------------------------------------------------
  // Functions: Methods for metastore functions (permanent UDFs).
  // --------------------------------------------------------------------------

  def createFunction(db: String, funcDefinition: CatalogFunction): Unit

  /**
   * Drops a permanent function with the given name from the given database.
   */
  def dropFunction(db: String, funcName: String): Unit

  // --------------------------------------------------------------------------
  // Functions: Methods for metastore functions (permanent UDFs) or temp functions.
  // --------------------------------------------------------------------------

  def createTempFunction(funcDefinition: CatalogFunction): Unit

  /**
   * Drops a temporary function with the given name.
   */
  // TODO: The reason that we distinguish dropFunction and dropTempFunction is that
  // Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate
  // dropFunction and dropTempFunction.
  def dropTempFunction(funcName: String): Unit

  def renameFunction(
      specifiedDB: Option[String],
      currentDB: String,
      oldName: String,
      newName: String): Unit

  /**
   * Alters a function whose name matches the one specified in `funcDefinition`,
   * assuming the function exists.
   *
   * Note: If the underlying implementation does not support altering a certain field,
   * this becomes a no-op.
   */
  def alterFunction(
      specifiedDB: Option[String],
      currentDB: String,
      funcDefinition: CatalogFunction): Unit

  def getFunction(
      specifiedDB: Option[String],
      currentDB: String,
      funcName: String): CatalogFunction

  def listFunctions(
      specifiedDB: Option[String],
      currentDB: String,
      pattern: String): Seq[String]

}
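To make the temp-table bookkeeping above concrete, the following is a minimal, self-contained sketch (not part of this commit) of the ConcurrentHashMap-backed registry pattern that the `tempTables` field suggests. The object and method names below are hypothetical, and `String` stands in for `LogicalPlan` so the sketch compiles on its own.

import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified registry mirroring the tempTables map in SessionCatalog.
// String is used in place of LogicalPlan purely for illustration.
object TempTableRegistrySketch {
  private val tempTables = new ConcurrentHashMap[String, String]

  // Mirrors the documented createTempTable behavior: an existing temp table
  // with the same name is replaced by the new definition.
  def registerTempTable(name: String, plan: String): Unit = {
    tempTables.put(name, plan)
  }

  // Mirrors the ignoreIfNotExists flag on dropTable: only fail when the
  // caller asked for strict behavior and the table is missing.
  def dropTempTable(name: String, ignoreIfNotExists: Boolean): Unit = {
    val removed = tempTables.remove(name)
    if (removed == null && !ignoreIfNotExists) {
      throw new NoSuchElementException(s"Temporary table '$name' does not exist")
    }
  }

  // Lookup against the temporary namespace, as a relation resolver might do
  // before consulting the underlying metastore.
  def lookupTempTable(name: String): Option[String] = Option(tempTables.get(name))
}

A concrete SessionCatalog would additionally fall back to the wrapped ExternalCatalog when a name is not found in the temporary namespace; that delegation is left out of this sketch.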
