Skip to content

Commit

Permalink
Merge pull request #1485 from weaviate/fix/embedded-backups
Browse files Browse the repository at this point in the history
Fix backup/restore for the embedded server
  • Loading branch information
dirkkul authored Dec 16, 2024
2 parents dee9f6b + 9d4f948 commit 7b0a7e7
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 7 deletions.
34 changes: 34 additions & 0 deletions integration_embedded/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,37 @@ def test_embedded_startup_with_blocked_grpc_port(tmp_path_factory: pytest.TempPa
)
finally:
client.close()


def test_embedded_create_restore_backup(tmp_path_factory: pytest.TempPathFactory) -> None:
    """Round-trip a collection through a filesystem backup on an embedded server.

    The test starts an embedded instance with the ``backup-filesystem``
    module enabled, fills a collection with ``num_objects`` objects, backs
    it up, deletes the collection, restores it from the backup and finally
    checks that all objects are present again.

    Args:
        tmp_path_factory: pytest fixture used to create separate temporary
            directories for the server data and for the backup files.
    """
    num_objects = 10
    collection_name = "BackedUp"
    backup_id = "backup0"

    # ``mktemp`` returns ``pathlib.Path`` objects. Both values are handed to
    # the server configuration/environment, so convert them to plain strings
    # up front instead of relying on implicit path-to-string coercion.
    data_path = str(tmp_path_factory.mktemp("data"))
    backup_path = str(tmp_path_factory.mktemp("backup"))

    client = weaviate.connect_to_embedded(
        persistence_data_path=data_path,
        port=8164,
        grpc_port=50500,
        environment_variables={
            "ENABLE_MODULES": "backup-filesystem",
            "BACKUP_FILESYSTEM_PATH": backup_path,
            "DISABLE_TELEMETRY": "true",
        },
        version="latest",
    )

    try:
        col = client.collections.create(collection_name)
        with col.batch.dynamic() as batch:
            for i in range(num_objects):
                batch.add_object(properties={"text": f"text-{i}"})
        assert len(col) == num_objects

        col.backup.create(backup_id=backup_id, backend="filesystem", wait_for_completion=True)
        # Drop the collection so that the restore has to recreate it from
        # the backup rather than finding the original data still in place.
        client.collections.delete(collection_name)
        col.backup.restore(backup_id=backup_id, backend="filesystem", wait_for_completion=True)

        assert len(col) == num_objects
        client.collections.delete(collection_name)
    finally:
        # Always shut the client down, even when an assertion above fails.
        client.close()
32 changes: 25 additions & 7 deletions weaviate/embedded.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,25 +205,41 @@ def start(self) -> None:
my_env.setdefault("AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED", "true")
my_env.setdefault("QUERY_DEFAULTS_LIMIT", "20")
my_env.setdefault("PERSISTENCE_DATA_PATH", self.options.persistence_data_path)
# Bug with weaviate requires setting gossip and data bind port
my_env.setdefault("CLUSTER_GOSSIP_BIND_PORT", str(get_random_port()))
my_env.setdefault("PROFILING_PORT", str(get_random_port()))
# Limitation with weaviate server requires setting
# data_bind_port to gossip_bind_port + 1
gossip_bind_port = get_random_port()
data_bind_port = gossip_bind_port + 1
my_env.setdefault("CLUSTER_GOSSIP_BIND_PORT", str(gossip_bind_port))
my_env.setdefault("CLUSTER_DATA_BIND_PORT", str(data_bind_port))
my_env.setdefault("GRPC_PORT", str(self.grpc_port))
my_env.setdefault("RAFT_BOOTSTRAP_EXPECT", str(1))
my_env.setdefault("CLUSTER_IN_LOCALHOST", str(True))

raft_port = get_random_port()
# Each call to `get_random_port()` will likely result in
# a port 1 higher than the last time it was called. With
# this, we end up with raft_port == gossip_bind_port + 1,
# which is the same as data_bind_port. This kind of
# configuration leads to failed cross cluster communication.
# Although the current version of embedded does not support
# multi-node instances, the backup process communication
# passes through the internal cluster server, and will fail.
#
# So here we ensure that raft_port never collides with
# data_bind_port.
raft_port = data_bind_port + 1
raft_internal_rpc_port = raft_port + 1
my_env.setdefault("RAFT_PORT", str(raft_port))
my_env.setdefault("RAFT_INTERNAL_RPC_PORT", str(raft_port + 1))
my_env.setdefault("PROFILING_PORT", str(get_random_port()))
my_env.setdefault("RAFT_INTERNAL_RPC_PORT", str(raft_internal_rpc_port))

my_env.setdefault(
"ENABLE_MODULES",
"text2vec-openai,text2vec-cohere,text2vec-huggingface,ref2vec-centroid,generative-openai,qna-openai,"
"reranker-cohere",
)

# have a deterministic hostname in case of changes in the network name. This allows to run multiple parallel
# instances
# have a deterministic hostname in case of changes in the network name.
# This allows running multiple parallel instances
cluster_hostname = f"Embedded_at_{self.options.port}"
my_env.setdefault("CLUSTER_HOSTNAME", cluster_hostname)
my_env.setdefault("RAFT_JOIN", f"{cluster_hostname}:{raft_port}")
Expand All @@ -243,6 +259,8 @@ def start(self) -> None:
str(self.options.port),
"--scheme",
"http",
"--read-timeout=600s",
"--write-timeout=600s",
],
env=my_env,
)
Expand Down

0 comments on commit 7b0a7e7

Please sign in to comment.