Skip to content

Commit

Permalink
fix: ensure streams have updated subjects
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Apr 12, 2024
1 parent 9243793 commit 2dda860
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions bin/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_nats::{
Client, ConnectOptions,
};

use tracing::warn;
use wadm::DEFAULT_EXPIRY_TIME;

/// Creates a NATS client from the given options
Expand Down Expand Up @@ -120,18 +121,33 @@ pub async fn ensure_stream(
subjects: Vec<String>,
description: Option<String>,
) -> Result<Stream> {
let stream_config = StreamConfig {
name: name.clone(),
description,
num_replicas: 1,
retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
subjects,
max_age: DEFAULT_EXPIRY_TIME,
storage: async_nats::jetstream::stream::StorageType::File,
allow_rollup: false,
..Default::default()
};

if let Ok(stream) = context.get_stream(&name).await {
// For now, we only check if the subjects are the same in order to make sure that
// newer versions of wadm adjust subjects appropriately. In the case that developers
// want to alter the storage or replicas of a stream, for example,
// we don't want to override that configuration.
if stream.cached_info().config.subjects == stream_config.subjects {
return Ok(stream);
} else {
warn!("Found stream {name} with different configuration, deleting and recreating");
context.delete_stream(name).await?;
}
}

context
.get_or_create_stream(StreamConfig {
name,
description,
num_replicas: 1,
retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
subjects,
max_age: DEFAULT_EXPIRY_TIME,
storage: async_nats::jetstream::stream::StorageType::File,
allow_rollup: false,
..Default::default()
})
.get_or_create_stream(stream_config)
.await
.map_err(|e| anyhow::anyhow!("{e:?}"))
}
Expand Down

0 comments on commit 2dda860

Please sign in to comment.