Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .kokoro/psm_interop_kokoro_lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ psm::light::setup() {
psm::light::get_tests() {
TESTS=(
"fallback_test"
"federation_test"
)
}

Expand Down
76 changes: 61 additions & 15 deletions framework/helpers/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,11 @@ def _make_working_dir(base: pathlib.Path) -> str:


class Bootstrap:
def __init__(
self,
base: pathlib.Path,
primary_port: int,
fallback_port: int,
host_name: str,
):
self.primary_port = primary_port
self.fallback_port = fallback_port
def __init__(self, base: pathlib.Path, **kwargs):
self.mount_dir = _make_working_dir(base)
# Use Mako
template = mako.template.Template(filename=BOOTSTRAP_JSON_TEMPLATE)
file = template.render(
servers=[
f"{host_name}:{primary_port}",
f"{host_name}:{fallback_port}",
]
)
file = template.render(**kwargs)
destination = self.mount_dir / "bootstrap.json"
with open(destination, "w", encoding="utf-8") as f:
f.write(file)
Expand Down Expand Up @@ -107,6 +94,7 @@ def Configure(config, image: str, name: str):
config["detach"] = True
config["environment"] = {
"GRPC_EXPERIMENTAL_XDS_FALLBACK": "true",
"GRPC_EXPERIMENTAL_XDS_FEDERATION": "true",
"GRPC_TRACE": "xds_client",
"GRPC_VERBOSITY": "info",
"GRPC_XDS_BOOTSTRAP": "/grpc/bootstrap.json",
Expand Down Expand Up @@ -310,3 +298,61 @@ def expect_channel_status(
break
time.sleep(poll_interval.microseconds * 0.000001)
return status


class Server(GrpcProcess):
def __init__(
self,
manager: ProcessManager,
port: int,
maintenance_port: int,
name: str,
image: str,
):
super().__init__(
manager=manager,
port=port,
image=image,
name=name,
command=[
f"--port={port}",
f"--maintenance_port={maintenance_port}",
"--secure_mode=true",
],
ports={port: port, maintenance_port: maintenance_port},
volumes={
manager.bootstrap.mount_dir: {
"bind": "/grpc",
"mode": "ro",
}
},
)
self.maintenance_port = maintenance_port

def management_channel(self) -> grpc.Channel:
if self.grpc_channel is None:
self.grpc_channel = grpc.insecure_channel(
f"localhost:{self.maintenance_port}"
)
return self.grpc_channel

def expect_channel_status(
self,
port: int,
expected_status: channelz_pb2.ChannelConnectivityState,
timeout: datetime.timedelta,
poll_interval: datetime.timedelta,
) -> channelz_pb2.ChannelConnectivityState:
deadline = datetime.datetime.now() + timeout
channelz = ChannelzServiceClient(self.management_channel())
status = None
while datetime.datetime.now() < deadline:
status = None
for ch in channelz.list_channels():
if ch.data.target.endswith(str(port)):
status = ch.data.state.state
break
if status == expected_status:
break
time.sleep(poll_interval.microseconds * 0.000001)
return status
85 changes: 71 additions & 14 deletions framework/helpers/xds_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from envoy.config.endpoint.v3 import endpoint_components_pb2
from envoy.config.endpoint.v3 import endpoint_pb2
from envoy.config.listener.v3 import api_listener_pb2
from envoy.config.listener.v3 import listener_components_pb2
from envoy.config.listener.v3 import listener_pb2
from envoy.config.route.v3 import route_components_pb2
from envoy.config.route.v3 import route_pb2
Expand All @@ -39,7 +40,7 @@ def _wrap_in_any(msg: message.Message) -> any_pb2.Any:
return any_msg


def _build_listener(listener_name: str, cluster_name: str):
def build_listener(listener_name: str, cluster_name: str):
hcm = http_connection_manager_pb2.HttpConnectionManager(
route_config=route_pb2.RouteConfiguration(
virtual_hosts=[
Expand Down Expand Up @@ -70,7 +71,7 @@ def _build_listener(listener_name: str, cluster_name: str):
)


def _build_endpoint(
def build_endpoint(
cluster_name: str, upstream_host: str, upstream_port: int
) -> endpoint_pb2.ClusterLoadAssignment:
endpoint = endpoint_components_pb2.Endpoint(
Expand Down Expand Up @@ -104,7 +105,7 @@ def _build_endpoint(
)


def _build_cluster(cluster_name: str, service_name: str) -> cluster_pb2.Cluster:
def build_cluster(cluster_name: str, service_name: str) -> cluster_pb2.Cluster:
return cluster_pb2.Cluster(
name=cluster_name,
type=cluster_pb2.Cluster.DiscoveryType.EDS,
Expand All @@ -117,6 +118,63 @@ def _build_cluster(cluster_name: str, service_name: str) -> cluster_pb2.Cluster:
)


def build_server_listener(
server_listener_name: str, port: int, route_config_name: str
) -> listener_pb2.Listener:
hcm = http_connection_manager_pb2.HttpConnectionManager(
rds=http_connection_manager_pb2.Rds(
route_config_name=route_config_name,
config_source=config_source_pb2.ConfigSource(
ads=config_source_pb2.AggregatedConfigSource()
),
),
http_filters=[
http_connection_manager_pb2.HttpFilter(
name="router", typed_config=_wrap_in_any(router_pb2.Router())
)
],
)
return listener_pb2.Listener(
name=server_listener_name,
address=address_pb2.Address(
socket_address=address_pb2.SocketAddress(
protocol=address_pb2.SocketAddress.TCP,
address="0.0.0.0",
port_value=port,
)
),
filter_chains=[
listener_components_pb2.FilterChain(
filters=[
listener_components_pb2.Filter(
name="default_filter", typed_config=_wrap_in_any(hcm)
)
]
)
],
)


def build_server_route_config(
route_config_name: str,
) -> route_pb2.RouteConfiguration:
route = route_pb2.RouteConfiguration(
name=route_config_name,
virtual_hosts=[
route_components_pb2.VirtualHost(
domains=["*"],
routes=[
route_components_pb2.Route(
match=route_components_pb2.RouteMatch(prefix=""),
non_forwarding_action=route_components_pb2.NonForwardingAction(),
)
],
)
],
)
return route


def _build_resource_to_set(resource: message.Message):
name = (
resource.cluster_name
Expand All @@ -130,21 +188,20 @@ def _build_resource_to_set(resource: message.Message):
)


def build_set_resources_request(resources: list[message.Message]):
return xdsconfig_pb2.SetResourcesRequest(
resources=[_build_resource_to_set(r) for r in resources]
)


def build_listener_and_cluster(
listener_name: str,
cluster_name: str,
upstream_host: str,
upstream_port: int,
) -> xdsconfig_pb2.SetResourcesRequest:
service_name = f"{cluster_name}_eds_service"
listener = _build_listener(listener_name, cluster_name)
cluster = _build_cluster(cluster_name, service_name)
load_assignment = _build_endpoint(
service_name, upstream_host, upstream_port
)
return xdsconfig_pb2.SetResourcesRequest(
resources=[
_build_resource_to_set(r)
for r in [listener, cluster, load_assignment]
]
)
listener = build_listener(listener_name, cluster_name)
cluster = build_cluster(cluster_name, service_name)
load_assignment = build_endpoint(service_name, upstream_host, upstream_port)
return build_set_resources_request([listener, cluster, load_assignment])
24 changes: 23 additions & 1 deletion templates/bootstrap.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"xds_servers": [
% for i, server in enumerate(servers):
% for i, server in enumerate(servers or []):
{
"server_uri": "${server}",
"channel_creds": [
Expand All @@ -14,6 +14,28 @@
}${','*bool(i < len(servers)-1)}
% endfor
],
"authorities": {
% for i, (name, server) in enumerate((authorities or {}).items()):
"${name}": {
"xds_servers": [
{
"server_uri": "${server}",
"channel_creds": [
{
"type": "insecure"
}
],
"server_features": [
"xds_v3"
]
}
]
}${','*bool(i < len(authorities) - 1)}
% endfor
},
% if server_template:
"server_listener_resource_name_template": "${server_template}",
% endif
"node": {
"id": "test-id",
"cluster": "cluster",
Expand Down
Loading