From c9531712110ede10b916862c334de5f66fa29b74 Mon Sep 17 00:00:00 2001 From: Parker Duckworth Date: Fri, 13 Dec 2024 17:57:58 -0500 Subject: [PATCH 1/2] fix embedded server port setting/intra-cluster comms --- integration_embedded/test_client.py | 34 +++++++++++++++++++++++++++++ weaviate/embedded.py | 32 +++++++++++++++++++++------ 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/integration_embedded/test_client.py b/integration_embedded/test_client.py index 91606af6d..a132a15ac 100644 --- a/integration_embedded/test_client.py +++ b/integration_embedded/test_client.py @@ -158,3 +158,37 @@ def test_embedded_startup_with_blocked_grpc_port(tmp_path_factory: pytest.TempPa ) finally: client.close() + + +def test_embedded_create_restore_backup(tmp_path_factory: pytest.TempPathFactory) -> None: + num_objects = 10 + collection_name = "BackedUp" + backup_id = "backup0" + + try: + client = weaviate.connect_to_embedded( + persistence_data_path=tmp_path_factory.mktemp("data"), + port=8164, + grpc_port=50500, + environment_variables={ + "ENABLE_MODULES": "backup-filesystem", + "BACKUP_FILESYSTEM_PATH": tmp_path_factory.mktemp("backup"), + "DISABLE_TELEMETRY": "true", + }, + version="latest", + ) + + col = client.collections.create(collection_name) + with col.batch.dynamic() as batch: + for i in range(num_objects): + batch.add_object(properties={"text": f"text-{i}"}) + assert len(col) == num_objects + + col.backup.create(backup_id=backup_id, backend="filesystem", wait_for_completion=True) + client.collections.delete(collection_name) + col.backup.restore(backup_id=backup_id, backend="filesystem", wait_for_completion=True) + + assert len(col) == num_objects + client.collections.delete(collection_name) + finally: + client.close() diff --git a/weaviate/embedded.py b/weaviate/embedded.py index 03768f82e..718181ae6 100644 --- a/weaviate/embedded.py +++ b/weaviate/embedded.py @@ -205,16 +205,32 @@ def start(self) -> None: my_env.setdefault("AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED", "true") my_env.setdefault("QUERY_DEFAULTS_LIMIT", "20") my_env.setdefault("PERSISTENCE_DATA_PATH", self.options.persistence_data_path) - # Bug with weaviate requires setting gossip and data bind port - my_env.setdefault("CLUSTER_GOSSIP_BIND_PORT", str(get_random_port())) + my_env.setdefault("PROFILING_PORT", str(get_random_port())) + # Limitation with weaviate server requires setting + # data_bind_port to gossip_bind_port + 1 + gossip_bind_port = get_random_port() + data_bind_port = gossip_bind_port + 1 + my_env.setdefault("CLUSTER_GOSSIP_BIND_PORT", str(gossip_bind_port)) + my_env.setdefault("CLUSTER_DATA_BIND_PORT", str(data_bind_port)) my_env.setdefault("GRPC_PORT", str(self.grpc_port)) my_env.setdefault("RAFT_BOOTSTRAP_EXPECT", str(1)) my_env.setdefault("CLUSTER_IN_LOCALHOST", str(True)) - raft_port = get_random_port() + # Each call to `get_random_port()` will likely result in + # a port 1 higher than the last time it was called. With + # this, we end up with raft_port == gossip_bind_port + 1, + # which is the same as data_bind_port. This kind of + # configuration leads to failed cross cluster communication. + # Although the current version of embedded does not support + # multi-node instances, the backup process communication + # passes through the internal cluster server, and will fail. + # + # So we here we ensure that raft_port never collides with + # data_bind_port. + raft_port = data_bind_port + 1 + raft_internal_rpc_port = raft_port + 1 my_env.setdefault("RAFT_PORT", str(raft_port)) - my_env.setdefault("RAFT_INTERNAL_RPC_PORT", str(raft_port + 1)) - my_env.setdefault("PROFILING_PORT", str(get_random_port())) + my_env.setdefault("RAFT_INTERNAL_RPC_PORT", str(raft_internal_rpc_port)) my_env.setdefault( "ENABLE_MODULES", @@ -222,8 +238,8 @@ def start(self) -> None: "reranker-cohere", ) - # have a deterministic hostname in case of changes in the network name. This allows to run multiple parallel - # instances + # have a deterministic hostname in case of changes in the network name. + # This allows to run multiple parallel instances cluster_hostname = f"Embedded_at_{self.options.port}" my_env.setdefault("CLUSTER_HOSTNAME", cluster_hostname) my_env.setdefault("RAFT_JOIN", f"{cluster_hostname}:{raft_port}") @@ -243,6 +259,8 @@ def start(self) -> None: str(self.options.port), "--scheme", "http", + "--read-timeout=600s", + "--write-timeout=600s", ], env=my_env, ) From 9d4f948374ae2236b06230c812f3335ebff0fd33 Mon Sep 17 00:00:00 2001 From: Parker Duckworth Date: Fri, 13 Dec 2024 18:15:32 -0500 Subject: [PATCH 2/2] small integration test adjustment --- integration_embedded/test_client.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/integration_embedded/test_client.py b/integration_embedded/test_client.py index a132a15ac..86f591930 100644 --- a/integration_embedded/test_client.py +++ b/integration_embedded/test_client.py @@ -165,19 +165,19 @@ def test_embedded_create_restore_backup(tmp_path_factory: pytest.TempPathFactory collection_name = "BackedUp" backup_id = "backup0" - try: - client = weaviate.connect_to_embedded( - persistence_data_path=tmp_path_factory.mktemp("data"), - port=8164, - grpc_port=50500, - environment_variables={ - "ENABLE_MODULES": "backup-filesystem", - "BACKUP_FILESYSTEM_PATH": tmp_path_factory.mktemp("backup"), - "DISABLE_TELEMETRY": "true", - }, - version="latest", - ) + client = weaviate.connect_to_embedded( + persistence_data_path=tmp_path_factory.mktemp("data"), + port=8164, + grpc_port=50500, + environment_variables={ + "ENABLE_MODULES": "backup-filesystem", + "BACKUP_FILESYSTEM_PATH": tmp_path_factory.mktemp("backup"), + "DISABLE_TELEMETRY": "true", + }, + version="latest", + ) + try: col = client.collections.create(collection_name) with col.batch.dynamic() as batch: for i in range(num_objects):