Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions applications/data_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ path = "src/main.rs"
[dependencies]
axum = "0.8.4"
chrono = { version = "0.4.44", features = ["serde"] }
chrono-tz = "0.10"
polars = { version = "0.51.0", features = [
"json",
"lazy",
Expand Down
227 changes: 105 additions & 122 deletions applications/data_manager/src/equity_bars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use axum::{
response::{IntoResponse, Response},
};
use chrono::{DateTime, Datelike, Utc, Weekday};
use chrono_tz::US::Eastern;
use polars::prelude::*;
use serde::Deserialize;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -117,100 +118,68 @@ pub async fn query(
}
}

pub async fn sync(
AxumState(state): AxumState<State>,
Json(payload): Json<DailySync>,
) -> impl IntoResponse {
info!("Sync date: {}", payload.date);

let weekday = payload.date.weekday();
if weekday == Weekday::Sat || weekday == Weekday::Sun {
info!("Skipping weekend date: {}", payload.date.format("%Y-%m-%d"));
return (
StatusCode::OK,
"Skipping weekend, no trading data available",
)
.into_response();
}

pub async fn fetch_and_store(
state: &State,
date: &DateTime<Utc>,
) -> Result<Option<String>, String> {
let massive_api_key = state.massive.key.clone();

let date = payload.date.format("%Y-%m-%d").to_string();
let date_str = date.with_timezone(&Eastern).format("%Y-%m-%d").to_string();
let url = format!(
"{}/v2/aggs/grouped/locale/us/market/stocks/{}",
state.massive.base, date
state.massive.base, date_str
);
Comment thread
forstmeier marked this conversation as resolved.

info!("url: {}", url);
info!("Sending request to Massive API");
let response = match state
let response = state
.http_client
.get(&url)
.header("accept", "application/json")
.query(&[("adjusted", "true"), ("apiKey", massive_api_key.as_str())])
.send()
.await
{
Ok(resp) => {
info!(
"Received response from Massive API, status: {}",
resp.status()
);
resp
}
Err(err) => {
.map_err(|err| {
warn!(
"Failed to send request to Massive API: {}",
err.without_url()
);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to send API request",
)
.into_response();
}
};
"Failed to send API request".to_string()
})?;

let text_content = match response.error_for_status() {
Ok(response) => match response.text().await {
Ok(text) => {
info!("Received response body, length: {} bytes", text.len());
text
}
Err(err) => {
warn!("Failed to read response text: {}", err);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to read API response",
)
.into_response();
}
},
Err(err) => {
info!(
"Received response from Massive API, status: {}",
response.status()
);

let text_content = response
.error_for_status()
.map_err(|err| {
warn!("API request failed with error status: {}", err);
return (StatusCode::INTERNAL_SERVER_ERROR, "API request failed").into_response();
}
};
"API request failed".to_string()
})?
.text()
.await
.map_err(|err| {
warn!("Failed to read response text: {}", err);
"Failed to read API response".to_string()
})?;

info!(
"Received response body, length: {} bytes",
text_content.len()
);
info!("Parsing JSON response");
let json_content: serde_json::Value = match serde_json::from_str(&text_content) {
Ok(value) => {
debug!("JSON parsed successfully");
value
}
Err(err) => {
warn!("Failed to parse JSON response: {}", err);
let truncated: String = text_content.chars().take(500).collect();
warn!("Raw response (first 500 chars): {}", truncated);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Invalid JSON response from API",
)
.into_response();
}
};

// Log the status field if present
let json_content: serde_json::Value = serde_json::from_str(&text_content).map_err(|err| {
warn!("Failed to parse JSON response: {}", err);
let truncated: String = text_content.chars().take(500).collect();
warn!("Raw response (first 500 chars): {}", truncated);
"Invalid JSON response from API".to_string()
})?;

debug!("JSON parsed successfully");

if let Some(status) = json_content.get("status") {
info!("API response status field: {}", status);
}
Expand All @@ -231,29 +200,22 @@ pub async fn sync(
.as_object()
.map(|o| o.keys().collect::<Vec<_>>())
);
return (
StatusCode::NO_CONTENT,
"No market data available for this date",
)
.into_response();
return Ok(None);
}
};

