Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ public class AmoroManagementConf {
.defaultValue(100)
.withDescription("Filters will not be used beyond that number of partitions.");

public static final ConfigOption<Duration> REFRESH_TABLES_MAX_INTERVAL =
ConfigOptions.key("refresh-tables.max-interval")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription(
"Maximum interval for refreshing table metadata. (Used as the fallback interval when enabling refreshes triggered by external events).");

public static final ConfigOption<Duration> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ private void initHttpService() {
new DashboardServer(
serviceConfig, catalogManager, tableManager, optimizerManager, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager);
ExternalEventService externalEventService = new ExternalEventService();

httpServer =
Javalin.create(
Expand All @@ -304,6 +305,7 @@ private void initHttpService() {
() -> {
dashboardServer.endpoints().addEndpoints();
restCatalogService.endpoints().addEndpoints();
externalEventService.endpoints().addEndpoints();
});

httpServer.before(
Expand All @@ -320,6 +322,8 @@ private void initHttpService() {
(e, ctx) -> {
if (restCatalogService.needHandleException(ctx)) {
restCatalogService.handleException(e, ctx);
} else if (externalEventService.needHandleException(ctx)) {
externalEventService.handleException(e, ctx);
} else {
dashboardServer.handleException(e, ctx);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server;

import static io.javalin.apibuilder.ApiBuilder.path;
import static io.javalin.apibuilder.ApiBuilder.post;

import io.javalin.apibuilder.EndpointGroup;
import io.javalin.http.Context;
import io.javalin.http.HttpCode;
import org.apache.amoro.exception.ForbiddenException;
import org.apache.amoro.exception.SignatureCheckException;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
import org.apache.amoro.table.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExternalEventService extends PersistentBase {
private static final Logger LOG = LoggerFactory.getLogger(ExternalEventService.class);

public static final String REFRESH_REST_API_PREFIX = "/api/ams/v1/refresh/";

public ExternalEventService() {}

public EndpointGroup endpoints() {
return () -> {
// for refresh rest api
path(
REFRESH_REST_API_PREFIX,
() -> {
post(
"/catalog/{catalog}/database/{database}/table/{table}",
this::receiveExternalTableRefreshEvent);
});
};
}

public boolean needHandleException(Context ctx) {
return ctx.req.getRequestURI().startsWith(REFRESH_REST_API_PREFIX);
}

public void handleException(Exception e, Context ctx) {
if (e instanceof ForbiddenException) {
ctx.json(new ErrorResponse(HttpCode.FORBIDDEN, "Please check authentication", ""));

} else if (e instanceof SignatureCheckException) {
ctx.json(new ErrorResponse(HttpCode.FORBIDDEN, "Signature check failed", ""));
} else {
ctx.json(new ErrorResponse(HttpCode.INTERNAL_SERVER_ERROR, e.getMessage(), ""));
}
LOG.error("Error when handle refresh event", e);
}

/** POST /api/ams/refresh/catalog/{catalog}/database/{database}/table/{table} */
public void receiveExternalTableRefreshEvent(Context ctx) {
String catalog = ctx.pathParam("catalog").trim().replaceAll("^\"|\"$", "");
String database = ctx.pathParam("database").trim().replaceAll("^\"|\"$", "");
String table = ctx.pathParam("table").trim().replaceAll("^\"|\"$", "");

TableIdentifier tableIdentifier = TableIdentifier.of(catalog, database, table);
boolean result =
InlineTableExecutors.getInstance()
.getTableRefreshingExecutor()
.addTableToRefresh(tableIdentifier);
if (result) {
ctx.json(OkResponse.of("Table added to wait for refreshing"));
}
ctx.json(ErrorResponse.of("Table not managed by event trigger"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.amoro.exception.ForbiddenException;
import org.apache.amoro.exception.SignatureCheckException;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.ExternalEventService;
import org.apache.amoro.server.RestCatalogService;
import org.apache.amoro.server.authentication.HttpAuthenticationFactory;
import org.apache.amoro.server.catalog.CatalogManager;
Expand Down Expand Up @@ -467,7 +468,8 @@ public void handleException(Exception e, Context ctx) {
"/swagger-docs",
"/api/ams/v1/api/token/calculate/signature",
"/api/ams/v1/api/token/calculate/encryptString",
RestCatalogService.ICEBERG_REST_API_PREFIX + "/*"
RestCatalogService.ICEBERG_REST_API_PREFIX + "/*",
ExternalEventService.REFRESH_REST_API_PREFIX + "/*"
};

private static boolean inWhiteList(String uri) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void setup(TableService tableService, Configurations conf) {
tableService,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
conf.get(AmoroManagementConf.REFRESH_TABLES_INTERVAL).toMillis(),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS),
conf.get(AmoroManagementConf.REFRESH_TABLES_MAX_INTERVAL).toMillis());
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.amoro.server.scheduler.inline;

import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.config.TableConfiguration;
Expand All @@ -33,21 +34,61 @@
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableIdentifier;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Executor that refreshes table runtimes and evaluates optimizing status periodically. */
public class TableRuntimeRefreshExecutor extends PeriodicTableScheduler {

// 1 minutes
private final long interval;
// 1 hour
private final long maxInterval;
private final int maxPendingPartitions;
// Tables configured to be triggered by events
protected final Map<TableIdentifier, ServerTableIdentifier> managedEventTriggerTables =
new ConcurrentHashMap<>();
// Tables to be refreshed in the next execution schedule
protected final Set<ServerTableIdentifier> pendingRefreshTables =
Collections.synchronizedSet(new HashSet<>());

public TableRuntimeRefreshExecutor(
TableService tableService, int poolSize, long interval, int maxPendingPartitions) {
TableService tableService,
int poolSize,
long interval,
int maxPendingPartitions,
long maxInterval) {
super(tableService, poolSize);
this.interval = interval;
this.maxInterval = maxInterval;
this.maxPendingPartitions = maxPendingPartitions;
}

@Override
protected void initHandler(List<TableRuntime> tableRuntimeList) {
tableRuntimeList.stream()
.filter(this::enabled)
.filter(
tableRuntime ->
tableRuntime
.getTableConfiguration()
.getOptimizingConfig()
.isEventTriggeredRefresh())
.forEach(
tableRuntime -> {
managedEventTriggerTables.put(
tableRuntime.getTableIdentifier().getIdentifier(),
tableRuntime.getTableIdentifier());
});
super.initHandler(tableRuntimeList);
}

@Override
protected boolean enabled(TableRuntime tableRuntime) {
return tableRuntime instanceof DefaultTableRuntime;
Expand Down Expand Up @@ -104,6 +145,38 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
optimizingProcess.close(false);
}
}
// Add or remove managed event trigger table when the configuration changes
if (defaultTableRuntime
.getTableConfiguration()
.getOptimizingConfig()
.isEventTriggeredRefresh()) {
addManagedEventTriggerTable(defaultTableRuntime);
} else {
removeManagedEventTriggerTable(defaultTableRuntime);
}
}

@Override
public void handleTableAdded(AmoroTable<?> table, TableRuntime tableRuntime) {
Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
if (tableRuntime.getTableConfiguration().getOptimizingConfig().isEventTriggeredRefresh()) {
addManagedEventTriggerTable(defaultTableRuntime);
}
super.handleTableAdded(table, tableRuntime);
}

@Override
public void handleTableRemoved(TableRuntime tableRuntime) {
Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
if (defaultTableRuntime
.getTableConfiguration()
.getOptimizingConfig()
.isEventTriggeredRefresh()) {
removeManagedEventTriggerTable(defaultTableRuntime);
}
super.handleTableRemoved(tableRuntime);
}

@Override
Expand All @@ -117,6 +190,18 @@ public void execute(TableRuntime tableRuntime) {
Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime);
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;

if (defaultTableRuntime.getOptimizingConfig().isEventTriggeredRefresh()) {
if (!reachMaxInterval(defaultTableRuntime)
&& !pendingRefreshTables.contains(defaultTableRuntime.getTableIdentifier())) {
// If the table refresh is configured refreshing by event but has not been triggered, or
// the interval between the
// last refresh has not reached the maximum interval, skip refreshing
return;
}
}
// continue the following table refresh process and remove it from the pending refresh tables
removeTableToRefresh(defaultTableRuntime.getTableIdentifier());

long lastOptimizedSnapshotId = defaultTableRuntime.getLastOptimizedSnapshotId();
long lastOptimizedChangeSnapshotId = defaultTableRuntime.getLastOptimizedChangeSnapshotId();
AmoroTable<?> table = loadTable(tableRuntime);
Expand All @@ -134,4 +219,40 @@ public void execute(TableRuntime tableRuntime) {
logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable);
}
}

private boolean reachMaxInterval(DefaultTableRuntime tableRuntime) {
long currentTime = System.currentTimeMillis();
long lastRefreshTime = tableRuntime.getLastRefreshTime();
return currentTime - lastRefreshTime >= maxInterval;
}

private void addManagedEventTriggerTable(DefaultTableRuntime tableRuntime) {
managedEventTriggerTables.put(
tableRuntime.getTableIdentifier().getIdentifier(), tableRuntime.getTableIdentifier());
}

private void removeManagedEventTriggerTable(DefaultTableRuntime tableRuntime) {
managedEventTriggerTables.remove(tableRuntime.getTableIdentifier().getIdentifier());
removeTableToRefresh(tableRuntime.getTableIdentifier());
}

public boolean addTableToRefresh(TableIdentifier tableIdentifier) {
if (!managedEventTriggerTables.containsKey(tableIdentifier)) {
logger.warn(
"Table {} is not managed by event trigger, cannot add to refresh list.", tableIdentifier);
return false;
}
pendingRefreshTables.add(managedEventTriggerTables.get(tableIdentifier));
return true;
}

public void addTableToRefresh(ServerTableIdentifier serverTableIdentifier) {
this.pendingRefreshTables.add(serverTableIdentifier);
logger.debug("Add table {} to refresh pending list.", serverTableIdentifier);
}

public void removeTableToRefresh(ServerTableIdentifier serverTableIdentifier) {
this.pendingRefreshTables.remove(serverTableIdentifier);
logger.debug("Remove table {} from refresh pending list.", serverTableIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
private final TableSummaryMetrics tableSummaryMetrics;
private volatile long lastPlanTime;
private volatile long lastRefreshTime;
private volatile OptimizingProcess optimizingProcess;
private final List<TaskRuntime.TaskQuota> taskQuotas = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -181,6 +182,14 @@ public void setLastPlanTime(long lastPlanTime) {
this.lastPlanTime = lastPlanTime;
}

public long getLastRefreshTime() {
return lastRefreshTime;
}

public void setLastRefreshTime(long lastRefreshTime) {
this.lastRefreshTime = lastRefreshTime;
}

public OptimizingStatus getOptimizingStatus() {
return OptimizingStatus.ofCode(getStatusCode());
}
Expand Down Expand Up @@ -334,6 +343,7 @@ public DefaultTableRuntime refresh(AmoroTable<?> table) {
return s;
})
.commit();
setLastRefreshTime(System.currentTimeMillis());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,12 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
PropertyUtil.propertyAsLong(
properties,
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE,
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT));
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT))
.setEventTriggeredRefresh(
PropertyUtil.propertyAsBoolean(
properties,
TableProperties.SELF_OPTIMIZING_REFRESH_EVENT_TRIGGERED,
TableProperties.SELF_OPTIMIZING_REFRESH_EVENT_TRIGGERED_DEFAULT));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ protected void reboot() throws InterruptedException {
private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {

public TableRuntimeRefresher() {
super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE);
}

void refreshPending() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ private void appendData(UnkeyedTable table, int id) {

void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
new TableRuntimeRefreshExecutor(
tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE);
refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
Expand Down
Loading