Skip to content

Add multiple endpoint listener for fluss server. #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,15 @@
* CoordinatorServer and TabletServer.
*/
public abstract class ClientToServerITCaseBase {
protected static final String INTERNAL_LISTENER_NAME = "internal";
protected static final String CLIENT_LISTENER_NAME = "client";

@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
FlussClusterExtension.builder()
.setNumOfTabletServers(3)
.setInternalListenerName(INTERNAL_LISTENER_NAME)
.setClientListenerName(CLIENT_LISTENER_NAME)
.setClusterConf(initConfig())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@ void testGetKvSnapshot() throws Exception {
void testGetServerNodes() throws Exception {
List<ServerNode> serverNodes = admin.getServerNodes().get();
List<ServerNode> expectedNodes = new ArrayList<>();
expectedNodes.add(FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode());
expectedNodes.addAll(FLUSS_CLUSTER_EXTENSION.getTabletServerNodes());
expectedNodes.add(FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode(false));
expectedNodes.addAll(FLUSS_CLUSTER_EXTENSION.getTabletServerNodes(false));
assertThat(serverNodes).containsExactlyInAnyOrderElementsOf(expectedNodes);
}

Expand Down Expand Up @@ -748,7 +748,7 @@ tablePath, newPartitionSpec("pt", String.valueOf(currentYear + 1)), false)
@Test
void testBootstrapServerConfigAsTabletServer() throws Exception {
Configuration newConf = clientConf;
ServerNode ts0 = FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().get(0);
ServerNode ts0 = FLUSS_CLUSTER_EXTENSION.getTabletServerNodes(false).get(0);
newConf.set(
ConfigOptions.BOOTSTRAP_SERVERS,
Collections.singletonList(String.format("%s:%d", ts0.host(), ts0.port())));
Expand Down Expand Up @@ -776,7 +776,7 @@ void testBootstrapServerConfigAsTabletServer() throws Exception {
}

private void assertHasTabletServerNumber(int tabletServerNumber) {
CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(true);
retry(
Duration.ofMinutes(2),
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void addRecordsToBucket(
throws Exception {
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket);
TabletServerGateway leaderGateWay =
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader, false);
assertProduceLogResponse(
leaderGateWay
.produceLog(
Expand Down
126 changes: 126 additions & 0 deletions fluss-common/src/main/java/com/alibaba/fluss/cluster/Endpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.cluster;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.utils.StringUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Endpoint is what fluss server is listened for. It includes host, port and listener name. Listener
* name is used for routing, all fluss server can have same listener names to listen . For example,
* coordinator server and tablet sever can use internal lister to communicate with each other. And
* If a client connect to a server with a host and port, it can only see the other server's same
* listener.
*/
@Internal
public class Endpoint {
private static final Pattern ENDPOINT_PARSE_EXP =
Pattern.compile("^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");

private final String host;
private final int port;
private final String listenerName;

public Endpoint(String host, int port, String listenerName) {
this.host = host;
this.port = port;
this.listenerName = listenerName;
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public String getListenerName() {
return listenerName;
}

public static List<Endpoint> parseEndpoints(String listeners) {
if (StringUtils.isNullOrWhitespaceOnly(listeners)) {
return Collections.emptyList();
}
return Arrays.stream(listeners.split(","))
.map(Endpoint::fromString)
.collect(Collectors.toList());
}

/**
* Create EndPoint object from `endpointString`.
*
* @param listener the format is listener_name://host:port or listener_name://[ipv6 host]:port
* for example: INTERNAL://my_host:9092, CLIENT://my_host:9093 or REPLICATION://[::1]:9094
*/
private static Endpoint fromString(String listener) {
Matcher matcher = ENDPOINT_PARSE_EXP.matcher(listener.trim());
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid endpoint format: " + listener);
}

return new Endpoint(matcher.group(2), Integer.parseInt(matcher.group(3)), matcher.group(1));
}

public static String toListenerString(List<Endpoint> endpoints) {
return endpoints.stream().map(Endpoint::toString).collect(Collectors.joining(","));
}

public static List<Endpoint> getRegisteredEndpoint(
List<Endpoint> bindEndpoints, List<Endpoint> advisedEndpoints) {
Map<String, Endpoint> advisedEndpointMap =
advisedEndpoints.stream()
.collect(Collectors.toMap(Endpoint::getListenerName, endpoint -> endpoint));
return bindEndpoints.stream()
.map(
endpoint ->
advisedEndpointMap.getOrDefault(
endpoint.getListenerName(), endpoint))
.collect(Collectors.toList());
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
Endpoint endpoint = (Endpoint) o;
return port == endpoint.port
&& Objects.equals(host, endpoint.host)
&& Objects.equals(listenerName, endpoint.listenerName);
}

@Override
public int hashCode() {
return Objects.hash(host, port, listenerName);
}

@Override
public String toString() {
return listenerName + "://" + host + ":" + port;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
* The metadata cache to cache the cluster metadata info.
Expand All @@ -38,7 +38,7 @@ public interface MetadataCache {
* @return the coordinator server node
*/
@Nullable
ServerNode getCoordinatorServer();
ServerNode getCoordinatorServer(String listenerName);

/**
* Check whether the tablet server id related tablet server node is alive.
Expand All @@ -55,21 +55,24 @@ public interface MetadataCache {
* @return the tablet server node
*/
@Nullable
ServerNode getTabletServer(int serverId);
ServerNode getTabletServer(int serverId, String listenerName);

/**
* Get all alive tablet server nodes.
*
* @return all alive tablet server nodes
*/
Map<Integer, ServerNode> getAllAliveTabletServers();
Map<Integer, ServerNode> getAllAliveTabletServers(String listenerName);

Set<Integer> getAliveTabletServerIds();

/** Get ids of all alive tablet server nodes. */
default int[] getLiveServerIds() {
List<ServerNode> serverNodes = new ArrayList<>(getAllAliveTabletServers().values());
int[] server = new int[serverNodes.size()];
for (int i = 0; i < serverNodes.size(); i++) {
server[i] = serverNodes.get(i).id();
Set<Integer> aliveTabletServerIds = getAliveTabletServerIds();
int[] server = new int[aliveTabletServerIds.size()];
Iterator<Integer> iterator = aliveTabletServerIds.iterator();
for (int i = 0; i < aliveTabletServerIds.size(); i++) {
server[i] = iterator.next();
}
return server;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public class ConfigOptions {
* <p>If the coordinator server is used as a bootstrap server (discover all the servers in the
* cluster), the value of this config option should be a static hostname or address.
*/
@Deprecated
public static final ConfigOption<String> COORDINATOR_HOST =
key("coordinator.host")
.stringType()
Expand All @@ -149,6 +150,7 @@ public class ConfigOptions {
* resolution. The value accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a
* combination of both.
*/
@Deprecated
public static final ConfigOption<String> COORDINATOR_PORT =
key("coordinator.port")
.stringType()
Expand All @@ -166,6 +168,50 @@ public class ConfigOptions {
+ " (“50100,50101”), ranges (“50100-50200”) or a combination"
+ " of both.");

/**
* The network address and port the server binds to for accepting connections.
*
* <p>This specifies the interface and port where the server will listen for incoming requests.
* The format is {@code listener_name://host:port}, supporting multiple addresses separated by
* commas.
*
* <p>The default value {@code "CLIENT://localhost:9123"} is suitable for local development.
*/
public static final ConfigOption<String> BIND_LISTENER =
key("bind.listeners")
.stringType()
.defaultValue("CLIENT://localhost:9123")
.withDescription(
"The network address and port the server binds to for accepting connections. "
+ "The format is 'protocol://host:port', supporting multiple addresses separated by commas. "
+ "The default value is suitable for local development. "
+ "In production, configure to '0.0.0.0' for external access.");

/**
* The externally advertised address and port for client connections.
*
* <p>This specifies the address other nodes/clients should use to connect to this server. It is
* required when the bind address ({@link #BIND_LISTENER}) is not publicly reachable (e.g., when
* using {@code localhost} in {@code bind.listeners}). <b>Must be configured in distributed
* environments</b> to ensure proper cluster discovery. If not explicitly set, the value of
* {@code bind.listeners} will be used as fallback.
*/
public static final ConfigOption<String> ADVERTISED_LISTENER =
key("advertised.listeners")
.stringType()
.noDefaultValue()
.withDescription(
"The externally advertised address and port for client connections. "
+ "Required in distributed environments when the bind address is not publicly reachable. "
+ "Format matches 'bind.listeners' (protocol://host:port). "
+ "Defaults to the value of 'bind.listeners' if not explicitly configured.");

public static final ConfigOption<String> INTERNAL_LISTENER_NAME =
key("internal.listener.name")
.stringType()
.defaultValue("FLUSS")
.withDescription("The listener for internal communicate");

public static final ConfigOption<Integer> COORDINATOR_IO_POOL_SIZE =
key("coordinator.io-pool.size")
.intType()
Expand All @@ -180,6 +226,7 @@ public class ConfigOptions {
// ConfigOptions for Tablet Server
// ------------------------------------------------------------------------
/** The external address of the network interface where the tablet server is exposed. */
@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Add deprection information (how to migrate to new config) in the description. Update all the deprecated config.
  2. Add a test to verify that configuring legacy tablet-server.host, coordinator.host, tablet-server.port, coordinator.port still works.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test in cluster.EndpointTest#testCoordinatorEndpointsCompatibility

public static final ConfigOption<String> TABLET_SERVER_HOST =
key("tablet-server.host")
.stringType()
Expand All @@ -193,6 +240,7 @@ public class ConfigOptions {
* The default network port the tablet server expects incoming IPC connections. The {@code "0"}
* means that the TabletServer searches for a free port.
*/
@Deprecated
public static final ConfigOption<String> TABLET_SERVER_PORT =
key("tablet-server.port")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.config;

import com.alibaba.fluss.cluster.Endpoint;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link Endpoint}. */
public class EndPointTest {

@Test
void testParseEndpoints() {
List<Endpoint> parsedEndpoints =
Endpoint.parseEndpoints(
"INTERNAL://my_host:9092, CLIENT://127.0.0.1:9093, REPLICATION://[::1]:9092");
List<Endpoint> expectedEndpoints =
Arrays.asList(
new Endpoint("my_host", 9092, "INTERNAL"),
new Endpoint("127.0.0.1", 9093, "CLIENT"),
new Endpoint("::1", 9092, "REPLICATION"));

assertThat(parsedEndpoints).hasSameElementsAs(expectedEndpoints);
}

@Test
void testAdvisedEndpoints() {

List<Endpoint> registeredEndpoint =
Endpoint.getRegisteredEndpoint(
Endpoint.parseEndpoints(
"INTERNAL://127.0.0.1:9092, CLIENT://127.0.0.1:9093, REPLICATION://[::1]:9094"),
Endpoint.parseEndpoints(
"CLIENT://my_host:9092,CLIENT2://my_host:9093,REPLICATION://[::1]:9094"));
List<Endpoint> expectedEndpoints =
Arrays.asList(
new Endpoint("127.0.0.1", 9092, "INTERNAL"),
new Endpoint("my_host", 9092, "CLIENT"),
new Endpoint("::1", 9094, "REPLICATION"));
assertThat(registeredEndpoint).hasSameElementsAs(expectedEndpoints);
}
}
Loading