Skip to content

Commit

Permalink
Add query_account to jetstream::Context
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb committed Jun 21, 2022
1 parent efc1bbd commit e2c365f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 3 deletions.
59 changes: 59 additions & 0 deletions async-nats/src/jetstream/account.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Limits {
/// Maximum memory for this account (-1 if no limit)
pub max_memory: i64,
/// Maximum storage for this account (-1 if no limit)
pub max_storage: i64,
/// Maximum streams for this account (-1 if no limit)
pub max_streams: i64,
/// Maximum consumers for this account (-1 if no limit)
pub max_consumers: i64,
/// Indicates if Streams created in this account requires the max_bytes property set
pub max_bytes_required: bool,
/// The maximum number of outstanding ACKs any consumer may configure
pub max_ack_pending: i64,
/// The maximum size any single memory stream may be
pub memory_max_stream_bytes: i64,
/// The maximum size any single storage based stream may be
pub storage_max_stream_bytes: i64,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Requests {
/// Total number of requests received for this account.
pub total: u64,
/// Total number of requests that resulted in an error response.
pub errors: u64,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Tier {
/// Memory Storage being used for Stream Message storage
pub memory: u64,
/// File Storage being used for Stream Message storage
pub storage: u64,
/// Number of active Streams
pub streams: usize,
/// Number of active Consumers
pub consumers: usize,
///
pub limits: Limits,
pub requests: Requests,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Account {
pub memory: u64,
pub storage: u64,
pub streams: usize,
pub consumers: usize,
pub domain: Option<String>,
pub limits: Limits,
#[serde(rename = "api")]
pub requests: Requests,
#[serde(default)]
pub tiers: HashMap<String, Tier>,
}
22 changes: 19 additions & 3 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
//
//! Manage operations on [Context], create/delete/update [Stream][crate::jetstream::stream::Stream]
use std::borrow::Borrow;
use std::io::{self, ErrorKind};

use crate::jetstream::account::Account;
use crate::jetstream::publish::PublishAck;
use crate::jetstream::response::Response;
use crate::{Client, Error};
use bytes::Bytes;
use http::HeaderMap;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::{self, json};
use std::borrow::Borrow;
use std::io::{self, ErrorKind};

use super::stream::{Config, DeleteStatus, Info, Stream};

Expand Down Expand Up @@ -133,6 +133,22 @@ impl Context {
}
}

/// Query the server for account information
pub async fn query_account(&self) -> Result<Account, Error> {
let response: Response<Account> = self.request("INFO".into(), b"").await?;

match response {
Response::Err { error } => Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!(
"nats: error while querying account information: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok(account) => Ok(account),
}
}

/// Create a JetStream [Stream] with given config and return a handle to it.
/// That handle can be used to manage and use [Consumer][crate::jetstream::consumer::Consumer].
///
Expand Down
1 change: 1 addition & 0 deletions async-nats/src/jetstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
use crate::{Client, Error};

pub mod account;
pub mod consumer;
pub mod context;
pub mod publish;
Expand Down
13 changes: 13 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ mod jetstream {
use futures::stream::{StreamExt, TryStreamExt};
use time::OffsetDateTime;

#[tokio::test]
async fn query_account_requests() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let account = context.query_account().await.unwrap();
assert_eq!(account.requests.total, 0);

let account = context.query_account().await.unwrap();
assert_eq!(account.requests.total, 1);
}

#[tokio::test]
async fn publish_with_headers() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit e2c365f

Please sign in to comment.