diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 579142b2ed..51722a2236 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -192,6 +192,13 @@ public class AmoroManagementConf { .defaultValue(100) .withDescription("Filters will not be used beyond that number of partitions."); + public static final ConfigOption REFRESH_TABLES_MAX_INTERVAL = + ConfigOptions.key("refresh-tables.max-interval") + .durationType() + .defaultValue(Duration.ofHours(1)) + .withDescription( + "Maximum interval for refreshing table metadata. (Used as the fallback interval when enabling refreshes triggered by external events)."); + public static final ConfigOption BLOCKER_TIMEOUT = ConfigOptions.key("blocker.timeout") .durationType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 18cdf86667..c412eff2ff 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -286,6 +286,7 @@ private void initHttpService() { new DashboardServer( serviceConfig, catalogManager, tableManager, optimizerManager, terminalManager); RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager); + ExternalEventService externalEventService = new ExternalEventService(); httpServer = Javalin.create( @@ -304,6 +305,7 @@ private void initHttpService() { () -> { dashboardServer.endpoints().addEndpoints(); restCatalogService.endpoints().addEndpoints(); + externalEventService.endpoints().addEndpoints(); }); httpServer.before( @@ -320,6 +322,8 @@ private void initHttpService() { (e, ctx) -> { if (restCatalogService.needHandleException(ctx)) { restCatalogService.handleException(e, ctx); + } else if (externalEventService.needHandleException(ctx)) { + externalEventService.handleException(e, ctx); } else { dashboardServer.handleException(e, ctx); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ExternalEventService.java b/amoro-ams/src/main/java/org/apache/amoro/server/ExternalEventService.java new file mode 100644 index 0000000000..67d1c057de --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/ExternalEventService.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server; + +import static io.javalin.apibuilder.ApiBuilder.path; +import static io.javalin.apibuilder.ApiBuilder.post; + +import io.javalin.apibuilder.EndpointGroup; +import io.javalin.http.Context; +import io.javalin.http.HttpCode; +import org.apache.amoro.exception.ForbiddenException; +import org.apache.amoro.exception.SignatureCheckException; +import org.apache.amoro.server.dashboard.response.ErrorResponse; +import org.apache.amoro.server.dashboard.response.OkResponse; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.scheduler.inline.InlineTableExecutors; +import org.apache.amoro.table.TableIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExternalEventService extends PersistentBase { + private static final Logger LOG = LoggerFactory.getLogger(ExternalEventService.class); + + public static final String REFRESH_REST_API_PREFIX = "/api/ams/v1/refresh/"; + + public ExternalEventService() {} + + public EndpointGroup endpoints() { + return () -> { + // for refresh rest api + path( + REFRESH_REST_API_PREFIX, + () -> { + post( + "/catalog/{catalog}/database/{database}/table/{table}", + this::receiveExternalTableRefreshEvent); + }); + }; + } + + public boolean needHandleException(Context ctx) { + return ctx.req.getRequestURI().startsWith(REFRESH_REST_API_PREFIX); + } + + public void handleException(Exception e, Context ctx) { + if (e instanceof ForbiddenException) { + ctx.json(new ErrorResponse(HttpCode.FORBIDDEN, "Please check authentication", "")); + + } else if (e instanceof SignatureCheckException) { + ctx.json(new ErrorResponse(HttpCode.FORBIDDEN, "Signature check failed", "")); + } else { + ctx.json(new ErrorResponse(HttpCode.INTERNAL_SERVER_ERROR, e.getMessage(), "")); + } + LOG.error("Error when handle refresh event", e); + } + + /** POST /api/ams/refresh/catalog/{catalog}/database/{database}/table/{table} */ + public void receiveExternalTableRefreshEvent(Context ctx) { + String catalog = ctx.pathParam("catalog").trim().replaceAll("^\"|\"$", ""); + String database = ctx.pathParam("database").trim().replaceAll("^\"|\"$", ""); + String table = ctx.pathParam("table").trim().replaceAll("^\"|\"$", ""); + + TableIdentifier tableIdentifier = TableIdentifier.of(catalog, database, table); + boolean result = + InlineTableExecutors.getInstance() + .getTableRefreshingExecutor() + .addTableToRefresh(tableIdentifier); + if (result) { + ctx.json(OkResponse.of("Table added to wait for refreshing")); + } + ctx.json(ErrorResponse.of("Table not managed by event trigger")); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java index f24f9c3330..9f35511449 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java @@ -36,6 +36,7 @@ import org.apache.amoro.exception.ForbiddenException; import org.apache.amoro.exception.SignatureCheckException; import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.server.ExternalEventService; import org.apache.amoro.server.RestCatalogService; import org.apache.amoro.server.authentication.HttpAuthenticationFactory; import org.apache.amoro.server.catalog.CatalogManager; @@ -467,7 +468,8 @@ public void handleException(Exception e, Context ctx) { "/swagger-docs", "/api/ams/v1/api/token/calculate/signature", "/api/ams/v1/api/token/calculate/encryptString", - RestCatalogService.ICEBERG_REST_API_PREFIX + "/*" + RestCatalogService.ICEBERG_REST_API_PREFIX + "/*", + ExternalEventService.REFRESH_REST_API_PREFIX + "/*" }; private static boolean inWhiteList(String uri) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index d51b02713a..b2c13bfe6b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -78,7 +78,8 @@ public void setup(TableService tableService, Configurations conf) { tableService, conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT), conf.get(AmoroManagementConf.REFRESH_TABLES_INTERVAL).toMillis(), - conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS)); + conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS), + conf.get(AmoroManagementConf.REFRESH_TABLES_MAX_INTERVAL).toMillis()); if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) { this.tagsAutoCreatingExecutor = new TagsAutoCreatingExecutor( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index 18071a2cdd..6d0f53937a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -19,6 +19,7 @@ package org.apache.amoro.server.scheduler.inline; import org.apache.amoro.AmoroTable; +import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.config.TableConfiguration; @@ -33,21 +34,61 @@ import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.TableIdentifier; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** Executor that refreshes table runtimes and evaluates optimizing status periodically. */ public class TableRuntimeRefreshExecutor extends PeriodicTableScheduler { // 1 minutes private final long interval; + // 1 hour + private final long maxInterval; private final int maxPendingPartitions; + // Tables configured to be triggered by events + protected final Map managedEventTriggerTables = + new ConcurrentHashMap<>(); + // Tables to be refreshed in the next execution schedule + protected final Set pendingRefreshTables = + Collections.synchronizedSet(new HashSet<>()); public TableRuntimeRefreshExecutor( - TableService tableService, int poolSize, long interval, int maxPendingPartitions) { + TableService tableService, + int poolSize, + long interval, + int maxPendingPartitions, + long maxInterval) { super(tableService, poolSize); this.interval = interval; + this.maxInterval = maxInterval; this.maxPendingPartitions = maxPendingPartitions; } + @Override + protected void initHandler(List tableRuntimeList) { + tableRuntimeList.stream() + .filter(this::enabled) + .filter( + tableRuntime -> + tableRuntime + .getTableConfiguration() + .getOptimizingConfig() + .isEventTriggeredRefresh()) + .forEach( + tableRuntime -> { + managedEventTriggerTables.put( + tableRuntime.getTableIdentifier().getIdentifier(), + tableRuntime.getTableIdentifier()); + }); + super.initHandler(tableRuntimeList); + } + @Override protected boolean enabled(TableRuntime tableRuntime) { return tableRuntime instanceof DefaultTableRuntime; @@ -104,6 +145,38 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or optimizingProcess.close(false); } } + // Add or remove managed event trigger table when the configuration changes + if (defaultTableRuntime + .getTableConfiguration() + .getOptimizingConfig() + .isEventTriggeredRefresh()) { + addManagedEventTriggerTable(defaultTableRuntime); + } else { + removeManagedEventTriggerTable(defaultTableRuntime); + } + } + + @Override + public void handleTableAdded(AmoroTable table, TableRuntime tableRuntime) { + Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime); + DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime; + if (tableRuntime.getTableConfiguration().getOptimizingConfig().isEventTriggeredRefresh()) { + addManagedEventTriggerTable(defaultTableRuntime); + } + super.handleTableAdded(table, tableRuntime); + } + + @Override + public void handleTableRemoved(TableRuntime tableRuntime) { + Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime); + DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime; + if (defaultTableRuntime + .getTableConfiguration() + .getOptimizingConfig() + .isEventTriggeredRefresh()) { + removeManagedEventTriggerTable(defaultTableRuntime); + } + super.handleTableRemoved(tableRuntime); } @Override @@ -117,6 +190,18 @@ public void execute(TableRuntime tableRuntime) { Preconditions.checkArgument(tableRuntime instanceof DefaultTableRuntime); DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime; + if (defaultTableRuntime.getOptimizingConfig().isEventTriggeredRefresh()) { + if (!reachMaxInterval(defaultTableRuntime) + && !pendingRefreshTables.contains(defaultTableRuntime.getTableIdentifier())) { + // If the table refresh is configured refreshing by event but has not been triggered, or + // the interval between the + // last refresh has not reached the maximum interval, skip refreshing + return; + } + } + // continue the following table refresh process and remove it from the pending refresh tables + removeTableToRefresh(defaultTableRuntime.getTableIdentifier()); + long lastOptimizedSnapshotId = defaultTableRuntime.getLastOptimizedSnapshotId(); long lastOptimizedChangeSnapshotId = defaultTableRuntime.getLastOptimizedChangeSnapshotId(); AmoroTable table = loadTable(tableRuntime); @@ -134,4 +219,40 @@ public void execute(TableRuntime tableRuntime) { logger.error("Refreshing table {} failed.", tableRuntime.getTableIdentifier(), throwable); } } + + private boolean reachMaxInterval(DefaultTableRuntime tableRuntime) { + long currentTime = System.currentTimeMillis(); + long lastRefreshTime = tableRuntime.getLastRefreshTime(); + return currentTime - lastRefreshTime >= maxInterval; + } + + private void addManagedEventTriggerTable(DefaultTableRuntime tableRuntime) { + managedEventTriggerTables.put( + tableRuntime.getTableIdentifier().getIdentifier(), tableRuntime.getTableIdentifier()); + } + + private void removeManagedEventTriggerTable(DefaultTableRuntime tableRuntime) { + managedEventTriggerTables.remove(tableRuntime.getTableIdentifier().getIdentifier()); + removeTableToRefresh(tableRuntime.getTableIdentifier()); + } + + public boolean addTableToRefresh(TableIdentifier tableIdentifier) { + if (!managedEventTriggerTables.containsKey(tableIdentifier)) { + logger.warn( + "Table {} is not managed by event trigger, cannot add to refresh list.", tableIdentifier); + return false; + } + pendingRefreshTables.add(managedEventTriggerTables.get(tableIdentifier)); + return true; + } + + public void addTableToRefresh(ServerTableIdentifier serverTableIdentifier) { + this.pendingRefreshTables.add(serverTableIdentifier); + logger.debug("Add table {} to refresh pending list.", serverTableIdentifier); + } + + public void removeTableToRefresh(ServerTableIdentifier serverTableIdentifier) { + this.pendingRefreshTables.remove(serverTableIdentifier); + logger.debug("Remove table {} from refresh pending list.", serverTableIdentifier); + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index c1506218d2..0ad4bccc3d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -100,6 +100,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; private final TableSummaryMetrics tableSummaryMetrics; private volatile long lastPlanTime; + private volatile long lastRefreshTime; private volatile OptimizingProcess optimizingProcess; private final List taskQuotas = new CopyOnWriteArrayList<>(); @@ -181,6 +182,14 @@ public void setLastPlanTime(long lastPlanTime) { this.lastPlanTime = lastPlanTime; } + public long getLastRefreshTime() { + return lastRefreshTime; + } + + public void setLastRefreshTime(long lastRefreshTime) { + this.lastRefreshTime = lastRefreshTime; + } + public OptimizingStatus getOptimizingStatus() { return OptimizingStatus.ofCode(getStatusCode()); } @@ -334,6 +343,7 @@ public DefaultTableRuntime refresh(AmoroTable table) { return s; }) .commit(); + setLastRefreshTime(System.currentTimeMillis()); return this; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java index c891a24f4d..3eb70614b8 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java @@ -341,7 +341,12 @@ public static OptimizingConfig parseOptimizingConfig(Map propert PropertyUtil.propertyAsLong( properties, TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE, - TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT)); + TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT)) + .setEventTriggeredRefresh( + PropertyUtil.propertyAsBoolean( + properties, + TableProperties.SELF_OPTIMIZING_REFRESH_EVENT_TRIGGERED, + TableProperties.SELF_OPTIMIZING_REFRESH_EVENT_TRIGGERED_DEFAULT)); } /** diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index f9fa8ce94d..abf24a864e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -478,7 +478,7 @@ protected void reboot() throws InterruptedException { private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor { public TableRuntimeRefresher() { - super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE); + super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE); } void refreshPending() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java index 2b40652c2d..62291f344d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java @@ -108,7 +108,8 @@ private void appendData(UnkeyedTable table, int id) { void refreshPending() { TableRuntimeRefreshExecutor refresher = - new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE); + new TableRuntimeRefreshExecutor( + tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE); refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId())); refresher.dispose(); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java new file mode 100644 index 0000000000..035993e594 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.scheduler.inline; + +import org.apache.amoro.AmoroTable; +import org.apache.amoro.BasicTableTestHelper; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.TableTestHelper; +import org.apache.amoro.TestedCatalogs; +import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; +import org.apache.amoro.hive.catalog.HiveTableTestHelper; +import org.apache.amoro.server.table.AMSTableTestBase; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.server.table.TableConfigurations; +import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collections; +import java.util.Map; + +@RunWith(Parameterized.class) +public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase { + + @Parameterized.Parameters(name = "{0}, {1}") + public static Object[] parameters() { + return new Object[][] { + { + TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG), + new BasicTableTestHelper(true, true) + }, + { + new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(true, true) + }, + { + new HiveCatalogTestHelper(TableFormat.MIXED_HIVE, TEST_HMS.getHiveConf()), + new HiveTableTestHelper(true, true) + } + }; + } + + private static final Map eventTriggerTableProperties = + ImmutableMap.builder() + .put("self-optimizing.refresh.event-triggered", "true") + .build(); + + public TestTableRuntimeRefreshExecutor( + CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { + super(catalogTestHelper, tableTestHelper); + } + + @Test + public void testTableRuntimeRefreshByDefault() throws InterruptedException { + TableRuntimeRefresher executor = new TableRuntimeRefresher(); + tableTestHelper().tableProperties().clear(); + createDatabase(); + createTable(); + + TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); + + // Test add table + Assertions.assertEquals(0, executor.managedEventTriggerTables.size()); + executor.handleTableAdded(tableService().loadTable(serverTableIdentifier()), tableRuntime); + Assertions.assertEquals(0, executor.managedEventTriggerTables.size()); + + // After the table added, execute() will be scheduled after 10s + Assertions.assertEquals(0, executor.pendingRefreshTables.size()); + Thread.sleep(15000L); + // Verify that execute() is called and table is loaded + Assertions.assertTrue(executor.hasExecute); + Assertions.assertTrue(executor.hasLoadTable); + + dropTable(); + dropDatabase(); + } + + @Test + public void testTableRuntimeRefreshWithEventTrigger() throws InterruptedException { + TableRuntimeRefresher executor = new TableRuntimeRefresher(3600000L); + tableTestHelper().tableProperties().putAll(eventTriggerTableProperties); + createDatabase(); + createTable(); + + TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); + ((DefaultTableRuntime) tableRuntime) + .setLastRefreshTime( + System + .currentTimeMillis()); // Set the last plan time to prevent table refresh triggered + // by fallback interval + + // Test add table + Assertions.assertEquals(0, executor.managedEventTriggerTables.size()); + executor.handleTableAdded(tableService().loadTable(serverTableIdentifier()), tableRuntime); + Assertions.assertEquals(1, executor.managedEventTriggerTables.size()); + + // After the table added, execute() will be scheduled after 10s + Assertions.assertEquals(0, executor.pendingRefreshTables.size()); + Thread.sleep(15000L); + // Verify that execute() is called but table is not loaded due to not triggered + Assertions.assertTrue(executor.hasExecute); + Assertions.assertFalse(executor.hasLoadTable); + + // Test when the table is triggered to refresh + Assertions.assertEquals(0, executor.pendingRefreshTables.size()); + executor.addTableToRefresh(serverTableIdentifier().getIdentifier()); + Assertions.assertEquals(1, executor.pendingRefreshTables.size()); + executor.hasExecute = false; + executor.hasLoadTable = false; + + executor.execute(tableRuntime); + Assertions.assertTrue(executor.hasExecute); + Assertions.assertTrue(executor.hasLoadTable); + Assertions.assertEquals(0, executor.pendingRefreshTables.size()); + + // Test when the table is triggered to refresh due to fallback interval has been reached + executor.hasExecute = false; + executor.hasLoadTable = false; + executor.execute(tableRuntime); + Assertions.assertTrue(executor.hasExecute); + Assertions.assertFalse(executor.hasLoadTable); + + ((DefaultTableRuntime) tableRuntime) + .setLastRefreshTime( + System.currentTimeMillis() + - 5000000); // Set the last plan time to make table refresh triggered by fallback + // interval + executor.execute(tableRuntime); + Assertions.assertTrue(executor.hasLoadTable); + + // Test when a table changes from event-triggered to non-event-triggered configuration + // This should remove the table from managedEventTriggerTables and pendingRefreshTables + executor.addTableToRefresh(serverTableIdentifier().getIdentifier()); + Assertions.assertEquals(1, executor.managedEventTriggerTables.size()); + Assertions.assertEquals(1, executor.pendingRefreshTables.size()); + + ((DefaultTableRuntime) tableRuntime).store().getTableConfig().clear(); + executor.handleConfigChanged( + tableRuntime, TableConfigurations.parseTableConfig(eventTriggerTableProperties)); + Assertions.assertEquals(0, executor.managedEventTriggerTables.size()); + Assertions.assertEquals(0, executor.pendingRefreshTables.size()); + + // Test when a table changes from non-event-triggered to event-triggered configuration + // This should remove the table from managedEventTriggerTables + ((DefaultTableRuntime) tableRuntime) + .store() + .getTableConfig() + .putAll(eventTriggerTableProperties); + Assertions.assertEquals(0, executor.managedEventTriggerTables.size()); + executor.handleConfigChanged( + tableRuntime, TableConfigurations.parseTableConfig(Collections.emptyMap())); + Assertions.assertEquals(1, executor.managedEventTriggerTables.size()); + Assertions.assertEquals(0, executor.pendingRefreshTables.size()); + + // test remove table triggered by event + Assertions.assertEquals(1, executor.managedEventTriggerTables.size()); + executor.handleTableRemoved(tableRuntime); + Assertions.assertEquals(0, executor.managedEventTriggerTables.size()); + + dropTable(); + dropDatabase(); + } + + private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor { + + public TableRuntimeRefresher() { + super(tableService(), Integer.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE); + } + + public TableRuntimeRefresher(long maxInterval) { + super(tableService(), Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, maxInterval); + } + + private boolean hasLoadTable = false; + private boolean hasExecute = false; + + @Override + public AmoroTable loadTable(TableRuntime tableRuntime) { + hasLoadTable = true; + return super.loadTable(tableRuntime); + } + + @Override + public void execute(TableRuntime tableRuntime) { + hasExecute = true; + super.execute(tableRuntime); + } + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java index acf3a73cec..b52922b152 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java @@ -142,7 +142,8 @@ private void appendPosDelete(UnkeyedTable table) { void refreshPending() { TableRuntimeRefreshExecutor refresher = - new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE); + new TableRuntimeRefreshExecutor( + tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE); refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId())); refresher.dispose(); } diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index 0c743ac6bd..735ddecf1a 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -97,6 +97,9 @@ public class OptimizingConfig { // self-optimizing.evaluation.fallback-interval private long evaluationFallbackInterval; + // self-optimizing.refresh.event-triggered + private boolean eventTriggeredRefresh; + public OptimizingConfig() {} public boolean isEnabled() { @@ -318,6 +321,15 @@ public OptimizingConfig setEvaluationMseTolerance(long evaluationMseTolerance) { return this; } + public boolean isEventTriggeredRefresh() { + return eventTriggeredRefresh; + } + + public OptimizingConfig setEventTriggeredRefresh(boolean eventTriggeredRefresh) { + this.eventTriggeredRefresh = eventTriggeredRefresh; + return this; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -350,7 +362,8 @@ public boolean equals(Object o) { && Objects.equal(optimizerGroup, that.optimizerGroup) && Objects.equal(minPlanInterval, that.minPlanInterval) && Objects.equal(evaluationMseTolerance, that.evaluationMseTolerance) - && Objects.equal(evaluationFallbackInterval, that.evaluationFallbackInterval); + && Objects.equal(evaluationFallbackInterval, that.evaluationFallbackInterval) + && eventTriggeredRefresh == that.eventTriggeredRefresh; } @Override @@ -379,7 +392,8 @@ public int hashCode() { hiveRefreshInterval, minPlanInterval, evaluationMseTolerance, - evaluationFallbackInterval); + evaluationFallbackInterval, + eventTriggeredRefresh); } @Override @@ -407,6 +421,7 @@ public String toString() { .add("hiveRefreshInterval", hiveRefreshInterval) .add("evaluationMseTolerance", evaluationMseTolerance) .add("evaluationFallbackInterval", evaluationFallbackInterval) + .add("externalEventTriggered", eventTriggeredRefresh) .toString(); } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index 0d804cdec7..a8ccd0781f 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -146,6 +146,12 @@ private TableProperties() {} // for file size evaluation public static final long SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT = 0; + /** event-triggered refresh related properties */ + public static final String SELF_OPTIMIZING_REFRESH_EVENT_TRIGGERED = + "self-optimizing.refresh.event-triggered"; // enable table refresh triggered by events + + public static final boolean SELF_OPTIMIZING_REFRESH_EVENT_TRIGGERED_DEFAULT = false; + /** table clean related properties */ public static final String ENABLE_TABLE_EXPIRE = "table-expire.enabled"; diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index 94ef3cf781..a2b21afb2a 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -45,6 +45,7 @@ ams: thread-count: 10 interval: 1min # 60000 max-pending-partition-count: 100 # default 100 + max-interval: 1h self-optimizing: commit-thread-count: 10 diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index 8e2e780d60..850f7dbd4b 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -96,6 +96,7 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | refresh-external-catalogs.queue-size | 1000000 | The queue size of the executors of the external catalog explorer. | | refresh-external-catalogs.thread-count | 10 | The number of threads used for discovering tables in external catalogs. | | refresh-tables.interval | 1 min | Interval for refreshing table metadata. | +| refresh-tables.max-interval | 1 h | Maximum interval for refreshing table metadata. (Used as the fallback interval when enabling refreshes triggered by external events). | | refresh-tables.max-pending-partition-count | 100 | Filters will not be used beyond that number of partitions. | | refresh-tables.thread-count | 10 | The number of threads used for refreshing tables. | | self-optimizing.break-quota-limit-enabled | true | Allow the table to break the quota limit when the resource is sufficient. |