Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public class BuiltInFunctionNamespaceManager
implements FunctionNamespaceManager<BuiltInFunction>
{
public static final CatalogSchemaName DEFAULT_NAMESPACE = new CatalogSchemaName("presto", "default");
public static final String NAME = "_builtin";
public static final String ID = "builtin";

private final TypeManager typeManager;
private final LoadingCache<Signature, SpecializedFunctionKey> specializedFunctionKeyCache;
Expand Down Expand Up @@ -705,7 +705,7 @@ public void dropFunction(QualifiedFunctionName functionName, Optional<List<TypeS

public String getName()
{
return NAME;
return ID;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public class FunctionManager
private final FunctionInvokerProvider functionInvokerProvider;
private final Map<String, FunctionNamespaceManagerFactory> functionNamespaceManagerFactories = new ConcurrentHashMap<>();
private final HandleResolver handleResolver;
private final Map<CatalogSchemaPrefix, FunctionNamespaceManager<?>> functionNamespaces = new ConcurrentHashMap<>();
private final Map<CatalogSchemaPrefix, String> functionNamespaces = new ConcurrentHashMap<>();
private final Map<String, FunctionNamespaceManager<?>> functionNamespaceManagers = new ConcurrentHashMap<>();

private final boolean listNonBuiltInFunctions;

Expand All @@ -111,14 +112,15 @@ public FunctionManager(
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.builtInFunctionNamespaceManager = new BuiltInFunctionNamespaceManager(typeManager, blockEncodingSerde, featuresConfig, this);
this.functionNamespaces.put(DEFAULT_NAMESPACE.asCatalogSchemaPrefix(), builtInFunctionNamespaceManager);
this.functionNamespaces.put(DEFAULT_NAMESPACE.asCatalogSchemaPrefix(), BuiltInFunctionNamespaceManager.ID);
this.functionNamespaceManagers.put(BuiltInFunctionNamespaceManager.ID, builtInFunctionNamespaceManager);
this.functionInvokerProvider = new FunctionInvokerProvider(this);
this.handleResolver = handleResolver;
if (typeManager instanceof TypeRegistry) {
((TypeRegistry) typeManager).setFunctionManager(this);
}
// TODO: Provide a more encapsulated way for TransactionManager to register FunctionNamespaceManager
transactionManager.registerFunctionNamespaceManager(BuiltInFunctionNamespaceManager.NAME, builtInFunctionNamespaceManager);
transactionManager.registerFunctionNamespaceManager(BuiltInFunctionNamespaceManager.ID, builtInFunctionNamespaceManager);
this.listNonBuiltInFunctions = featuresConfig.isListNonBuiltInFunctions();
}

Expand All @@ -129,17 +131,24 @@ public FunctionManager(TypeManager typeManager, BlockEncodingSerde blockEncoding
this(typeManager, createTestTransactionManager(), blockEncodingSerde, featuresConfig, new HandleResolver());
}

public void loadFunctionNamespaces(String functionNamespaceManagerName, List<String> catalogSchemaPrefixes, Map<String, String> properties)
public void loadFunctionNamespaceManager(
String functionNamespaceManagerName,
String functionNamespaceManagerId,
List<String> catalogSchemaPrefixes,
Map<String, String> properties)
{
requireNonNull(functionNamespaceManagerName, "connectorName is null");
requireNonNull(functionNamespaceManagerName, "functionNamespaceManagerName is null");
FunctionNamespaceManagerFactory factory = functionNamespaceManagerFactories.get(functionNamespaceManagerName);
checkState(factory != null, "No function namespace manager for %s", functionNamespaceManagerName);
checkState(factory != null, "No factory for function namespace manager %s", functionNamespaceManagerName);
FunctionNamespaceManager<?> functionNamespaceManager = factory.create(properties);

transactionManager.registerFunctionNamespaceManager(functionNamespaceManagerName, functionNamespaceManager);
transactionManager.registerFunctionNamespaceManager(functionNamespaceManagerId, functionNamespaceManager);
if (functionNamespaceManagers.putIfAbsent(functionNamespaceManagerId, functionNamespaceManager) != null) {
throw new IllegalArgumentException(format("Function namespace manager [%s] is already registered", functionNamespaceManagerId));
}
for (String catalogSchemaPrefix : catalogSchemaPrefixes) {
if (functionNamespaces.putIfAbsent(CatalogSchemaPrefix.of(catalogSchemaPrefix), functionNamespaceManager) != null) {
throw new IllegalArgumentException(format("Function namespace manager '%s' already registered to handle function namespace '%s'", factory.getName(), catalogSchemaPrefix));
if (functionNamespaces.putIfAbsent(CatalogSchemaPrefix.of(catalogSchemaPrefix), functionNamespaceManagerId) != null) {
throw new IllegalArgumentException(format("Function namespace [%s] is already registered to function namespace manager [%s]", catalogSchemaPrefix, functionNamespaceManagerId));
}
}
}
Expand All @@ -164,11 +173,11 @@ public void registerBuiltInFunctions(List<? extends BuiltInFunction> functions)

public List<SqlFunction> listFunctions()
{
Set<FunctionNamespaceManager<?>> functionNamespaceManagers = listNonBuiltInFunctions ?
ImmutableSet.copyOf(functionNamespaces.values()) :
Collection<FunctionNamespaceManager<?>> managers = listNonBuiltInFunctions ?
functionNamespaceManagers.values() :
ImmutableSet.of(builtInFunctionNamespaceManager);

return functionNamespaceManagers.stream()
return managers.stream()
.flatMap(manager -> manager.listFunctions().stream())
.filter(function -> !function.isHidden())
.collect(toImmutableList());
Expand Down Expand Up @@ -220,17 +229,18 @@ public FunctionHandle resolveFunction(Optional<TransactionId> transactionId, Qua

public FunctionHandle resolveFunction(Optional<TransactionId> transactionId, QualifiedFunctionName functionName, List<TypeSignatureProvider> parameterTypes)
{
Optional<FunctionNamespaceManager<?>> functionNamespaceManager = getServingFunctionNamespaceManager(functionName.getFunctionNamespace());
if (!functionNamespaceManager.isPresent()) {
Optional<String> functionNamespaceManagerId = getServingFunctionNamespaceManagerId(functionName.getFunctionNamespace());
if (!functionNamespaceManagerId.isPresent()) {
throw new PrestoException(FUNCTION_NOT_FOUND, constructFunctionNotFoundErrorMessage(functionName, parameterTypes, ImmutableList.of()));
}

Optional<FunctionNamespaceTransactionHandle> transactionHandle = transactionId
.map(id -> transactionManager.getFunctionNamespaceTransaction(id, functionNamespaceManager.get().getName()));
Collection<? extends SqlFunction> candidates = functionNamespaceManager.get().getFunctions(transactionHandle, functionName);
.map(id -> transactionManager.getFunctionNamespaceTransaction(id, functionNamespaceManagerId.get()));
FunctionNamespaceManager<?> functionNamespaceManager = functionNamespaceManagers.get(functionNamespaceManagerId.get());
Collection<? extends SqlFunction> candidates = functionNamespaceManager.getFunctions(transactionHandle, functionName);

try {
return lookupFunction(functionNamespaceManager.get(), transactionHandle, functionName, parameterTypes, candidates);
return lookupFunction(functionNamespaceManager, transactionHandle, functionName, parameterTypes, candidates);
}
catch (PrestoException e) {
if (e.getErrorCode().getCode() != FUNCTION_NOT_FOUND.toErrorCode().getCode()) {
Expand All @@ -240,7 +250,7 @@ public FunctionHandle resolveFunction(Optional<TransactionId> transactionId, Qua

Optional<Signature> match = matchFunctionWithCoercion(candidates, parameterTypes);
if (match.isPresent()) {
return functionNamespaceManager.get().getFunctionHandle(transactionHandle, match.get());
return functionNamespaceManager.getFunctionHandle(transactionHandle, match.get());
}

if (functionName.getFunctionName().startsWith(MAGIC_LITERAL_FUNCTION_PREFIX)) {
Expand Down Expand Up @@ -378,22 +388,27 @@ private FunctionHandle lookupFunction(
}

private Optional<FunctionNamespaceManager<?>> getServingFunctionNamespaceManager(CatalogSchemaName functionNamespace)
{
return getServingFunctionNamespaceManagerId(functionNamespace).map(functionNamespaceManagers::get);
}

private Optional<String> getServingFunctionNamespaceManagerId(CatalogSchemaName functionNamespace)
{
if (functionNamespace.equals(DEFAULT_NAMESPACE)) {
return Optional.of(builtInFunctionNamespaceManager);
return Optional.of(BuiltInFunctionNamespaceManager.ID);
}

CatalogSchemaPrefix bestMatch = null;
FunctionNamespaceManager<?> servingFunctionNamespaceManager = null;
String servingFunctionNamespaceManagerId = null;

for (Map.Entry<CatalogSchemaPrefix, FunctionNamespaceManager<?>> entry : functionNamespaces.entrySet()) {
for (Map.Entry<CatalogSchemaPrefix, String> entry : functionNamespaces.entrySet()) {
CatalogSchemaPrefix prefix = entry.getKey();
if (prefix.includes(functionNamespace) && (bestMatch == null || bestMatch.includes(prefix))) {
bestMatch = prefix;
servingFunctionNamespaceManager = entry.getValue();
servingFunctionNamespaceManagerId = entry.getValue();
}
}
return Optional.ofNullable(servingFunctionNamespaceManager);
return Optional.ofNullable(servingFunctionNamespaceManagerId);
}

private String constructFunctionNotFoundErrorMessage(QualifiedFunctionName functionName, List<TypeSignatureProvider> parameterTypes, Collection<? extends SqlFunction> candidates)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static com.facebook.presto.util.PropertiesUtil.loadProperties;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.io.Files.getNameWithoutExtension;

public class StaticFunctionNamespaceStore
{
Expand Down Expand Up @@ -61,14 +62,19 @@ public void loadFunctionNamespaceManagers()
private void loadFunctionNamespaceManager(File file)
throws Exception
{
log.info("-- Loading function namespace from %s --", file);
String functionNamespaceManagerId = getNameWithoutExtension(file.getName());

log.info("-- Loading function namespace manager from %s --", file);
Map<String, String> properties = new HashMap<>(loadProperties(file));

String functionNamespaceManagerName = properties.remove("function-namespace-manager.name");
checkState(functionNamespaceManagerName != null, "Function namespace configuration %s does not contain function-namespace-manager.name", file.getAbsoluteFile());

String functionNamespaces = properties.remove("serving-namespaces");
checkState(functionNamespaces != null, "Function namespace configuration %s does not contain serving-namespaces", file.getAbsoluteFile());
functionManager.loadFunctionNamespaces(functionNamespaceManagerName, SPLITTER.splitToList(functionNamespaces), properties);
log.info("-- Added function namespace %s managed by function manager %s --", functionNamespaces, functionNamespaceManagerName);

functionManager.loadFunctionNamespaceManager(functionNamespaceManagerName, functionNamespaceManagerId, SPLITTER.splitToList(functionNamespaces), properties);
log.info("-- Added function namespaces [%s] using function namespace manager [%s] --", functionNamespaces, functionNamespaceManagerId);
}

private static List<File> listFiles(File dir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,13 @@
public class InMemoryFunctionNamespaceManager
extends AbstractSqlInvokedFunctionNamespaceManager
{
private static final String NAME = "_in_memory";

private final Map<SqlFunctionId, SqlInvokedFunction> latestFunctions = new ConcurrentHashMap<>();

public InMemoryFunctionNamespaceManager(SqlInvokedFunctionNamespaceManagerConfig config)
{
super(config);
}

@Override
public String getName()
{
return NAME;
}

@Override
public synchronized void createFunction(SqlInvokedFunction function, boolean replace)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,16 @@ public ConnectorTransactionHandle getConnectorTransaction(TransactionId transact
}

@Override
public synchronized void registerFunctionNamespaceManager(String functionNamespaceManagerName, FunctionNamespaceManager<?> functionNamespaceManager)
public synchronized void registerFunctionNamespaceManager(String functionNamespaceManagerId, FunctionNamespaceManager<?> functionNamespaceManager)
{
checkArgument(!functionNamespaceManagers.containsKey(functionNamespaceManagerName), "FunctionNamespaceManager %s is already registered", functionNamespaceManagerName);
functionNamespaceManagers.put(functionNamespaceManagerName, functionNamespaceManager);
checkArgument(!functionNamespaceManagers.containsKey(functionNamespaceManagerId), "FunctionNamespaceManager %s is already registered", functionNamespaceManagerId);
functionNamespaceManagers.put(functionNamespaceManagerId, functionNamespaceManager);
}

@Override
public FunctionNamespaceTransactionHandle getFunctionNamespaceTransaction(TransactionId transactionId, String functionNamespaceManagerName)
public FunctionNamespaceTransactionHandle getFunctionNamespaceTransaction(TransactionId transactionId, String functionNamespaceManagerId)
{
return getTransactionMetadata(transactionId).getFunctionNamespaceTransaction(functionNamespaceManagerName).getTransactionHandle();
return getTransactionMetadata(transactionId).getFunctionNamespaceTransaction(functionNamespaceManagerId).getTransactionHandle();
}

private void checkConnectorWrite(TransactionId transactionId, ConnectorId connectorId)
Expand Down Expand Up @@ -472,14 +472,14 @@ private synchronized CatalogMetadata getTransactionCatalogMetadata(ConnectorId c
return catalogMetadata;
}

private synchronized FunctionNamespaceTransactionMetadata getFunctionNamespaceTransaction(String functionNamespaceManagerName)
private synchronized FunctionNamespaceTransactionMetadata getFunctionNamespaceTransaction(String functionNamespaceManagerId)
{
checkOpenTransaction();

return functionNamespaceTransactions.computeIfAbsent(
functionNamespaceManagerName, name -> {
verify(name != null, "Unknown function namespace manager: %s", name);
FunctionNamespaceManager<?> functionNamespaceManager = functionNamespaceManagers.get(name);
functionNamespaceManagerId, id -> {
verify(id != null, "Unknown function namespace manager: %s", id);
FunctionNamespaceManager<?> functionNamespaceManager = functionNamespaceManagers.get(id);
FunctionNamespaceTransactionHandle transactionHandle = functionNamespaceManager.beginTransaction();
return new FunctionNamespaceTransactionMetadata(functionNamespaceManager, transactionHandle);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
@Experimental
public interface FunctionNamespaceManager<F extends SqlFunction>
{
String getName();

/**
* Start a transaction.
*/
Expand Down