Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.seatunnel.engine.common;

public class Constant {
private Constant() {}

public static final String SEATUNNEL_SERVICE_NAME = "st:impl:seaTunnelServer";

public static final String SEATUNNEL_ID_GENERATOR_NAME = "SeaTunnelIdGenerator";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.config.server.MapStoreConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
Expand Down Expand Up @@ -99,6 +100,9 @@ public class EngineConfig {
private HttpConfig httpConfig =
ServerConfigOptions.MasterServerConfigOptions.HTTP.defaultValue();

private MapStoreConfig mapStoreConfig =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE.defaultValue();

public void setBackupCount(int newBackupCount) {
checkBackupCount(newBackupCount, 0);
this.backupCount = newBackupCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.config.server.MapStoreConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
Expand Down Expand Up @@ -92,6 +93,61 @@ private boolean handleNode(Node node, String name) {
return false;
}

private MapStoreConfig parseMapStoreConfig(Node mapStoreNode) {
MapStoreConfig mapStoreConfig = new MapStoreConfig();
for (Node node : childElements(mapStoreNode)) {
String name = cleanNodeName(node);
if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_ENABLED
.key()
.equals(name)) {
mapStoreConfig.setMapStoreEnabled(getBooleanValue(getTextContent(node)));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_TYPE
.key()
.equals(name)) {
mapStoreConfig.setMapStoreType(getTextContent(node));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_NAMESPACE
.key()
.equals(name)) {
mapStoreConfig.setNamespace(getTextContent(node));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_CLUSTER_NAME
.key()
.equals(name)) {
mapStoreConfig.setClusterName(getTextContent(node));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_DEFAULT_FS
.key()
.equals(name)) {
mapStoreConfig.setDefaultFS(getTextContent(node));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE
.key()
.equals(name)) {
mapStoreConfig.setBlockSize(
getIntegerValue(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE
.key(),
getTextContent(node)));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_BUCKET
.key()
.equals(name)) {
mapStoreConfig.setOssBucket(getTextContent(node));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_ID
.key()
.equals(name)) {
mapStoreConfig.setOssAccessKeyId(getTextContent(node));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_SECRET
.key()
.equals(name)) {
mapStoreConfig.setOssAccessKeySecret(getTextContent(node));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ENDPOINT
.key()
.equals(name)) {
mapStoreConfig.setOssEndpoint(getTextContent(node));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
}
return mapStoreConfig;
}

private SlotServiceConfig parseSlotServiceConfig(Node slotServiceNode) {
SlotServiceConfig slotServiceConfig = new SlotServiceConfig();
for (Node node : childElements(slotServiceNode)) {
Expand Down Expand Up @@ -259,6 +315,8 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
.key()
.equals(name)) {
engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node));
} else if (ServerConfigOptions.MasterServerConfigOptions.MAP_STORE.key().equals(name)) {
engineConfig.setMapStoreConfig((parseMapStoreConfig(node)));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.seatunnel.engine.common.config.server;

import lombok.Data;

import java.io.Serializable;
import java.util.Map;

import static com.hazelcast.internal.util.Preconditions.checkPositive;

@Data
public class MapStoreConfig implements Serializable {
private boolean mapStoreEnabled =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_ENABLED.defaultValue();

private String mapStoreType =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_TYPE.defaultValue();
private String namespace =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_NAMESPACE.defaultValue();
private String clusterName =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_CLUSTER_NAME.defaultValue();

private String defaultFS =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_DEFAULT_FS.defaultValue();
private int blockSize =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE.defaultValue();
private String ossBucket =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_BUCKET.defaultValue();
private String ossAccessKeyId =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_ID
.defaultValue();
private String ossAccessKeySecret =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_SECRET
.defaultValue();
private String ossEndpoint =
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ENDPOINT.defaultValue();

public void setMapStoreEnabled(boolean mapStoreEnabled) {
this.mapStoreEnabled = mapStoreEnabled;
}

public void setMapStoreType(String mapStoreType) {
this.mapStoreType = mapStoreType;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

public void setDefaultFS(String defaultFS) {
this.defaultFS = defaultFS;
}

public void setBlockSize(int blockSize) {
checkPositive(
blockSize,
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE
+ " must be > 0");
this.blockSize = blockSize;
}

public void setOssBucket(String ossBucket) {
this.ossBucket = ossBucket;
}

public void setOssAccessKeyId(String ossAccessKeyId) {
this.ossAccessKeyId = ossAccessKeyId;
}

public void setOssAccessKeySecret(String ossAccessKeySecret) {
this.ossAccessKeySecret = ossAccessKeySecret;
}

public void setOssEndpoint(String ossEndpoint) {
this.ossEndpoint = ossEndpoint;
}

public Map<String, Object> toMap() {
Map<String, Object> configMap = new java.util.HashMap<>();
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_ENABLED.key(),
mapStoreEnabled);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_TYPE.key(), mapStoreType);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_NAMESPACE.key(), namespace);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_CLUSTER_NAME.key(),
clusterName);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_DEFAULT_FS.key(),
defaultFS);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_BLOCK_SIZE.key(),
blockSize);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_BUCKET.key(),
ossBucket);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_ID.key(),
ossAccessKeyId);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ACCESS_KEY_SECRET.key(),
ossAccessKeySecret);
configMap.put(
ServerConfigOptions.MasterServerConfigOptions.MAP_STORE_OSS_ENDPOINT.key(),
ossEndpoint);
return configMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,77 @@ public static class MasterServerConfigOptions {
.defaultValue(1)
.withDescription("Number of partitions for storing job metrics in IMap.");
/////////////////////////////////////////////////
// The options about RocksDB persistence start
public static final Option<Boolean> MAP_STORE_ENABLED =
Options.key("map-store-enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enable external map-store persistence for RocksDB");

// common properties
public static final Option<String> MAP_STORE_TYPE =
Options.key("type")
.stringType()
.defaultValue("hdfs")
.withDescription("Type marker for map-store usage (hdfs/s3/oss)");

public static final Option<String> MAP_STORE_NAMESPACE =
Options.key("namespace")
.stringType()
.defaultValue("/tmp/seatunnel/imap")
.withDescription("Namespace / base path used by the file map store");

public static final Option<String> MAP_STORE_CLUSTER_NAME =
Options.key("clusterName")
.stringType()
.defaultValue("seatunnel-cluster")
.withDescription("Cluster name used by the file map store");

// HDFS / local specific
public static final Option<String> MAP_STORE_DEFAULT_FS =
Options.key("fs.defaultFS")
.stringType()
.defaultValue("hdfs://localhost:9000")
.withDescription("fs.defaultFS for HDFS; for local use file:///");

// OSS specific
public static final Option<Integer> MAP_STORE_BLOCK_SIZE =
Options.key("block.size")
.intType()
.defaultValue(134217728) // 128MB
.withDescription("Block size in bytes for OSS/HDFS write (default 128MB)");

public static final Option<String> MAP_STORE_OSS_BUCKET =
Options.key("oss.bucket")
.stringType()
.noDefaultValue()
.withDescription("OSS bucket path, e.g., oss://bucket-name/");

public static final Option<String> MAP_STORE_OSS_ACCESS_KEY_ID =
Options.key("fs.oss.accessKeyId")
.stringType()
.noDefaultValue()
.withDescription("OSS access key id");

public static final Option<String> MAP_STORE_OSS_ACCESS_KEY_SECRET =
Options.key("fs.oss.accessKeySecret")
.stringType()
.noDefaultValue()
.withDescription("OSS access key secret");

public static final Option<String> MAP_STORE_OSS_ENDPOINT =
Options.key("fs.oss.endpoint")
.stringType()
.noDefaultValue()
.withDescription("OSS endpoint");

public static final Option<MapStoreConfig> MAP_STORE =
Options.key("map-store")
.type(new TypeReference<MapStoreConfig>() {})
.defaultValue(new MapStoreConfig())
.withDescription("The map store configuration.");
// The options about RocksDB persistence end
/////////////////////////////////////////////////
// The options about Hazelcast IMAP store start
public static final Option<Integer> BACKUP_COUNT =
Options.key("backup-count")
Expand Down
5 changes: 5 additions & 0 deletions seatunnel-engine/seatunnel-engine-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
<version>${project.version}</version>
<classifier>optional</classifier>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>9.8.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Loading