info!("Parsing results into BarResult structs");
let bars: Vec<BarResult> = match serde_json::from_value::<Vec<BarResult>>(results.clone()) {
Ok(bars) => {
info!("Successfully parsed {} bar results", bars.len());
bars
}
Err(err) => {
let bars: Vec<BarResult> =
serde_json::from_value::<Vec<BarResult>>(results.clone()).map_err(|err| {
warn!("Failed to parse results into BarResult structs: {}", err);
warn!("Results type: {:?}", results.as_array().map(|a| a.len()));
if let Some(first_result) = results.as_array().and_then(|a| a.first()) {
warn!("First result sample: {}", first_result);
}
return (StatusCode::BAD_GATEWAY, text_content).into_response();
}
};
"Failed to parse equity bar results".to_string()
})?;
Comment thread
forstmeier marked this conversation as resolved.
Comment thread
forstmeier marked this conversation as resolved.

info!("Successfully parsed {} bar results", bars.len());

let tickers: Vec<String> = bars.iter().map(|b| b.ticker.clone()).collect();
let volumes: Vec<Option<i64>> = bars
Expand Down Expand Up @@ -281,7 +243,8 @@ pub async fn sync(
let timestamps: Vec<i64> = bars.iter().map(|b| b.t as i64).collect();
let transactions: Vec<Option<u64>> = bars.iter().map(|b| b.n).collect();

let bars_data = df! {
info!("Creating DataFrame from bar data");
let data = df! {
"ticker" => tickers,
"timestamp" => timestamps,
"open_price" => open_prices,
Expand All @@ -291,46 +254,66 @@ pub async fn sync(
"volume" => volumes,
"volume_weighted_average_price" => volume_weighted_average_prices,
"transactions" => transactions,
};
}
.map_err(|err| {
warn!("Failed to create DataFrame: {}", err);
"Failed to create equity bars DataFrame".to_string()
})?;

info!("Creating DataFrame from bar data");
match bars_data {
Ok(data) => {
info!(
"Created DataFrame with {} rows and {} columns",
info!(
"Created DataFrame with {} rows and {} columns",
data.height(),
data.width()
);
debug!("DataFrame schema: {:?}", data.schema());

info!("Uploading DataFrame to S3");
let s3_key = write_equity_bars_dataframe_to_s3(state, &data, date)
.await
.map_err(|err| {
warn!(
"Failed to upload to S3: {}, rows: {}, columns: {}, date: {}",
err,
data.height(),
data.width()
data.width(),
date.with_timezone(&Eastern).format("%Y-%m-%d")
);
debug!("DataFrame schema: {:?}", data.schema());
format!(
"Failed to upload equity bars to storage for date {}",
date.with_timezone(&Eastern).format("%Y-%m-%d")
)
})?;

info!("Uploading DataFrame to S3");
match write_equity_bars_dataframe_to_s3(&state, &data, &payload.date).await {
Ok(s3_key) => {
info!("Successfully uploaded DataFrame to S3 at key: {}", s3_key);
let response_message = format!(
"DataFrame created with {} rows and uploaded to S3: {}",
data.height(),
s3_key
);
(StatusCode::OK, response_message).into_response()
}
Err(err) => {
warn!("Failed to upload to S3: {}", err);
let json_output = data.to_string();
(
StatusCode::BAD_GATEWAY,
format!(
"DataFrame created but S3 upload failed: {}\n\n{}",
err, json_output
),
)
.into_response()
}
}
}
Err(err) => {
warn!("Failed to create DataFrame: {}", err);
(StatusCode::INTERNAL_SERVER_ERROR, text_content).into_response()
info!("Successfully uploaded DataFrame to S3 at key: {}", s3_key);
Ok(Some(s3_key))
}

pub async fn sync(
AxumState(state): AxumState<State>,
Json(payload): Json<DailySync>,
) -> impl IntoResponse {
info!("Sync date: {}", payload.date);

let weekday = payload.date.weekday();
if weekday == Weekday::Sat || weekday == Weekday::Sun {
info!("Skipping weekend date: {}", payload.date.format("%Y-%m-%d"));
return (
StatusCode::OK,
"Skipping weekend, no trading data available",
)
.into_response();
}

match fetch_and_store(&state, &payload.date).await {
Ok(Some(s3_key)) => {
let response_message = format!("Data fetched and uploaded to S3: {}", s3_key);
(StatusCode::OK, response_message).into_response()
}
Ok(None) => (
StatusCode::NO_CONTENT,
"No market data available for this date",
)
.into_response(),
Err(error) => (StatusCode::INTERNAL_SERVER_ERROR, error).into_response(),
}
}
1 change: 1 addition & 0 deletions applications/data_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod health;
pub mod portfolios;
pub mod predictions;
pub mod router;
pub mod scheduler;
pub mod startup;
pub mod state;
pub mod storage;
Loading
Loading