diff --git a/Cargo.lock b/Cargo.lock index cc5135d3b..4ed7eac45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1420,15 +1420,19 @@ name = "datamanager" version = "0.1.0" dependencies = [ "aws-config", + "aws-credential-types", "aws-sdk-s3", "axum", "chrono", "duckdb", + "http-body-util", + "hyper 1.7.0", "polars", "reqwest", "serde", "serde_json", "tokio", + "tokio-test", "tower", "tower-http", "tracing", @@ -4893,6 +4897,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.7.16" diff --git a/applications/datamanager/.claude/settings.local.json b/applications/datamanager/.claude/settings.local.json index a3588b7b2..83c7d98f3 100644 --- a/applications/datamanager/.claude/settings.local.json +++ b/applications/datamanager/.claude/settings.local.json @@ -12,7 +12,9 @@ "Bash(kill:*)", "Bash(RUST_LOG=info cargo run)", "Bash(pkill:*)", - "Bash(RUST_LOG=debug cargo run)" + "Bash(RUST_LOG=debug cargo run)", + "Bash(brew install:*)", + "Bash(export:*)" ], "deny": [], "defaultMode": "acceptEdits" diff --git a/applications/datamanager/Cargo.lock b/applications/datamanager/Cargo.lock index a763f1a17..973e88540 100644 --- a/applications/datamanager/Cargo.lock +++ b/applications/datamanager/Cargo.lock @@ -1429,15 +1429,19 @@ name = "datamanager" version = "0.1.0" dependencies = [ "aws-config", + "aws-credential-types", "aws-sdk-s3", "axum", "chrono", "duckdb", + "http-body-util", + "hyper 
1.7.0", "polars", "reqwest", "serde", "serde_json", "tokio", + "tokio-test", "tower", "tower-http", "tracing", @@ -4913,6 +4917,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.7.16" diff --git a/applications/datamanager/Cargo.toml b/applications/datamanager/Cargo.toml index f5906bfe3..c496d9e3e 100644 --- a/applications/datamanager/Cargo.toml +++ b/applications/datamanager/Cargo.toml @@ -1,7 +1,15 @@ [package] name = "datamanager" version = "0.1.0" -edition = "2024" +edition = "2021" + +[lib] +name = "datamanager" +path = "src/lib.rs" + +[[bin]] +name = "datamanager" +path = "src/main.rs" [dependencies] axum = "0.8.4" @@ -17,4 +25,13 @@ tracing = "0.1.41" tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt"] } aws-config = "1.5.8" aws-sdk-s3 = "1.48.0" +aws-credential-types = "1.2.6" duckdb = { version = "1.0", features = ["r2d2", "chrono"] } + +[dev-dependencies] +tokio-test = "0.4" +serde_json = "1.0" +tower = { version = "0.5", features = ["util"] } +hyper = { version = "1.0", features = ["full"] } +http-body-util = "0.1" +reqwest = { version = "0.12", features = ["json"] } diff --git a/applications/datamanager/src/datamanager/main.py b/applications/datamanager/src/datamanager/main.py deleted file mode 100644 index 4f9fdb52f..000000000 --- a/applications/datamanager/src/datamanager/main.py +++ /dev/null @@ -1,18 +0,0 @@ -from fastapi import FastAPI, 
Response, status -from structlog import get_logger - -logger = get_logger() - -app: FastAPI = FastAPI() - - -@app.get("/health") -def health_check() -> Response: - return Response(status_code=status.HTTP_200_OK) - - -@app.get("/portfolio-check") -def check_portfolio() -> Response: - logger.info("I was called by the portfoliomanager") - - return Response(status_code=status.HTTP_200_OK) diff --git a/applications/datamanager/src/lib.rs b/applications/datamanager/src/lib.rs new file mode 100644 index 000000000..f2ece13f8 --- /dev/null +++ b/applications/datamanager/src/lib.rs @@ -0,0 +1,76 @@ +use aws_sdk_s3::Client as S3Client; +use axum::{routing::get, Router}; +use reqwest::Client; +use tower_http::trace::TraceLayer; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +pub mod routes; +use routes::equity; +use routes::health; + +#[derive(Clone)] +pub struct AlpacaSecrets { + pub base: String, + pub key: String, + pub secret: String, +} + +#[derive(Clone)] +pub struct PolygonSecrets { + pub base: String, + pub key: String, +} + +#[derive(Clone)] +pub struct AppState { + pub client: Client, + pub polygon: PolygonSecrets, + pub alpaca: AlpacaSecrets, + pub s3_client: S3Client, + pub bucket_name: String, +} + +impl AppState { + pub async fn from_env() -> Self { + let client = Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap(); + + let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let s3_client = S3Client::new(&config); + let bucket_name = + std::env::var("S3_BUCKET_NAME").unwrap_or("pocketsizefund-ml-data".to_string()); + + Self { + client, + polygon: PolygonSecrets { + base: std::env::var("POLYGON_BASE_URL") + .unwrap_or("https://api.polygon.io".to_string()), + key: std::env::var("POLYGON_API_KEY") + .expect("POLYGON_API_KEY must be set in environment"), + }, + alpaca: AlpacaSecrets { + base: std::env::var("ALPACA_BASE_URL") + 
.unwrap_or("https://data.alpaca.markets".to_string()), + key: std::env::var("ALPACA_API_KEY") + .expect("ALPACA_API_KEY must be set in environment"), + secret: std::env::var("ALPACA_API_SECRET") + .expect("ALPACA_API_SECRET must be set in environment"), + }, + s3_client, + bucket_name, + } + } +} + +pub async fn create_app() -> Router { + let state = AppState::from_env().await; + + Router::<AppState>::new() + .route("/health", get(health::check)) + .merge(equity::router()) + .with_state(state) + .layer(TraceLayer::new_for_http()) +} + diff --git a/applications/datamanager/src/main.rs b/applications/datamanager/src/main.rs index a7b858b96..ff45884d2 100644 --- a/applications/datamanager/src/main.rs +++ b/applications/datamanager/src/main.rs @@ -1,68 +1,5 @@ -use axum::{Router, routing::get}; -use reqwest::Client; -use tower_http::trace::TraceLayer; +use datamanager::create_app; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use aws_sdk_s3::Client as S3Client; - -mod routes; -use routes::equity; -use routes::health; - -#[derive(Clone)] -struct AlpacaSecrets { - base: String, - key: String, - secret: String, -} - -#[derive(Clone)] -struct PolygonSecrets { - base: String, - key: String, -} - -#[derive(Clone)] -struct AppState { - client: Client, - polygon: PolygonSecrets, - alpaca: AlpacaSecrets, - s3_client: S3Client, - bucket_name: String, -} - -impl AppState { - async fn from_env() -> Self { - let client = Client::builder() - .timeout(std::time::Duration::from_secs(10)) - .build() - .unwrap(); - - let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; - let s3_client = S3Client::new(&config); - let bucket_name = std::env::var("S3_BUCKET_NAME") - .unwrap_or("pocketsizefund-ml-data".to_string()); - - Self { - client, - polygon: PolygonSecrets { - base: std::env::var("POLYGON_BASE_URL") - .unwrap_or("https://api.polygon.io".to_string()), - key: std::env::var("POLYGON_API_KEY") - .expect("POLYGON_API_KEY must be set in 
environment"), - }, - alpaca: AlpacaSecrets { - base: std::env::var("ALPACA_BASE_URL") - .unwrap_or("https://data.alpaca.markets".to_string()), - key: std::env::var("ALPACA_API_KEY") - .expect("ALPACA_API_KEY must be set in environment"), - secret: std::env::var("ALPACA_API_SECRET") - .expect("ALPACA_API_SECRET must be set in environment"), - }, - s3_client, - bucket_name, - } - } -} #[tokio::main] async fn main() { @@ -74,14 +11,7 @@ async fn main() { .with(tracing_subscriber::fmt::layer()) .init(); - let state = AppState::from_env().await; - - let app = Router::new() - .route("/health", get(health::check)) - .merge(equity::router()) - .with_state(state) - .layer(TraceLayer::new_for_http()); - + let app = create_app().await; let listener = tokio::net::TcpListener::bind("0.0.0.0:9000").await.unwrap(); axum::serve(listener, app).await.unwrap(); diff --git a/applications/datamanager/src/routes/equity.rs b/applications/datamanager/src/routes/equity.rs index bf5ad035a..08a0738bf 100644 --- a/applications/datamanager/src/routes/equity.rs +++ b/applications/datamanager/src/routes/equity.rs @@ -1,22 +1,21 @@ -use tracing::{debug, info}; - +use crate::AppState; +use aws_credential_types::provider::ProvideCredentials; use aws_sdk_s3::primitives::ByteStream; -use chrono::NaiveDate; -use duckdb::{Connection, Result as DuckResult}; -use polars::prelude::ParquetWriter; -use polars::prelude::*; - -use std::io::Cursor; - use axum::{ - Router, + body::Body, extract::{Json, State}, - http::StatusCode, + http::{header, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, + Router, }; - -use crate::AppState; +use chrono::NaiveDate; +use chrono::Utc; +use duckdb::Connection; +use polars::prelude::ParquetWriter; +use polars::prelude::*; +use std::io::Cursor; +use tracing::{debug, info}; #[derive(serde::Deserialize)] struct DailySync { @@ -48,9 +47,11 @@ struct BarResult { #[derive(serde::Deserialize, Debug)] struct PolygonResponse { adjusted: bool, - queryCount: u64, 
+ #[serde(rename = "queryCount")] + query_count: u64, request_id: String, - resultsCount: u64, + #[serde(rename = "resultsCount")] + results_count: u64, status: String, results: Option<Vec<BarResult>>, } @@ -109,9 +110,152 @@ async fn upload_dataframe_to_s3( } } -async fn fetch(State(_state): State<AppState>) -> Response { - info!("hello"); - (StatusCode::OK).into_response() +async fn fetch(State(state): State<AppState>, query: Option<Json<DateRangeQuery>>) -> Response { + info!("Fetching equity data from S3 partitioned files"); + + match query_s3_parquet_data(&state, query).await { + Ok(parquet_data) => { + let mut response = Response::new(Body::from(parquet_data)); + response.headers_mut().insert( + header::CONTENT_TYPE, + "application/octet-stream".parse().unwrap(), + ); + response.headers_mut().insert( + "Content-Disposition", + "attachment; filename=\"equity_data.parquet\"" + .parse() + .unwrap(), + ); + *response.status_mut() = StatusCode::OK; + response + } + Err(err) => { + info!("Failed to query S3 data: {}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Query failed: {}", err), + ) + .into_response() + } + } +} + +async fn query_s3_parquet_data( + state: &AppState, + query: Option<Json<DateRangeQuery>>, +) -> Result<Vec<u8>, String> { + let conn = Connection::open_in_memory() + .map_err(|e| format!("Failed to create DuckDB connection: {}", e))?; + + conn.execute_batch("INSTALL httpfs; LOAD httpfs;") + .map_err(|e| format!("Failed to load httpfs extension: {}", e))?; + + let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let provider = config + .credentials_provider() + .ok_or_else(|| "No AWS credentials provider available".to_string())?; + let credentials = provider + .provide_credentials() + .await + .map_err(|e| format!("Failed to get AWS credentials: {}", e))?; + + let s3_config = format!( + " + SET s3_region='us-east-1'; + SET s3_url_style='path'; + SET s3_access_key_id='{}'; + SET s3_secret_access_key='{}'; + ", + credentials.access_key_id(), + credentials.secret_access_key() + ); + + 
conn.execute_batch(&s3_config) + .map_err(|e| format!("Failed to configure S3 settings: {}", e))?; + + let (start_date, end_date) = match query { + Some(q) => (q.start_date, q.end_date), + None => { + let end_date = chrono::Utc::now().naive_utc().date(); + let start_date = end_date - chrono::Duration::days(7); + (start_date, end_date) + } + }; + + info!("Querying data from {} to {}", start_date, end_date); + + let mut s3_paths = Vec::new(); + let mut current_date = start_date; + + while current_date <= end_date { + let year = current_date.format("%Y"); + let month = current_date.format("%m"); + let day = current_date.format("%d"); + + let s3_path = format!( + "s3://{}/equity/bars/daily/year={}/month={}/day={}/data.parquet", + state.bucket_name, year, month, day + ); + s3_paths.push(s3_path); + + current_date = current_date + chrono::Duration::days(1); + } + + if s3_paths.is_empty() { + return Err("No files to query for the given date range".to_string()); + } + + info!("Querying {} S3 files", s3_paths.len()); + + let s3_paths_str = s3_paths + .iter() + .map(|path| format!("SELECT * FROM '{}'", path)) + .collect::<Vec<_>>() + .join(" UNION ALL "); + + // Create a temporary parquet file path + let temp_file = format!( + "/tmp/query_result_{}.parquet", + Utc::now().timestamp_micros() + ); + + let export_sql = format!( + " + COPY ( + SELECT + ticker, + timestamp, + open_price, + high_price, + low_price, + close_price, + volume, + volume_weighted_average_price, + transactions + FROM ({}) + ORDER BY timestamp, ticker + ) TO '{}' (FORMAT PARQUET) + ", + s3_paths_str, temp_file + ); + + debug!("Executing export SQL: {}", export_sql); + + conn.execute(&export_sql, []) + .map_err(|e| format!("Failed to execute parquet export: {}", e))?; + + let parquet_data = + std::fs::read(&temp_file).map_err(|e| format!("Failed to read parquet file: {}", e))?; + + if let Err(e) = std::fs::remove_file(&temp_file) { + info!("Failed to clean up temp file {}: {}", temp_file, e); + } + + info!( + 
"Query exported {} bytes of parquet data", + parquet_data.len() + ); + Ok(parquet_data) } async fn sync(State(state): State<AppState>, payload: Json<DailySync>) -> impl IntoResponse { @@ -203,13 +347,13 @@ async fn sync(State(state): State<AppState>, payload: Json<DailySync>) -> impl I let df_result = df! { "ticker" => tickers, - "volume" => volumes, - "vwap" => vw_prices, - "open" => open_prices, - "close" => close_prices, - "high" => high_prices, - "low" => low_prices, "timestamp" => timestamps, + "open_price" => open_prices, + "high_price" => high_prices, + "low_price" => low_prices, + "close_price" => close_prices, + "volume" => volumes, + "volume_weighted_average_price" => vw_prices, "transactions" => num_transactions, }; diff --git a/applications/datamanager/tests/integration_tests.rs b/applications/datamanager/tests/integration_tests.rs new file mode 100644 index 000000000..87ced9420 --- /dev/null +++ b/applications/datamanager/tests/integration_tests.rs @@ -0,0 +1,315 @@ +use axum::body::Body; +use axum::http::{Request, StatusCode}; +use http_body_util::BodyExt; +use serde_json::json; +use tokio::net::TcpListener; +use tower::ServiceExt; + +#[tokio::test] +async fn test_health_endpoint() { + let app = datamanager::create_app().await; + + let request = Request::builder() + .uri("/health") + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + let body_str = String::from_utf8(body.to_vec()).unwrap(); + + // Health endpoint returns empty body with 200 status + assert_eq!(body_str, ""); +} + +#[tokio::test] +async fn test_equity_fetch_no_date_range() { + let app = datamanager::create_app().await; + + let request = Request::builder() + .method("GET") + .uri("/equity") + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + // Should return either success with data or error about missing files + // Both 
are acceptable for this test since we're testing the endpoint works + assert!( + response.status() == StatusCode::OK || + response.status() == StatusCode::INTERNAL_SERVER_ERROR + ); +} + +#[tokio::test] +async fn test_equity_fetch_with_date_range() { + let app = datamanager::create_app().await; + + let request_body = json!({ + "start_date": "2025-08-29", + "end_date": "2025-08-29" + }); + + let request = Request::builder() + .method("GET") + .uri("/equity") + .header("content-type", "application/json") + .body(Body::from(request_body.to_string())) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + // Should return either success with data or error about missing files + let status = response.status(); + assert!( + status == StatusCode::OK || + status == StatusCode::INTERNAL_SERVER_ERROR + ); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + + // If successful, should be parquet binary data or error message + if status == StatusCode::OK { + // Should be binary parquet data, check it's not empty and starts with parquet magic bytes + assert!(!body.is_empty(), "Parquet response should not be empty"); + // Parquet files start with "PAR1" magic bytes + if body.len() >= 4 { + let magic = &body[0..4]; + assert_eq!(magic, b"PAR1", "Should be valid parquet data starting with PAR1"); + } + } else { + // For errors, try to convert to string to check error message + if let Ok(body_str) = String::from_utf8(body.to_vec()) { + assert!(body_str.contains("Query failed") || body_str.contains("404")); + } + } +} + +#[tokio::test] +async fn test_equity_sync_endpoint() { + let app = datamanager::create_app().await; + + // Test with a recent weekday date that should have market data + let request_body = json!({ + "date": "2024-08-29" + }); + + let request = Request::builder() + .method("POST") + .uri("/equity") + .header("content-type", "application/json") + .body(Body::from(request_body.to_string())) + .unwrap(); + + let response = 
app.oneshot(request).await.unwrap(); + + // Should return OK - either with data or with an explanation + assert_eq!(response.status(), StatusCode::OK); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + let body_str = String::from_utf8(body.to_vec()).unwrap(); + + // Should either contain DataFrame info, market data, or no data message + assert!( + body_str.contains("DataFrame") || + body_str.contains("market data") || + body_str.contains("No market data") || + body_str.contains("results") + ); +} + +#[tokio::test] +async fn test_equity_sync_invalid_date() { + let app = datamanager::create_app().await; + + let request_body = json!({ + "date": "invalid-date" + }); + + let request = Request::builder() + .method("POST") + .uri("/equity") + .header("content-type", "application/json") + .body(Body::from(request_body.to_string())) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + // Should return error for invalid date format + assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY); +} + +#[tokio::test] +async fn test_equity_sync_missing_date() { + let app = datamanager::create_app().await; + + let request_body = json!({}); + + let request = Request::builder() + .method("POST") + .uri("/equity") + .header("content-type", "application/json") + .body(Body::from(request_body.to_string())) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + // Should return error for missing date field + assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY); +} + +#[tokio::test] +async fn test_nonexistent_endpoint() { + let app = datamanager::create_app().await; + + let request = Request::builder() + .uri("/nonexistent") + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn test_equity_fetch_invalid_json() { + let app = datamanager::create_app().await; + + let request = 
Request::builder() + .method("GET") + .uri("/equity") + .header("content-type", "application/json") + .body(Body::from("invalid json")) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + // Should return error for invalid JSON (Axum returns 400 for malformed JSON) + assert_eq!(response.status(), StatusCode::BAD_REQUEST); +} + +#[tokio::test] +async fn test_equity_fetch_invalid_date_format() { + let app = datamanager::create_app().await; + + let request_body = json!({ + "start_date": "2025-13-32", // Invalid date + "end_date": "2025-08-29" + }); + + let request = Request::builder() + .method("GET") + .uri("/equity") + .header("content-type", "application/json") + .body(Body::from(request_body.to_string())) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + // Should return error for invalid date format + assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY); +} + +#[tokio::test] +async fn test_full_workflow_sync_then_fetch() { + let app = datamanager::create_app().await; + + // First, try to sync data for a specific date + let sync_body = json!({ + "date": "2024-08-29" + }); + + let sync_request = Request::builder() + .method("POST") + .uri("/equity") + .header("content-type", "application/json") + .body(Body::from(sync_body.to_string())) + .unwrap(); + + let sync_response = app.clone().oneshot(sync_request).await.unwrap(); + assert_eq!(sync_response.status(), StatusCode::OK); + + // Then try to fetch data for the same date + let fetch_body = json!({ + "start_date": "2024-08-29", + "end_date": "2024-08-29" + }); + + let fetch_request = Request::builder() + .method("GET") + .uri("/equity") + .header("content-type", "application/json") + .body(Body::from(fetch_body.to_string())) + .unwrap(); + + let fetch_response = app.oneshot(fetch_request).await.unwrap(); + + // Fetch should work if sync was successful + let fetch_status = fetch_response.status(); + assert!( + fetch_status == StatusCode::OK || + fetch_status == 
StatusCode::INTERNAL_SERVER_ERROR + ); +} + +// Test spawning server on a separate thread and making HTTP requests +#[tokio::test] +async fn test_server_spawn_integration() { + // Find an available port + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let port = addr.port(); + + // Spawn the server in a separate task + let server_handle = tokio::spawn(async move { + let app = datamanager::create_app().await; + axum::serve(listener, app).await.unwrap(); + }); + + // Wait a moment for server to start + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Make HTTP requests to the spawned server + let client = reqwest::Client::new(); + + // Test health endpoint + let health_response = client + .get(&format!("http://127.0.0.1:{}/health", port)) + .send() + .await + .unwrap(); + + assert_eq!(health_response.status(), 200); + // Health endpoint returns empty body + assert_eq!(health_response.text().await.unwrap(), ""); + + // Test equity endpoint with date range + let equity_response = client + .get(&format!("http://127.0.0.1:{}/equity", port)) + .json(&json!({ + "start_date": "2025-08-29", + "end_date": "2025-08-29" + })) + .send() + .await + .unwrap(); + + // Should get some response (either success or expected error) + assert!(equity_response.status().as_u16() >= 200 && equity_response.status().as_u16() < 600); + + // If successful, should return binary parquet data + if equity_response.status().is_success() { + let content_type = equity_response.headers().get("content-type"); + if let Some(ct) = content_type { + assert_eq!(ct, "application/octet-stream"); + } + } + + // Clean up by aborting the server + server_handle.abort(); +} \ No newline at end of file