diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java index 4b93a8d6269..1fc216f3721 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java @@ -18,6 +18,8 @@ package org.apache.seatunnel.engine.common; public class Constant { + private Constant() {} + public static final String SEATUNNEL_SERVICE_NAME = "st:impl:seaTunnelServer"; public static final String SEATUNNEL_ID_GENERATOR_NAME = "SeaTunnelIdGenerator"; diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index 79a4d0ace0d..6003971df08 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig; import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig; import org.apache.seatunnel.engine.common.config.server.HttpConfig; +import org.apache.seatunnel.engine.common.config.server.MapStoreConfig; import org.apache.seatunnel.engine.common.config.server.QueueType; import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy; import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; @@ -99,6 +100,9 @@ public class EngineConfig { private HttpConfig httpConfig = ServerConfigOptions.MasterServerConfigOptions.HTTP.defaultValue(); + private MapStoreConfig mapStoreConfig = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE.defaultValue(); + public void setBackupCount(int newBackupCount) { checkBackupCount(newBackupCount, 0); this.backupCount = newBackupCount; diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 2794862a5ac..8dd25d04941 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode; import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig; import org.apache.seatunnel.engine.common.config.server.HttpConfig; +import org.apache.seatunnel.engine.common.config.server.MapStoreConfig; import org.apache.seatunnel.engine.common.config.server.QueueType; import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy; import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; @@ -92,6 +93,61 @@ private boolean handleNode(Node node, String name) { return false; } + private MapStoreConfig parseMapStoreConfig(Node mapStoreNode) { + MapStoreConfig mapStoreConfig = new MapStoreConfig(); + for (Node node : childElements(mapStoreNode)) { + String name = cleanNodeName(node); + if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_ENABLED + .key() + .equals(name)) { + mapStoreConfig.setMapStoreEnabled(getBooleanValue(getTextContent(node))); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_TYPE + .key() + .equals(name)) { + mapStoreConfig.setMapStoreType(getTextContent(node)); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_NAMESPACE + .key() + .equals(name)) { + mapStoreConfig.setNamespace(getTextContent(node)); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_CLUSTER_NAME + .key() + .equals(name)) { + mapStoreConfig.setClusterName(getTextContent(node)); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_DEFAULT_FS + .key() + .equals(name)) { + mapStoreConfig.setDefaultFS(getTextContent(node)); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE + .key() + .equals(name)) { + mapStoreConfig.setBlockSize( + getIntegerValue( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE + .key(), + getTextContent(node))); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_BUCKET + .key() + .equals(name)) { + mapStoreConfig.setOssBucket(getTextContent(node)); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_ID + .key() + .equals(name)) { + mapStoreConfig.setOssAccessKeyId(getTextContent(node)); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_SECRET + .key() + .equals(name)) { + mapStoreConfig.setOssAccessKeySecret(getTextContent(node)); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ENDPOINT + .key() + .equals(name)) { + mapStoreConfig.setOssEndpoint(getTextContent(node)); + } else { + LOGGER.warning("Unrecognized element: " + name); + } + } + return mapStoreConfig; + } + private SlotServiceConfig parseSlotServiceConfig(Node slotServiceNode) { SlotServiceConfig slotServiceConfig = new SlotServiceConfig(); for (Node node : childElements(slotServiceNode)) { @@ -259,6 +315,8 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { .key() .equals(name)) { engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node)); + } else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE.key().equals(name)) { + engineConfig.setMapStoreConfig((parseMapStoreConfig(node))); } else { LOGGER.warning("Unrecognized element: " + name); } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/MapStoreConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/MapStoreConfig.java new file mode 100644 index 00000000000..9709085e4f3 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/MapStoreConfig.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.common.config.server; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Map; + +import static com.hazelcast.internal.util.Preconditions.checkPositive; + +@Data +public class MapStoreConfig implements Serializable { + private boolean mapStoreEnabled = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_ENABLED.defaultValue(); + + private String mapStoreType = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_TYPE.defaultValue(); + private String namespace = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_NAMESPACE.defaultValue(); + private String clusterName = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_CLUSTER_NAME.defaultValue(); + + private String defaultFS = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_DEFAULT_FS.defaultValue(); + private int blockSize = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE.defaultValue(); + private String ossBucket = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_BUCKET.defaultValue(); + private String ossAccessKeyId = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_ID + .defaultValue(); + private String ossAccessKeySecret = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_SECRET + .defaultValue(); + private String ossEndpoint = + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ENDPOINT.defaultValue(); + + public void setMapStoreEnabled(boolean mapStoreEnabled) { + this.mapStoreEnabled = mapStoreEnabled; + } + + public void setMapStoreType(String mapStoreType) { + this.mapStoreType = mapStoreType; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public void setDefaultFS(String defaultFS) { + this.defaultFS = defaultFS; + } + + public void setBlockSize(int blockSize) { + checkPositive( + blockSize, + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE + + " must be > 0"); + this.blockSize = blockSize; + } + + public void setOssBucket(String ossBucket) { + this.ossBucket = ossBucket; + } + + public void setOssAccessKeyId(String ossAccessKeyId) { + this.ossAccessKeyId = ossAccessKeyId; + } + + public void setOssAccessKeySecret(String ossAccessKeySecret) { + this.ossAccessKeySecret = ossAccessKeySecret; + } + + public void setOssEndpoint(String ossEndpoint) { + this.ossEndpoint = ossEndpoint; + } + + public Map toMap() { + Map configMap = new java.util.HashMap<>(); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_ENABLED.key(), + mapStoreEnabled); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_TYPE.key(), mapStoreType); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_NAMESPACE.key(), namespace); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_CLUSTER_NAME.key(), + clusterName); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_DEFAULT_FS.key(), + defaultFS); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE.key(), + blockSize); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_BUCKET.key(), + ossBucket); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_ID.key(), + ossAccessKeyId); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_SECRET.key(), + ossAccessKeySecret); + configMap.put( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ENDPOINT.key(), + ossEndpoint); + return configMap; + } +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 631267c7e30..390ee272595 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -97,6 +97,77 @@ public static class MasterServerConfigOptions { .defaultValue(1) .withDescription("Number of partitions for storing job metrics in IMap."); ///////////////////////////////////////////////// + // The options about RocksDB persistence start + public static final Option MAP_STORE_ENABLED = + Options.key("map-store-enabled") + .booleanType() + .defaultValue(false) + .withDescription("Enable external map-store persistence for RocksDB"); + + // common properties + public static final Option MAP_STORE_TYPE = + Options.key("type") + .stringType() + .defaultValue("hdfs") + .withDescription("Type marker for map-store usage (hdfs/s3/oss)"); + + public static final Option MAP_STORE_NAMESPACE = + Options.key("namespace") + .stringType() + .defaultValue("/tmp/seatunnel/imap") + .withDescription("Namespace / base path used by the file map store"); + + public static final Option MAP_STORE_CLUSTER_NAME = + Options.key("clusterName") + .stringType() + .defaultValue("seatunnel-cluster") + .withDescription("Cluster name used by the file map store"); + + // HDFS / local specific + public static final Option MAP_STORE_DEFAULT_FS = + Options.key("fs.defaultFS") + .stringType() + .defaultValue("hdfs://localhost:9000") + .withDescription("fs.defaultFS for HDFS; for local use file:///"); + + // OSS specific + public static final Option MAP_STORE_BLOCK_SIZE = + Options.key("block.size") + .intType() + .defaultValue(134217728) // 128MB + .withDescription("Block size in bytes for OSS/HDFS write (default 128MB)"); + + public static final Option MAP_STORE_OSS_BUCKET = + Options.key("oss.bucket") + .stringType() + .noDefaultValue() + .withDescription("OSS bucket path, e.g., oss://bucket-name/"); + + public static final Option MAP_STORE_OSS_ACCESS_KEY_ID = + Options.key("fs.oss.accessKeyId") + .stringType() + .noDefaultValue() + .withDescription("OSS access key id"); + + public static final Option MAP_STORE_OSS_ACCESS_KEY_SECRET = + Options.key("fs.oss.accessKeySecret") + .stringType() + .noDefaultValue() + .withDescription("OSS access key secret"); + + public static final Option MAP_STORE_OSS_ENDPOINT = + Options.key("fs.oss.endpoint") + .stringType() + .noDefaultValue() + .withDescription("OSS endpoint"); + + public static final Option MAP_STORE = + Options.key("map-store") + .type(new TypeReference() {}) + .defaultValue(new MapStoreConfig()) + .withDescription("The map store configuration."); + // The options about RocksDB persistence end + ///////////////////////////////////////////////// // The options about Hazelcast IMAP store start public static final Option BACKUP_COUNT = Options.key("backup-count") diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml index d9e1c860c53..d7fdd587b23 100644 --- a/seatunnel-engine/seatunnel-engine-server/pom.xml +++ b/seatunnel-engine/seatunnel-engine-server/pom.xml @@ -58,6 +58,11 @@ ${project.version} optional + + org.rocksdb + rocksdbjni + 9.8.4 + org.apache.commons commons-lang3 diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index 15b29ee8752..7afc6a2b8fb 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -30,6 +30,8 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext; +import org.apache.seatunnel.engine.server.persistence.MapStoreConfigFactory; +import org.apache.seatunnel.engine.server.rocksdb.RocksDBService; import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService; import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService; import org.apache.seatunnel.engine.server.service.slot.SlotService; @@ -63,6 +65,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.seatunnel.engine.server.rocksdb.RocksDBStateBackend.DB_PATH; + @Slf4j public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker { @@ -94,6 +98,7 @@ public class SeaTunnelServer private ScheduledExecutorService monitorService; private JettyService jettyService; private TaskLogManagerService taskLogManagerService; + private RocksDBService rocksDBService; @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor; @@ -192,6 +197,23 @@ private void startMaster() { 0, seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(), TimeUnit.SECONDS); + if (rocksDBService == null) { + try { + String safeAddress = + nodeEngine.getThisAddress().toString().replaceAll("[^A-Za-z0-9._-]", "_"); + String dbPath = DB_PATH + "_" + safeAddress; + + this.rocksDBService = + new RocksDBService( + dbPath, + MapStoreConfigFactory.createMapStoreConfig( + seaTunnelConfig.getEngineConfig().getMapStoreConfig(), + nodeEngine.getHazelcastInstance())); + } catch (Exception e) { + LOGGER.severe("Failed to initialize RocksDB state backend: " + e.getMessage()); + throw new SeaTunnelEngineException("Failed to init RocksDBStateBackend", e); + } + } } private void startWorker() { @@ -231,6 +253,10 @@ public void shutdown(boolean terminate) { if (eventService != null) { eventService.shutdownNow(); } + + if (rocksDBService != null) { + rocksDBService.close(false); + } } @Override @@ -303,6 +329,10 @@ public ClassLoaderService getClassLoaderService() { return classLoaderService; } + public RocksDBService getRocksDBService() { + return rocksDBService; + } + /** * return whether task is end * diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/FileMapStore.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/FileMapStore.java index 60cc514c17b..ea4d7946cfd 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/FileMapStore.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/FileMapStore.java @@ -20,8 +20,8 @@ import org.apache.seatunnel.shade.com.google.common.collect.Maps; import org.apache.seatunnel.engine.common.utils.FactoryUtil; -import org.apache.seatunnel.engine.imap.storage.api.IMapStorage; -import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory; +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorage; +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.map.MapLoaderLifecycleSupport; @@ -36,7 +36,7 @@ public class FileMapStore implements MapStore, MapLoaderLifecycleSupport { - private IMapStorage mapStorage; + private RocksDBStorage mapStorage; @Override public void init(HazelcastInstance hazelcastInstance, Properties properties, String mapName) { @@ -45,7 +45,7 @@ public void init(HazelcastInstance hazelcastInstance, Properties properties, Str this.mapStorage = FactoryUtil.discoverFactory( Thread.currentThread().getContextClassLoader(), - IMapStorageFactory.class, + RocksDBStorageFactory.class, (String) initMap.get("type")) .create(initMap); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/MapStoreConfigFactory.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/MapStoreConfigFactory.java new file mode 100644 index 00000000000..8f1df50245e --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/MapStoreConfigFactory.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.persistence; + +import org.apache.seatunnel.shade.com.google.common.collect.Maps; + +import org.apache.seatunnel.engine.common.config.server.MapStoreConfig; +import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; + +import com.hazelcast.config.MapConfig; +import com.hazelcast.core.HazelcastInstance; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +@Slf4j +public class MapStoreConfigFactory { + + public static final String ENGINE_MAP_NAME = "engine*"; + + private MapStoreConfigFactory() {} + + public static MapStoreConfig createMapStoreConfig( + MapStoreConfig mapStoreConfig, HazelcastInstance hazelcastInstance) { + MapConfig mapConfig = hazelcastInstance.getConfig().getMapConfig(ENGINE_MAP_NAME); + if (mapConfig == null || mapConfig.getMapStoreConfig() == null) { + return mapStoreConfig; + } + + Properties mapStoreProperties = mapConfig.getMapStoreConfig().getProperties(); + if (mapStoreProperties == null || mapStoreProperties.isEmpty()) { + return mapStoreConfig; + } + + Map config = new HashMap<>(Maps.fromProperties(mapStoreProperties)); + + try { + Object mapStoreEnabled = + config.get( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_ENABLED.key()); + if (mapStoreEnabled != null) { + mapStoreConfig.setMapStoreEnabled(Boolean.parseBoolean(mapStoreEnabled.toString())); + } + + Object mapStoreType = + config.get(ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_TYPE.key()); + if (mapStoreType != null) { + mapStoreConfig.setMapStoreType(mapStoreType.toString()); + } + + Object namespace = + config.get( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_NAMESPACE + .key()); + if (namespace != null) { + mapStoreConfig.setNamespace(namespace.toString()); + } + + Object clusterName = + config.get( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_CLUSTER_NAME + .key()); + if (clusterName != null) { + mapStoreConfig.setClusterName(clusterName.toString()); + } + + Object defaultFS = + config.get( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_DEFAULT_FS + .key()); + if (defaultFS != null) { + mapStoreConfig.setDefaultFS(defaultFS.toString()); + } + + Object blockSize = + config.get( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE + .key()); + if (blockSize != null) { + mapStoreConfig.setBlockSize(Integer.parseInt(blockSize.toString())); + } + + Object ossBucket = + config.get( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_BUCKET + .key()); + if (ossBucket != null) { + mapStoreConfig.setOssBucket(ossBucket.toString()); + } + + Object ossAccessKeyId = + config.get( + ServerConfigOptions.MasterServerConfigOptions + .MAP_STORE_OSS_ACCESS_KEY_ID + .key()); + if (ossAccessKeyId != null) { + mapStoreConfig.setOssAccessKeyId(ossAccessKeyId.toString()); + } + + Object ossAccessKeySecret = + config.get( + ServerConfigOptions.MasterServerConfigOptions + .MAP_STORE_OSS_ACCESS_KEY_SECRET + .key()); + if (ossAccessKeySecret != null) { + mapStoreConfig.setOssAccessKeySecret(ossAccessKeySecret.toString()); + } + + Object ossEndpoint = + config.get( + ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ENDPOINT + .key()); + if (ossEndpoint != null) { + mapStoreConfig.setOssEndpoint(ossEndpoint.toString()); + } + } catch (Exception e) { + log.warn("Failed to create MapStoreConfig from hazelcast config map", e); + } + + return mapStoreConfig; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStore.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStore.java new file mode 100644 index 00000000000..c1be977c453 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStore.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.persistence.rocksdb; + +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorage; +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory; + +import lombok.SneakyThrows; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class FileMapStore { + private RocksDBStorage mapStorage; + + public FileMapStore(RocksDBStorageFactory factory, Map configuration) { + this.mapStorage = factory.create(configuration); + } + + public void destroy() { + mapStorage.destroy(false); + } + + public void store(Object key, Object value) { + mapStorage.store(key, value); + } + + public void storeAll(Map map) { + mapStorage.storeAll(map); + } + + public void delete(Object key) { + mapStorage.delete(key); + } + + public void deleteAll(Collection keys) { + mapStorage.deleteAll(keys); + } + + @SneakyThrows + public Map loadAll(Collection keys) { + Map allMap = mapStorage.loadAll(); + Map retMap = new HashMap<>(); + keys.forEach(key -> retMap.put(key, allMap.get(key))); + + return Collections.unmodifiableMap(retMap); + } + + public Iterable loadAllKeys() { + return mapStorage.loadAllKeys(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStoreFactory.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStoreFactory.java new file mode 100644 index 00000000000..32e26a5b990 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStoreFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.persistence.rocksdb; + +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory; + +import java.util.Map; + +public class FileMapStoreFactory { + public static final String BUSINESS_NAME = "businessName"; + + public FileMapStore newMapStore( + RocksDBStorageFactory factory, Map configuration, String name) { + configuration.put(BUSINESS_NAME, name); + return new FileMapStore(factory, configuration); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStoreManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStoreManager.java new file mode 100644 index 00000000000..005080a5fb3 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/persistence/rocksdb/FileMapStoreManager.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.persistence.rocksdb; + +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FileMapStoreManager { + private final FileMapStoreFactory factory = new FileMapStoreFactory(); + private final Map mapStores = new HashMap<>(); + + public FileMapStoreManager( + List stateNames, + RocksDBStorageFactory storageFactory, + Map configuration) { + for (String name : stateNames) { + FileMapStore fileMapStore = factory.newMapStore(storageFactory, configuration, name); + mapStores.put(name, fileMapStore); + } + } + + public void put(String name, Object key, Object value) { + mapStores.get(name).store(key, value); + } + + public void remove(String name, Object key) { + mapStores.get(name).delete(key); + } + + public Map loadAll(String name) { + Iterable loadedAllKeys = mapStores.get(name).loadAllKeys(); + List keys = new ArrayList<>(); + for (Object k : loadedAllKeys) { + keys.add(k); + } + return mapStores.get(name).loadAll(keys); + } + + public void destroy() { + for (FileMapStore store : mapStores.values()) { + store.destroy(); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/BackendFactory.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/BackendFactory.java new file mode 100644 index 00000000000..18897490ac3 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/BackendFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +import org.apache.seatunnel.engine.common.config.server.MapStoreConfig; +import org.apache.seatunnel.engine.imap.storage.file.RocksDBFileStorageFactory; + +import org.rocksdb.RocksDBException; + +public class BackendFactory { + private BackendFactory() {} + + public static RocksDBStateBackend createRocksDBStateBackend( + String dbPath, MapStoreConfig mapStoreConfig) { + try { + return new RocksDBStateBackend(dbPath, new RocksDBFileStorageFactory(), mapStoreConfig); + } catch (RocksDBException e) { + throw new RocksDBRuntimeException("Failed to create RocksDBStateBackend", e); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBRuntimeException.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBRuntimeException.java new file mode 100644 index 00000000000..adeaf3a30f0 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBRuntimeException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +public class RocksDBRuntimeException extends RuntimeException { + public RocksDBRuntimeException(String message) { + super(message); + } + + public RocksDBRuntimeException(String message, Throwable cause) { + super(message, cause); + } + + public RocksDBRuntimeException(Throwable cause) { + super(cause); + } + + protected RocksDBRuntimeException(Throwable cause, String message, Object... data) { + super(String.format(message, data), cause); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBService.java new file mode 100644 index 00000000000..55d9d08d187 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBService.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +import org.apache.seatunnel.engine.common.config.server.MapStoreConfig; + +import java.util.Map; + +public class RocksDBService { + private final String dbPath; + private final RocksDBStateBackend stateBackend; + + public RocksDBService(String dbPath, MapStoreConfig config) { + this.dbPath = dbPath; + this.stateBackend = BackendFactory.createRocksDBStateBackend(dbPath, config); + stateBackend.init(); + } + + public RocksDBValueState getValueState(String stateName) { + return stateBackend.getValueState(stateName); + } + + public Map getAllData(String stateName) { + RocksDBValueState valueState = stateBackend.getValueState(stateName); + return RocksDBUtils.toMap(valueState.iterator()); + } + + public V getData(String stateName, K key) { + RocksDBValueState valueState = stateBackend.getValueState(stateName); + return valueState.get(key); + } + + public void putData(String stateName, Map map) { + stateBackend.putAll(stateName, map); + } + + public void removeData(String stateName, K key) { + stateBackend.remove(stateName, key); + } + + public void close(boolean deleteFiles) { + stateBackend.close(dbPath, deleteFiles); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBStateBackend.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBStateBackend.java new file mode 100644 index 00000000000..f7b5e96f8aa --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBStateBackend.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; + +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.config.server.MapStoreConfig; +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory; +import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer; +import org.apache.seatunnel.engine.server.persistence.rocksdb.FileMapStoreManager; + +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Slf4j +public class RocksDBStateBackend { + public static final String DB_PATH = "rocksdb"; + public static final String DEFAULT_NAME = "default"; + + private final RocksDB db; + private final DBOptions dbOptions; + private final List columnFamilyOptions = new ArrayList<>(); + private final List columnFamilyHandles = new ArrayList<>(); + private final Map columnFamilyMap = new HashMap<>(); + private final Map> valueStateMap = new HashMap<>(); + private final List initialStateNames = + new ArrayList<>(Arrays.asList(DEFAULT_NAME, Constant.IMAP_RUNNING_JOB_METRICS)); + + private FileMapStoreManager fileMapStoreManager; + + public RocksDBStateBackend( + String dbPath, RocksDBStorageFactory factory, MapStoreConfig mapStoreConfig) + throws RocksDBException { + RocksDB.loadLibrary(); + try { + List descriptors = getColumnFamilyDescriptors(dbPath); + + this.dbOptions = + new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + this.db = RocksDB.open(dbOptions, dbPath, descriptors, this.columnFamilyHandles); + + initializeColumnFamilyMapAndValueStateMap(); + + if (mapStoreConfig != null && mapStoreConfig.isMapStoreEnabled()) { + this.fileMapStoreManager = + new FileMapStoreManager(initialStateNames, factory, mapStoreConfig.toMap()); + } + } catch (RocksDBException e) { + log.error("Failed to open RocksDB at {}: {}", dbPath, e.getMessage(), e); + close(dbPath); + throw e; + } + } + + private List getColumnFamilyDescriptors(String dbPath) { + addExistingColumnFamilies(dbPath); + + List descriptors = new ArrayList<>(); + for (String name : initialStateNames) { + ColumnFamilyOptions options = new ColumnFamilyOptions(); + columnFamilyOptions.add(options); + if (DEFAULT_NAME.equals(name)) { + descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, options)); + } else { + descriptors.add( + new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8), options)); + } + } + return descriptors; + } + + private void addExistingColumnFamilies(String dbPath) { + try (Options options = new Options()) { + List existing = RocksDB.listColumnFamilies(options, dbPath); + + List existingNames = new ArrayList<>(); + for (byte[] bytes : existing) { + if (Arrays.equals(bytes, RocksDB.DEFAULT_COLUMN_FAMILY)) { + existingNames.add(DEFAULT_NAME); + } else { + existingNames.add(new String(bytes, StandardCharsets.UTF_8)); + } + } + + for (String name : existingNames) { + if (!initialStateNames.contains(name)) { + this.initialStateNames.add(name); + } + } + } catch (RocksDBException ignored) { + log.info("RocksDB at {} does not exist. It will be created.", dbPath); + } + } + + private void initializeColumnFamilyMapAndValueStateMap() { + int idx = 0; + for (String name : initialStateNames) { + if (StringUtils.isBlank(name)) continue; + if (idx < columnFamilyHandles.size()) { + columnFamilyMap.put(name, columnFamilyHandles.get(idx)); + } + idx++; + } + + for (String name : initialStateNames) { + if (StringUtils.isBlank(name)) continue; + RocksDBValueState valueState = + new RocksDBValueState<>( + db, columnFamilyMap.get(name), new ProtoStuffSerializer()); + valueStateMap.put(name, valueState); + } + } + + public void init() { + if (fileMapStoreManager == null) return; + for (String name : initialStateNames) { + Map loaded = fileMapStoreManager.loadAll(name); + RocksDBValueState valueState = getValueState(name); + for (Map.Entry entry : loaded.entrySet()) { + valueState.put(entry.getKey(), entry.getValue()); + } + } + } + + public RocksDBValueState getValueState(String stateName) { + @SuppressWarnings("unchecked") + RocksDBValueState rocksDBValueState = + (RocksDBValueState) valueStateMap.get(stateName); + if (rocksDBValueState == null) { + throw CommonError.illegalArgument(stateName, "getRocksDBValueState"); + } + return rocksDBValueState; + } + + public void put(String stateName, K key, V value) { + RocksDBValueState valueState = getValueState(stateName); + valueState.compute(key, oldVal -> mergeValues(oldVal, value)); + if (fileMapStoreManager != null) { + V merged = valueState.get(key); + fileMapStoreManager.put(stateName, key, merged); + } + } + + public void putAll(String stateName, Map map) { + final RocksDBValueState valueState = getValueState(stateName); + for (Map.Entry e : map.entrySet()) { + K key = e.getKey(); + valueState.compute(key, oldVal -> mergeValues(oldVal, e.getValue())); + if (fileMapStoreManager != null) { + V merged = valueState.get(key); + fileMapStoreManager.put(stateName, key, merged); + } + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static V mergeValues(V oldVal, V newVal) { + if (oldVal == null) return newVal; + + if (oldVal instanceof Map && newVal instanceof Map) { + Map merged = new HashMap((Map) oldVal); + merged.putAll((Map) newVal); + return (V) merged; + } + + return newVal; + } + + public void remove(String stateName, K key) { + RocksDBValueState valueState = getValueState(stateName); + valueState.remove(key); + if (fileMapStoreManager != null) fileMapStoreManager.remove(stateName, key); + } + + public void close(String dbPath) { + close(dbPath, false); + } + + public void close(String dbPath, boolean destroyFiles) { + if (fileMapStoreManager != null) this.fileMapStoreManager.destroy(); + + for (State state : valueStateMap.values()) { + state.close(); + } + valueStateMap.clear(); + columnFamilyHandles.clear(); + columnFamilyMap.clear(); + + try { + if (db != null) db.close(); + } catch (Exception ignored) { + log.warn("Failed to close RocksDB", ignored); + } + + try { + if (dbOptions != null) dbOptions.close(); + } catch (Exception e) { + log.warn("Failed to close DBOptions", e); + } + for (ColumnFamilyOptions options : columnFamilyOptions) { + try { + if (options != null) options.close(); + } catch (Exception e) { + log.warn("Failed to close ColumnFamilyOptions", e); + } + } + columnFamilyOptions.clear(); + + if (destroyFiles && dbPath != null) { + try (Options options = new Options()) { + RocksDB.destroyDB(dbPath, options); + } catch (RocksDBException e) { + throw new RocksDBRuntimeException("Failed to destroy RocksDB at: " + dbPath, e); + } + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBUtils.java new file mode 100644 index 00000000000..a21053bd044 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class RocksDBUtils { + private RocksDBUtils() {} + + public static Map toMap(Iterator> iterator) { + Map result = new HashMap<>(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + result.put(entry.getKey(), entry.getValue()); + } + return result; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBValueState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBValueState.java new file mode 100644 index 00000000000..3bda53232dc --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBValueState.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +import org.apache.seatunnel.shade.com.google.common.collect.MapMaker; + +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.engine.serializer.api.Serializer; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.UnaryOperator; + +@Slf4j +public class RocksDBValueState implements ValueState { + private final RocksDB db; + private final ColumnFamilyHandle columnFamilyHandle; + private final Serializer serializer; + private final WriteOptions writeOptions = new WriteOptions().setSync(false); + + private final Map lockMap = + new MapMaker().concurrencyLevel(16).weakKeys().makeMap(); + + RocksDBValueState(RocksDB db, ColumnFamilyHandle columnFamilyHandle, Serializer serializer) { + this.db = db; + this.columnFamilyHandle = columnFamilyHandle; + this.serializer = serializer; + } + + private byte[] encode(Object object) throws IOException { + if (object == null) throw CommonError.illegalArgument("object is null", "encode object"); + byte[] payload; + if (serializer != null) { + payload = serializer.serialize(object); + } else { + payload = encodeObject(object); + } + String className = object.getClass().getName(); + return encodeWithClassName(payload, className); + } + + private static byte[] encodeObject(Object key) throws IOException { + if (key == null) throw CommonError.illegalArgument("object is null", "encode object"); + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { + objectOutputStream.writeObject(key); + objectOutputStream.flush(); + return byteArrayOutputStream.toByteArray(); + } + } + + @SuppressWarnings("unchecked") + private K decodeKey(byte[] bytes) throws IOException, ClassNotFoundException { + if (bytes == null) return null; + Decoded decoded = decodeWithClassName(bytes); + Class actualClass = Class.forName(decoded.className); + if (serializer != null) { + try { + return (K) serializer.deserialize(decoded.payload, actualClass); + } catch (Exception ex) { + log.warn( + "serializer.deserialize failed for key class {}, fallback to Java deserialization", + decoded.className, + ex); + } + } + return (K) javaDeserialize(decoded.payload); + } + + @SuppressWarnings("unchecked") + private V decodeValue(byte[] bytes) throws IOException, ClassNotFoundException { + if (bytes == null) return null; + Decoded decoded = decodeWithClassName(bytes); + Class actualClass = Class.forName(decoded.className); + if (serializer != null) { + try { + return (V) serializer.deserialize(decoded.payload, actualClass); + } catch (Exception e) { + log.warn( + "serializer.deserialize failed for value class {}, fallback to Java deserialization", + decoded.className, + e); + } + } + return (V) javaDeserialize(decoded.payload); + } + + private Object javaDeserialize(byte[] payload) throws IOException, ClassNotFoundException { + if (payload == null) + throw CommonError.illegalArgument("payload is null", "deserialize payload"); + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(payload); + ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) { + return objectInputStream.readObject(); + } + } + + private ReentrantLock getLock(K key) { + return lockMap.computeIfAbsent(key, k -> new ReentrantLock()); + } + + public V get(K key) { + if (key == null) throw new NullPointerException("key is null"); + ReentrantLock lock = getLock(key); + lock.lock(); + try { + byte[] rawKey = encode(key); + byte[] value = db.get(columnFamilyHandle, rawKey); + return decodeValue(value); + } catch (Exception e) { + throw new RocksDBRuntimeException("Failed to get value from RocksDB. key: " + key, e); + } finally { + lock.unlock(); + } + } + + public void put(K key, V value) { + if (key == null) throw new NullPointerException("key is null"); + ReentrantLock lock = getLock(key); + lock.lock(); + try { + byte[] rawKey = encode(key); + if (value == null) { + db.delete(columnFamilyHandle, writeOptions, rawKey); + } else { + byte[] valueBytes = encode(value); + db.put(columnFamilyHandle, writeOptions, rawKey, valueBytes); + } + } catch (Exception e) { + throw new RocksDBRuntimeException( + String.format( + "Failed to put key-value into RocksDB. key: %s, value: %s", key, value), + e); + } finally { + lock.unlock(); + } + } + + public void remove(K key) { + if (key == null) throw new NullPointerException("key is null"); + ReentrantLock lock = getLock(key); + lock.lock(); + try { + byte[] rawKey = encode(key); + db.delete(columnFamilyHandle, writeOptions, rawKey); + } catch (Exception e) { + throw new RocksDBRuntimeException("Failed to remove key from RocksDB. key: " + key, e); + } finally { + lock.unlock(); + } + } + + @Override + public boolean contains(K key) throws IOException, RocksDBException { + if (key == null) throw new NullPointerException("key is null"); + ReentrantLock lock = getLock(key); + lock.lock(); + try { + byte[] rawKey = encode(key); + byte[] value = db.get(columnFamilyHandle, rawKey); + return value != null; + } finally { + lock.unlock(); + } + } + + @Override + public Iterable> entries() { + return () -> { + try { + return iterator(); + } catch (Exception e) { + throw new RocksDBRuntimeException("Failed to create iterator for entries", e); + } + }; + } + + @Override + public Iterable keys() { + return () -> new KeyIterator(iterator()); + } + + @Override + public Iterable values() throws Exception { + return () -> new ValueIterator(iterator()); + } + + @Override + public Iterator> iterator() { + RocksIterator rocksIter; + try { + rocksIter = db.newIterator(columnFamilyHandle); + } catch (Exception e) { + throw new RocksDBRuntimeException("Failed to create RocksIterator", e); + } + rocksIter.seekToFirst(); + + return new AutoCloseableIterator>() { + private boolean closed = false; + + private void closeIfNeeded() { + if (!closed) { + try { + rocksIter.close(); + } catch (Exception ignored) { + log.warn("Failed to close RocksIterator", ignored); + } finally { + closed = true; + } + } + } + + @Override + public boolean hasNext() { + boolean valid = rocksIter.isValid(); + if (!valid) { + closeIfNeeded(); + } + return valid; + } + + @Override + public Map.Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + try { + byte[] keyBytes = rocksIter.key(); + byte[] valueBytes = rocksIter.value(); + K k = decodeKey(keyBytes); + V v = decodeValue(valueBytes); + rocksIter.next(); + return new AbstractMap.SimpleEntry<>(k, v); + } catch (Exception e) { + closeIfNeeded(); + throw new RocksDBRuntimeException("Failed to deserialize entry", e); + } + } + + @Override + public void close() { + closeIfNeeded(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public boolean isEmpty() { + RocksIterator it = db.newIterator(columnFamilyHandle); + try { + it.seekToFirst(); + return !it.isValid(); + } finally { + try { + it.close(); + } catch (Exception ignored) { + log.warn("Failed to close RocksIterator", ignored); + } + } + } + + @Override + public void close() { + try { + columnFamilyHandle.close(); + writeOptions.close(); + } catch (Exception ignored) { + log.warn("Failed to close ColumnFamilyHandle", ignored); + } + } + + public void compute(K key, UnaryOperator remappingFunction) { + if (key == null) throw new NullPointerException("key"); + ReentrantLock lock = getLock(key); + lock.lock(); + try { + byte[] rawKey = encode(key); + byte[] rawValue = db.get(columnFamilyHandle, rawKey); + V oldValue = decodeValue(rawValue); + V newValue = remappingFunction.apply(oldValue); + + if (newValue == null) { + db.delete(columnFamilyHandle, rawKey); + } else { + byte[] valueBytes = encode(newValue); + db.put(columnFamilyHandle, rawKey, valueBytes); + } + } catch (Exception e) { + throw new RocksDBRuntimeException( + "Failed to compute value for key in RocksDB. key: " + key, e); + } finally { + lock.unlock(); + } + } + + private byte[] encodeWithClassName(byte[] payload, String className) { + byte[] bytes = className.getBytes(StandardCharsets.UTF_8); + int payloadLen = payload == null ? 0 : payload.length; + ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES + bytes.length + payloadLen); + byteBuffer.putInt(bytes.length); + byteBuffer.put(bytes); + if (payloadLen > 0) byteBuffer.put(payload); + return byteBuffer.array(); + } + + private Decoded decodeWithClassName(byte[] encoded) { + if (encoded == null || encoded.length < Integer.BYTES) { + throw CommonError.illegalArgument(Arrays.toString(encoded), "check framed data"); + } + ByteBuffer byteBuffer = ByteBuffer.wrap(encoded); + int len = byteBuffer.getInt(); + if (len < 0 || len > byteBuffer.remaining()) { + throw CommonError.illegalArgument(Arrays.toString(encoded), "check framed data"); + } + byte[] className = new byte[len]; + byteBuffer.get(className); + byte[] payload = new byte[byteBuffer.remaining()]; + byteBuffer.get(payload); + Decoded decoded = new Decoded(); + decoded.className = new String(className, StandardCharsets.UTF_8); + decoded.payload = payload; + return decoded; + } + + private static class Decoded { + String className; + byte[] payload; + } + + private class KeyIterator implements Iterator { + private final AutoCloseableIterator> entryIter; + + public KeyIterator(Iterator> iterator) { + this.entryIter = (AutoCloseableIterator>) iterator; + } + + @Override + public boolean hasNext() { + return entryIter.hasNext(); + } + + @Override + public K next() { + return entryIter.next().getKey(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private class ValueIterator implements Iterator { + private final AutoCloseableIterator> entryIter; + + public ValueIterator(Iterator> iterator) { + this.entryIter = (AutoCloseableIterator>) iterator; + } + + @Override + public boolean hasNext() { + return entryIter.hasNext(); + } + + @Override + public V next() { + return entryIter.next().getValue(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + public interface AutoCloseableIterator extends Iterator, AutoCloseable {} +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/State.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/State.java new file mode 100644 index 00000000000..b14c5807d8a --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/State.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +public interface State { + void close(); +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/ValueState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/ValueState.java new file mode 100644 index 00000000000..b0a70c9bcdb --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rocksdb/ValueState.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +import java.util.Iterator; +import java.util.Map; + +public interface ValueState extends State { + + V get(K key); + + void put(K key, V value); + + void remove(K key); + + boolean contains(K key) throws Exception; + + Iterable> entries(); + + Iterable keys(); + + Iterable values() throws Exception; + + Iterator> iterator(); + + boolean isEmpty(); +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java index da2e1851089..4a874ce13b3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java @@ -35,6 +35,10 @@ import org.apache.seatunnel.engine.server.task.operation.SendConnectorJarToMemberNodeOperation; import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation; import org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequestOperation; +import org.apache.seatunnel.engine.server.task.operation.rocksdb.GetAllDataOperation; +import org.apache.seatunnel.engine.server.task.operation.rocksdb.GetDataOperation; +import org.apache.seatunnel.engine.server.task.operation.rocksdb.PutDataOperation; +import org.apache.seatunnel.engine.server.task.operation.rocksdb.RemoveDataOperation; import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation; import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation; import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation; @@ -108,7 +112,15 @@ public class TaskDataSerializerHook implements DataSerializerHook { public static final int CLEAN_LOG_OPERATION = 27; - public static final int REPORT_METRICS_OPERATION = 28; + public static final int GET_DATA_OPERATION = 28; + + public static final int PUT_DATA_OPERATION = 29; + + public static final int REMOVE_DATA_OPERATION = 30; + + public static final int GET_ALL_DATA_OPERATION = 31; + + public static final int REPORT_METRICS_OPERATION = 32; public static final int FACTORY_ID = FactoryIdHelper.getFactoryId( @@ -184,6 +196,14 @@ public IdentifiedDataSerializable create(int typeId) { return new CloseIdleReaderOperation(); case CLEAN_LOG_OPERATION: return new CleanLogOperation(); + case GET_DATA_OPERATION: + return new GetDataOperation(); + case PUT_DATA_OPERATION: + return new PutDataOperation(); + case REMOVE_DATA_OPERATION: + return new RemoveDataOperation(); + case GET_ALL_DATA_OPERATION: + return new GetAllDataOperation(); case REPORT_METRICS_OPERATION: return new ReportMetricsOperation(); default: diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/GetAllDataOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/GetAllDataOperation.java new file mode 100644 index 00000000000..507a7a0c4cf --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/GetAllDataOperation.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.task.operation.rocksdb; + +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook; +import org.apache.seatunnel.engine.server.task.operation.TracingOperation; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; + +import java.io.IOException; +import java.util.Map; + +public class GetAllDataOperation extends TracingOperation + implements IdentifiedDataSerializable { + private String stateName; + private Map map; + + public GetAllDataOperation() {} + + public GetAllDataOperation(String stateName) { + this.stateName = stateName; + } + + @Override + public void runInternal() throws Exception { + SeaTunnelServer seaTunnelServer = getService(); + map = seaTunnelServer.getRocksDBService().getAllData(stateName); + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeString(stateName); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + this.stateName = in.readString(); + } + + @Override + public int getFactoryId() { + return TaskDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return TaskDataSerializerHook.GET_ALL_DATA_OPERATION; + } + + public Map getMap() { + return map; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/GetDataOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/GetDataOperation.java new file mode 100644 index 00000000000..701c9048e0d --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/GetDataOperation.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.task.operation.rocksdb; + +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook; +import org.apache.seatunnel.engine.server.task.operation.TracingOperation; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; + +import java.io.IOException; + +public class GetDataOperation extends TracingOperation implements IdentifiedDataSerializable { + private String stateName; + private K key; + private V value; + + public GetDataOperation() {} + + public GetDataOperation(String stateName, K key) { + this.stateName = stateName; + this.key = key; + } + + @Override + public void runInternal() throws Exception { + SeaTunnelServer seaTunnelServer = getService(); + value = seaTunnelServer.getRocksDBService().getData(stateName, key); + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeString(stateName); + out.writeObject(key); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + this.stateName = in.readString(); + this.key = in.readObject(); + } + + @Override + public int getFactoryId() { + return TaskDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return TaskDataSerializerHook.GET_DATA_OPERATION; + } + + public V getValue() { + return value; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/PutDataOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/PutDataOperation.java new file mode 100644 index 00000000000..2ad04e3e82c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/PutDataOperation.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.task.operation.rocksdb; + +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook; +import org.apache.seatunnel.engine.server.task.operation.TracingOperation; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class PutDataOperation extends TracingOperation implements IdentifiedDataSerializable { + private String stateName; + private Map map; + + public PutDataOperation() {} + + public PutDataOperation(String stateName, Map map) { + this.stateName = stateName; + this.map = map; + } + + @Override + public void runInternal() throws Exception { + SeaTunnelServer seaTunnelServer = getService(); + seaTunnelServer.getRocksDBService().putData(stateName, map); + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeString(stateName); + out.writeInt(map == null ? 0 : map.size()); + if (map != null) { + for (Map.Entry e : map.entrySet()) { + out.writeObject(e.getKey()); + out.writeObject(e.getValue()); + } + } + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + this.stateName = in.readString(); + int size = in.readInt(); + this.map = new HashMap<>(size); + for (int i = 0; i < size; i++) { + K key = in.readObject(); + V value = in.readObject(); + this.map.put(key, value); + } + } + + @Override + public int getFactoryId() { + return TaskDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return TaskDataSerializerHook.PUT_DATA_OPERATION; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/RemoveDataOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/RemoveDataOperation.java new file mode 100644 index 00000000000..2912b43a2f2 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/rocksdb/RemoveDataOperation.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.task.operation.rocksdb; + +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook; +import org.apache.seatunnel.engine.server.task.operation.TracingOperation; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; + +import java.io.IOException; + +public class RemoveDataOperation extends TracingOperation implements IdentifiedDataSerializable { + private String stateName; + private K key; + + public RemoveDataOperation() {} + + public RemoveDataOperation(String stateName, K key) { + this.stateName = stateName; + this.key = key; + } + + @Override + public void runInternal() throws Exception { + SeaTunnelServer seaTunnelServer = getService(); + seaTunnelServer.getRocksDBService().removeData(stateName, key); + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + super.writeInternal(out); + out.writeString(stateName); + out.writeObject(key); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + super.readInternal(in); + this.stateName = in.readString(); + this.key = in.readObject(); + } + + @Override + public int getFactoryId() { + return TaskDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return TaskDataSerializerHook.REMOVE_DATA_OPERATION; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBServiceTest.java new file mode 100644 index 00000000000..8fe00897f4f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rocksdb/RocksDBServiceTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.server.rocksdb; + +import org.apache.seatunnel.engine.common.config.server.MapStoreConfig; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +class RocksDBServiceTest { + private static final String DB_PATH = "rocksdb_test"; + private static final String STATE_NAME = "default"; + private RocksDBService rocksDBService; + + @BeforeEach + void setUp() { + rocksDBService = new RocksDBService(DB_PATH, new MapStoreConfig()); + } + + @AfterEach + void tearDown() { + rocksDBService.close(true); + } + + @Test + void testPutAndGetData() { + String key = "testKey"; + String value = "testValue"; + + rocksDBService.putData(STATE_NAME, Collections.singletonMap(key, value)); + String retrievedValue = rocksDBService.getData(STATE_NAME, key); + + Assertions.assertEquals(value, retrievedValue); + } + + @Test + void testGetAllData() { + Map initialData = new HashMap<>(); + initialData.put("testKey1", "testValue1"); + initialData.put("testKey2", "testValue2"); + initialData.put("testKey3", "testValue3"); + rocksDBService.putData(STATE_NAME, initialData); + + Map allData = rocksDBService.getAllData(STATE_NAME); + for (Map.Entry entry : initialData.entrySet()) { + Assertions.assertEquals(entry.getValue(), allData.get(entry.getKey())); + } + } + + @Test + void testRemoveData() { + Map initialData = new HashMap<>(); + initialData.put("testKey1", "testValue1"); + initialData.put("testKey2", "testValue2"); + initialData.put("testKey3", "testValue3"); + rocksDBService.putData(STATE_NAME, initialData); + + rocksDBService.removeData(STATE_NAME, "testKey2"); + + String testKey2 = rocksDBService.getData(STATE_NAME, "testKey2"); + Assertions.assertNull(testKey2); + + Map allData = rocksDBService.getAllData(STATE_NAME); + Assertions.assertEquals(2, allData.size()); + } + + @Test + void testCloseAndReopen() { + String key = "testKey"; + String value = "testValue"; + + rocksDBService.putData(STATE_NAME, Collections.singletonMap(key, value)); + rocksDBService.close(false); + + rocksDBService = new RocksDBService(DB_PATH, new MapStoreConfig()); + String retrievedValue = rocksDBService.getData(STATE_NAME, key); + + Assertions.assertEquals(value, retrievedValue); + } + + @Test + void testPutMultipleData() { + Map dataToPut = new HashMap<>(); + dataToPut.put("key1", "value1"); + dataToPut.put("key2", "value2"); + dataToPut.put("key3", "value3"); + + rocksDBService.putData(STATE_NAME, dataToPut); + + for (Map.Entry entry : dataToPut.entrySet()) { + String retrievedValue = rocksDBService.getData(STATE_NAME, entry.getKey()); + Assertions.assertEquals(entry.getValue(), retrievedValue); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/RocksDBStorage.java similarity index 98% rename from seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java rename to seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/RocksDBStorage.java index f4c6a8f0bb0..9992e6519a0 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/RocksDBStorage.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.Set; -public interface IMapStorage { +public interface RocksDBStorage { public void initialize(Map properties); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/RocksDBStorageFactory.java similarity index 80% rename from seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorageFactory.java rename to seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/RocksDBStorageFactory.java index bfdf7052051..5f3ed48f8e4 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorageFactory.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/RocksDBStorageFactory.java @@ -20,13 +20,13 @@ package org.apache.seatunnel.engine.imap.storage.api; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import java.util.Map; -public interface IMapStorageFactory { +public interface RocksDBStorageFactory { String factoryIdentifier(); - IMapStorage create(Map configuration) throws IMapStorageException; + RocksDBStorage create(Map configuration) throws RocksDBStorageException; } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/exception/IMapStorageException.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/exception/RocksDBStorageException.java similarity index 76% rename from seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/exception/IMapStorageException.java rename to seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/exception/RocksDBStorageException.java index 07a0165dde3..e5153573596 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/exception/IMapStorageException.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/exception/RocksDBStorageException.java @@ -20,21 +20,21 @@ package org.apache.seatunnel.engine.imap.storage.api.exception; -public class IMapStorageException extends RuntimeException { +public class RocksDBStorageException extends RuntimeException { - public IMapStorageException(String message) { + public RocksDBStorageException(String message) { super(message); } - public IMapStorageException(String message, Throwable cause) { + public RocksDBStorageException(String message, Throwable cause) { super(message, cause); } - public IMapStorageException(Throwable cause) { + public RocksDBStorageException(Throwable cause) { super(cause); } - public IMapStorageException(Throwable cause, String message, Object... data) { + public RocksDBStorageException(Throwable cause, String message, Object... data) { super(String.format(message, data), cause); } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorage.java similarity index 97% rename from seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java rename to seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorage.java index ab125ba5c84..cec62d73b77 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorage.java @@ -20,8 +20,8 @@ package org.apache.seatunnel.engine.imap.storage.file; -import org.apache.seatunnel.engine.imap.storage.api.IMapStorage; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorage; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants; import org.apache.seatunnel.engine.imap.storage.file.common.WALReader; @@ -68,7 +68,7 @@ * ensure data consistency use request future to ensure data consistency */ @Slf4j -public class IMapFileStorage implements IMapStorage { +public class RocksDBFileStorage implements RocksDBStorage { private static final String STORAGE_TYPE_KEY = "storage.type"; @@ -156,7 +156,7 @@ public void initialize(Map configuration) { this.fs = FileSystem.get(hadoopConf); fs.setWriteChecksum(false); } catch (IOException e) { - throw new IMapStorageException("Failed to get file system", e); + throw new RocksDBStorageException("Failed to get file system", e); } this.serializer = new ProtoStuffSerializer(); this.walDisruptor = @@ -237,7 +237,7 @@ public Map loadAll() { WALReader reader = new WALReader(fs, fileConfiguration, serializer); return reader.loadAllData(new Path(businessRootPath), new HashSet<>()); } catch (IOException e) { - throw new IMapStorageException("load all data error", e); + throw new RocksDBStorageException("load all data error", e); } } @@ -247,7 +247,7 @@ public Set loadAllKeys() { WALReader reader = new WALReader(fs, fileConfiguration, serializer); return reader.loadAllKeys(new Path(businessRootPath)); } catch (IOException e) { - throw new IMapStorageException( + throw new RocksDBStorageException( e, "load all keys error parent path is {}", e, businessRootPath); } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorageFactory.java similarity index 68% rename from seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java rename to seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorageFactory.java index 49c47a4d47f..0e367f74acd 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageFactory.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorageFactory.java @@ -20,24 +20,24 @@ package org.apache.seatunnel.engine.imap.storage.file; -import org.apache.seatunnel.engine.imap.storage.api.IMapStorage; -import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorage; +import org.apache.seatunnel.engine.imap.storage.api.RocksDBStorageFactory; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import com.google.auto.service.AutoService; import java.util.Map; -@AutoService(IMapStorageFactory.class) -public class IMapFileStorageFactory implements IMapStorageFactory { +@AutoService(RocksDBStorageFactory.class) +public class RocksDBFileStorageFactory implements RocksDBStorageFactory { @Override public String factoryIdentifier() { return "hdfs"; } @Override - public IMapStorage create(Map initMap) throws IMapStorageException { - IMapFileStorage iMapFileStorage = new IMapFileStorage(); + public RocksDBStorage create(Map initMap) throws RocksDBStorageException { + RocksDBFileStorage iMapFileStorage = new RocksDBFileStorage(); iMapFileStorage.initialize(initMap); return iMapFileStorage; } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java index f4284b64189..55e0fbd4a34 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReader.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.shade.org.apache.commons.lang3.ClassUtils; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.wal.DiscoveryWalFileFactory; @@ -120,12 +120,12 @@ private Object deserializeData(byte[] data, String className) { } catch (IOException e) { // log.error("deserialize data error, data is {}, className is {}", data, className, // e); - throw new IMapStorageException( + throw new RocksDBStorageException( e, "deserialize data error: data is s%, className is s%", data, className); } } catch (ClassNotFoundException e) { // log.error("deserialize data error, class name is {}", className, e); - throw new IMapStorageException( + throw new RocksDBStorageException( e, "deserialize data error, class name is {}", className); } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java index 8198479ae87..d5b090657cf 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java @@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.imap.storage.file.config; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.hadoop.conf.Configuration; @@ -55,7 +55,7 @@ void checkConfiguration(Map config, String... keys) { } public abstract Configuration buildConfiguration(Map config) - throws IMapStorageException; + throws RocksDBStorageException; /** * set extra options for configuration diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java index 6ffd3250642..ca3c8ef7b22 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -95,16 +95,16 @@ public Configuration buildConfiguration(Map config) { * @param kerberosPrincipal kerberos principal * @param kerberosKeytabFilePath kerberos keytab file path * @param hdfsConf hdfs configuration - * @throws IMapStorageException authentication exception + * @throws RocksDBStorageException authentication exception */ private void authenticateKerberos( String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf) - throws IMapStorageException { + throws RocksDBStorageException { UserGroupInformation.setConfiguration(hdfsConf); try { UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); } catch (IOException e) { - throw new IMapStorageException( + throw new RocksDBStorageException( "Failed to login user from keytab : " + kerberosKeytabFilePath + " and kerberos principal : " diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java index 71f31063be1..a6a0bbe93d9 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java @@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.imap.storage.file.config; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.hadoop.conf.Configuration; @@ -37,7 +37,7 @@ public class OssConfiguration extends AbstractConfiguration { @Override public Configuration buildConfiguration(Map config) - throws IMapStorageException { + throws RocksDBStorageException { checkConfiguration(config, OSS_BUCKET_KEY); Configuration hadoopConf = new Configuration(); hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY)); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java index 872120a5baf..d971a8d627a 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java @@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.imap.storage.file.config; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.hadoop.conf.Configuration; @@ -40,7 +40,7 @@ public class S3Configuration extends AbstractConfiguration { @Override public Configuration buildConfiguration(Map config) - throws IMapStorageException { + throws RocksDBStorageException { checkConfiguration(config, S3_BUCKET_KEY); String protocol = DEFAULT_PROTOCOL; if (config.get(S3_BUCKET_KEY).startsWith(S3A_PROTOCOL)) { diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java index 35db6d2842f..26cc3d5674a 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java @@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.imap.storage.file.disruptor; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.serializer.api.Serializer; @@ -105,7 +105,7 @@ public void close() throws IOException { disruptor.shutdown(DEFAULT_CLOSE_WAIT_TIME_SECONDS, TimeUnit.SECONDS); } catch (TimeoutException e) { log.error("WALDisruptor close timeout error", e); - throw new IMapStorageException("WALDisruptor close timeout error", e); + throw new RocksDBStorageException("WALDisruptor close timeout error", e); } } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java index 70c411e44ef..9134d54a148 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java @@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.imap.storage.file.disruptor; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.common.WALWriter; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; @@ -49,7 +49,7 @@ public WALWorkHandler( try { writer = new WALWriter(fs, fileConfiguration, new Path(parentPath), serializer); } catch (IOException e) { - throw new IMapStorageException( + throw new RocksDBStorageException( e, "create new current writer failed, parent path is %s", parentPath); } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java index f8d5e7fadf9..d2908b82d9c 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java @@ -19,7 +19,7 @@ */ package org.apache.seatunnel.engine.imap.storage.file.wal.reader; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils; import org.apache.seatunnel.engine.serializer.api.Serializer; @@ -82,7 +82,7 @@ private List getFileNames(Path parentPath) { } return fileNames; } catch (IOException e) { - throw new IMapStorageException(e, "get file names error,path is s%", parentPath); + throw new RocksDBStorageException(e, "get file names error,path is s%", parentPath); } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java index 48c9cea0156..4463fb283bd 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java @@ -19,7 +19,7 @@ */ package org.apache.seatunnel.engine.imap.storage.file.wal.writer; -import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.api.exception.RocksDBStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils; import org.apache.seatunnel.engine.serializer.api.Serializer; @@ -97,7 +97,7 @@ private void write(byte[] bytes) { checkAndSetNextScheduleRotation(allBytes.length); } catch (Exception ex) { - throw new IMapStorageException(ex); + throw new RocksDBStorageException(ex); } } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java index c642670016c..6d49e52cab2 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileOSSStorageTest.java @@ -58,7 +58,7 @@ public class IMapFileOSSStorageTest { static String CLUSTER_NAME = "test-one"; private static final Configuration CONF; - private static final IMapFileStorage STORAGE; + private static final RocksDBFileStorage STORAGE; static { CONF = new Configuration(); @@ -69,7 +69,7 @@ public class IMapFileOSSStorageTest { CONF.set("fs.oss.accessKeySecret", OSS_ACCESS_KEY_SECRET); CONF.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); - STORAGE = new IMapFileStorage(); + STORAGE = new RocksDBFileStorage(); Map properties = new HashMap<>(); properties.put("storage.type", "oss"); properties.put("oss.bucket", OSS_BUCKET_NAME); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorageTest.java similarity index 95% rename from seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java rename to seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorageTest.java index a1aebb5e478..d43823f4242 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/RocksDBFileStorageTest.java @@ -45,17 +45,17 @@ import static org.junit.jupiter.api.condition.OS.MAC; @EnabledOnOs({LINUX, MAC}) -public class IMapFileStorageTest { +class RocksDBFileStorageTest { private static final Configuration CONF; - private static final IMapFileStorage STORAGE; + private static final RocksDBFileStorage STORAGE; static { CONF = new Configuration(); CONF.set("fs.defaultFS", "file:///"); CONF.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); - STORAGE = new IMapFileStorage(); + STORAGE = new RocksDBFileStorage(); Map properties = new HashMap<>(); properties.put("fs.defaultFS", "file:///"); @@ -114,7 +114,7 @@ void testStoreArray() { data[6] = 111111111L; STORAGE.store("array", data); Long[] array = (Long[]) STORAGE.loadAll().get("array"); - Assertions.assertEquals(array[6], 111111111L); + Assertions.assertEquals(111111111L, array[6]); } @AfterAll diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 241819ed88e..91383696a30 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -95,6 +95,7 @@ reactive-streams-1.0.4.jar regions-2.31.30.jar retries-2.31.30.jar retries-spi-2.31.30.jar +rocksdbjni-9.8.4.jar auth-2.31.30.jar annotations-2.31.30.jar apache-client-2.31.30.jar