Update Pulumi infrastructure to build Docker images#547
Conversation
**Walkthrough**

This update introduces new cloud-native, partition-aware data management and testing infrastructure for the datamanager application. It adds Flyte-based workflow automation, Docker Compose integration, BDD test suites, and robust configuration models. The refactor replaces legacy code with asynchronous FastAPI endpoints, Google Cloud Storage integration, and Polars/DuckDB-based data processing.
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    participant Client
    participant FastAPI
    participant DuckDB
    participant GCS
    participant PolygonAPI
    Client->>FastAPI: GET /equity-bars?start_date&end_date
    FastAPI->>DuckDB: Query Parquet files on GCS via httpfs
    DuckDB->>GCS: Read partitioned Parquet files
    DuckDB-->>FastAPI: Return filtered data
    FastAPI-->>Client: Respond with Arrow IPC or 404
    Client->>FastAPI: POST /equity-bars { date }
    FastAPI->>PolygonAPI: Fetch daily bars JSON
    FastAPI->>Polars: Convert JSON to DataFrame, add partitions
    FastAPI->>GCS: Write Parquet partitioned by date
    FastAPI-->>Client: Respond with summary
    Client->>FastAPI: DELETE /equity-bars { date }
    FastAPI->>GCS: Delete Parquet files for date prefix
    FastAPI-->>Client: Respond 204 or 404
```

```mermaid
sequenceDiagram
    participant FlyteWorkflow
    participant FastAPI
    loop For each date in [start_date, end_date]
        FlyteWorkflow->>FastAPI: POST /equity-bars { date }
        FastAPI-->>FlyteWorkflow: Return count or raise error
    end
    FlyteWorkflow-->>FlyteWorkflow: Aggregate counts and return
```
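The POST and DELETE flows above write and delete Parquet partitioned by date, using a Hive-style `year=/month=/day=` layout described in the review comments further down. A minimal sketch of how such a partition prefix could be derived (a hypothetical helper, not code from this PR):

```python
from datetime import date

def partition_prefix(day: date) -> str:
    """Build a Hive-style partition prefix for a given trading day."""
    return f"equity/bars/year={day.year}/month={day.month}/day={day.day}"

# The same prefix can serve both the write path and the delete-by-date path.
prefix = partition_prefix(date(2024, 5, 21))
```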
Actionable comments posted: 1
🧹 Nitpick comments (2)
infrastructure/images.py (2)
**12-26**: Consider parameterizing the Docker Hub organization name.

The Docker Hub organization name "pocketsizefund" is hardcoded. Consider making it configurable via Pulumi config to improve flexibility.

```diff
 config = Config()
 docker_username = config.require_secret("dockerhub_username")
 docker_password = config.require_secret("dockerhub_password")
 image_tag = config.get("image_tag") or pulumi.get_stack()
+docker_org = config.get("docker_org") or "pocketsizefund"

 images = {}
 for dockerfile in glob(os.path.join("..", "application", "*", "Dockerfile")):
     service_name = os.path.basename(os.path.dirname(dockerfile))
     images[service_name] = docker.Image(
         f"{service_name}-image",
         build=docker.DockerBuild(
             context="..", dockerfile=f"application/{service_name}/Dockerfile"
         ),
-        image_name=f"docker.io/pocketsizefund/{service_name}:{image_tag}",
+        image_name=f"docker.io/{docker_org}/{service_name}:{image_tag}",
         registry=docker.RegistryArgs(
             server="docker.io",
             username=docker_username,
             password=docker_password,
         ),
     )
```
**1-27**: Consider exporting image URIs for other consumers.

For improved usability, consider exporting the full image URIs as Pulumi outputs, which would make them accessible to other stacks or CI/CD systems.

```diff
 import os
 from glob import glob

 import pulumi
 import pulumi_docker as docker
 from pulumi import Config
+from pulumi import export

 config = Config()
 docker_username = config.require_secret("dockerhub_username")
 docker_password = config.require_secret("dockerhub_password")
 image_tag = config.get("image_tag") or pulumi.get_stack()

 images = {}
+image_uris = {}
 for dockerfile in glob(os.path.join("..", "application", "*", "Dockerfile")):
     service_name = os.path.basename(os.path.dirname(dockerfile))
+    image_uri = f"docker.io/pocketsizefund/{service_name}:{image_tag}"
+    image_uris[service_name] = image_uri
     images[service_name] = docker.Image(
         f"{service_name}-image",
         build=docker.DockerBuild(
             context="..", dockerfile=f"application/{service_name}/Dockerfile"
         ),
-        image_name=f"docker.io/pocketsizefund/{service_name}:{image_tag}",
+        image_name=image_uri,
         registry=docker.RegistryArgs(
             server="docker.io",
             username=docker_username,
             password=docker_password,
         ),
     )
+
+# Export image URIs as stack outputs
+export('image_uris', image_uris)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (4)
- infrastructure/__main__.py (1 hunks)
- infrastructure/cloud_run.py (2 hunks)
- infrastructure/images.py (1 hunks)
- infrastructure/pyproject.toml (1 hunks)
🔇 Additional comments (5)
infrastructure/__main__.py (1)
**3-3**: LGTM! Import properly added.

The import statement for the new `images` module has been correctly added. The `# noqa: F401` comment is appropriate, as it acknowledges that while the import isn't directly used in this file, it's necessary to ensure the Pulumi Docker image resources are loaded during execution.

infrastructure/pyproject.toml (1)
**10-10**: LGTM! New dependency properly added.

The `pulumi-docker` dependency with a minimum version of 3.0.0 has been correctly added to support the Docker image building and publishing functionality implemented in the new `images.py` module.

infrastructure/cloud_run.py (2)
**7-7**: LGTM! Import correctly added.

The import for the `images` module has been properly added to access the Docker images built and published by the Pulumi infrastructure.
**23-23**: LGTM! Image reference updated to use dynamically built image.

The container image reference has been properly updated to use the dynamically built and published Docker image from the `images` module, replacing the previously hardcoded image reference.

infrastructure/images.py (1)
**1-11**: LGTM! Module initialization with proper configuration.

The imports and configuration setup look good. Using secrets for Docker Hub credentials is a security best practice, and defaulting to the Pulumi stack name for the image tag is a sensible approach.
Graphite Automations: "Assign author to pull request" took an action on this PR (05/21/25). 1 assignee was added to this PR based on John Forstmeier's automation.
**forstmeier** left a comment:
CodeRabbit suggestion is good.
cleanups from PR

Force-pushed from ad366d2 to d800417.
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.
Actionable comments posted: 10
🔭 Outside diff range comments (1)
application/datamanager/features/equity_bars.feature (1)
**24-25**: ⚠️ Potential issue: Incomplete "Skip weekends" scenario needs implementation

The "Skip weekends" scenario is incomplete: it has a title but no steps defined. This should either be completed or removed before merging.

Complete the scenario with appropriate steps or remove it if it's not ready for implementation:

```diff
-    Scenario Outline: Skip weekends
+# TODO: Implement weekend skipping scenario
+# Scenario Outline: Skip weekends
+#     Given I have weekend dates:
+#         | start_date   | end_date   |
+#         | <start_date> | <end_date> |
+#     When I send a POST request to "/equity-bars" for date range
+#     Then the response status code should be 200
+#     And the response should indicate weekends were skipped
```
🧹 Nitpick comments (9)
workflows/backfill_datamanager.py (2)
**8-12**: Good use of task retries, but could benefit from more robust error handling

The Flyte task includes retries, which is excellent for resilience. However, consider adding more detailed error handling and logging to help diagnose failures.

```diff
 @task(retries=3)
 def backfill_single_date(base_url: str, day: date) -> int:
-    response = httpx.post(f"{base_url}/equity-bars", json={"date": day.isoformat()})
-    response.raise_for_status()
-    return response.json().get("count", 0)
+    try:
+        response = httpx.post(
+            f"{base_url}/equity-bars",
+            json={"date": day.isoformat()},
+            timeout=30.0
+        )
+        response.raise_for_status()
+        return response.json().get("count", 0)
+    except httpx.HTTPStatusError as e:
+        print(f"HTTP error for {day}: {e.response.status_code} - {e.response.text}")
+        raise
+    except httpx.RequestError as e:
+        print(f"Request error for {day}: {str(e)}")
+        raise
```
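The backfill task above POSTs one date at a time over an inclusive range. The date iteration it relies on can be sketched with the standard library alone (a hypothetical helper, not code from this PR):

```python
from datetime import date, timedelta

def dates_between(start: date, end: date) -> list[date]:
    """Return every calendar date from start to end, inclusive."""
    days = (end - start).days + 1
    return [start + timedelta(days=i) for i in range(days)]

# The backfill workflow would POST once per entry in this list.
dates = dates_between(date(2024, 1, 1), date(2024, 1, 3))
```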
**15-22**: Sequential processing might be inefficient for large date ranges

The workflow processes dates sequentially, which could be inefficient for large date ranges. Consider adding parallelism options for better performance with extended date ranges.

```diff
 @workflow
 def backfill_equity_bars(base_url: str, start_date: date, end_date: date) -> List[int]:
     results: List[int] = []
     current = start_date
+    # For large date ranges, consider using map_task for parallel execution
+    # dates = [start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)]
+    # return map_task(backfill_single_date)(base_url=[base_url]*len(dates), day=dates)
     while current <= end_date:
         results.append(backfill_single_date(base_url=base_url, day=current))
         current += timedelta(days=1)
     return results
```

application/datamanager/src/datamanager/models.py (2)
**24-35**: DateRange model lacks to_payload method for consistency

The `DateRange` model in positionmanager includes a `to_payload` method that's missing here. Consider adding this method for consistency across services.

```diff
 class DateRange(BaseModel):
     start: datetime.date
     end: datetime.date

     @field_validator("end")
     @classmethod
     def check_end_after_start(cls, end_value, info):
         start_value = info.data.get("start")
         if start_value and end_value <= start_value:
             raise ValueError("End date must be after start date.")
         return end_value
+
+    def to_payload(self) -> dict[str, str]:
+        return {
+            "start_date": self.start.isoformat(),
+            "end_date": self.end.isoformat(),
+        }
```
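Outside of Pydantic, the end-after-start invariant and the payload shape this comment describes reduce to a few lines of plain Python; a standalone sketch under those assumptions (hypothetical helper, illustrative only):

```python
import datetime

def validate_range(start: datetime.date, end: datetime.date) -> dict[str, str]:
    """Enforce the end-after-start invariant and build the suggested payload."""
    if end <= start:
        raise ValueError("End date must be after start date.")
    return {"start_date": start.isoformat(), "end_date": end.isoformat()}
```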
**37-39**: Consider adding validation for BarsSummary date field

The `BarsSummary` class has a string date field but lacks validation to ensure it follows a consistent date format. Consider adding validation or using a datetime.date type with appropriate serialization.

```diff
 class BarsSummary(BaseModel):
-    date: str
+    date: datetime.date
     count: int
+
+    model_config = {"json_encoders": {datetime.date: lambda d: d.strftime("%Y-%m-%d")}}
```

.mise.toml (3)
**80-85**: Expose the API on a configurable host/port for local dev.

`uvicorn` is started without the `--host`/`--port` flags, so it will default to `127.0.0.1:8000`. Most Cloud Run-bound services listen on `0.0.0.0:${PORT:-8080}`. Without mirroring this locally you risk "works-on-my-machine" bugs (e.g. container health-checks failing).

```diff
-cd application/{{arg(name="service_name")}}
-uv run uvicorn src.{{arg(name="service_name")}}.main:application --reload
+cd application/{{arg(name="service_name")}}
+uv run uvicorn \
+    src.{{arg(name="service_name")}}.main:application \
+    --reload \
+    --host 0.0.0.0 \
+    --port ${PORT:-8080}
```
90-92: Use the new Docker CLI syntax to avoid deprecation warnings.
`docker-compose` (with a hyphen) is now superseded by `docker compose` (space). Newer Docker releases emit deprecation notices; switching avoids noise in CI logs.

```diff
-docker-compose up --build --abort-on-container-exit --remove-orphans
+docker compose up --build --abort-on-container-exit --remove-orphans
```
52-58: Missing dependency chain can lead to stale test images.
`python:test:behave` jumps straight to `behave` without guaranteeing the service image is (re)built first. Consider depending on `application:service:build` (or the compose test task) to ensure the freshest image is under test.

```toml
[tasks."python:test:behave"]
depends = ["application:service:build"]  # <- new
```

application/datamanager/features/steps/equity_bars_steps.py (1)
**31-38**: Minor: function name typo.

`step_imp_get_request` is missing the "l" in `impl`. Harmless at runtime, but it breaks naming consistency and hampers discoverability.

application/datamanager/src/datamanager/main.py (1)
**141-147**: Preserve the original traceback when re-raising.

Raising a new `HTTPException` inside `except` loses the root cause. Use exception chaining as Ruff hints (B904):

```diff
-raise HTTPException(
-    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-    detail="Failed to write data",
-)
+raise HTTPException(
+    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+    detail="Failed to write data",
+) from e
```

🧰 Tools
🪛 Ruff (0.11.9)
**144-147**: Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling (B904)
🛑 Comments failed to post (10)
application/datamanager/features/steps/health_steps.py (2)
**8-8**: ⚠️ Potential issue: Fix variable naming inconsistency.

There's a mismatch between the variable names: environment.py sets `context.base_url` but this file uses `context.api_url`.

```diff
 def step_impl(context, endpoint):
     """Send a GET request to the specified endpoint."""
-    url = f"{context.api_url}{endpoint}"
+    url = f"{context.base_url}{endpoint}"
     context.response = requests.get(url)
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
def step_impl(context, endpoint):
    """Send a GET request to the specified endpoint."""
    url = f"{context.base_url}{endpoint}"
    context.response = requests.get(url)
```

🤖 Prompt for AI Agents
In application/datamanager/features/steps/health_steps.py at line 8, the variable context.api_url is used but the correct variable set in environment.py is context.base_url. Replace context.api_url with context.base_url in the URL construction to fix the naming inconsistency.
**6-9**: 🛠️ Refactor suggestion: Add error handling and timeout to the request.

The current implementation lacks error handling and timeout settings for the HTTP request, which could cause test failures or hangs if the service is unavailable or slow to respond.

```diff
 @when('I send a GET request to "{endpoint}"')
 def step_impl(context, endpoint):
     """Send a GET request to the specified endpoint."""
     url = f"{context.api_url}{endpoint}"
-    context.response = requests.get(url)
+    try:
+        context.response = requests.get(url, timeout=10)
+    except requests.exceptions.RequestException as e:
+        context.response = None
+        context.request_error = str(e)
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
@when('I send a GET request to "{endpoint}"')
def step_impl(context, endpoint):
    """Send a GET request to the specified endpoint."""
    url = f"{context.api_url}{endpoint}"
    try:
        context.response = requests.get(url, timeout=10)
    except requests.exceptions.RequestException as e:
        context.response = None
        context.request_error = str(e)
```

🤖 Prompt for AI Agents

In application/datamanager/features/steps/health_steps.py around lines 6 to 9, the GET request lacks error handling and timeout, risking test failures or hangs if the service is down or slow. Modify the requests.get call to include a timeout parameter and wrap it in a try-except block to catch exceptions like requests.exceptions.RequestException. On exception, handle the error appropriately, such as logging it or setting a failure state in the context.

application/datamanager/features/environment.py (1)
**4-6**: ⚠️ Potential issue: Fix variable naming inconsistency with step definitions.

There's a mismatch between variable names: this file sets `context.base_url` but health_steps.py uses `context.api_url`.

```diff
 def before_all(context):
     """Set up test environment."""
-    context.base_url = os.environ.get("BASE_URL", "http://datamanager:8000")
+    context.api_url = os.environ.get("BASE_URL", "http://datamanager:8000")
+    print(f"Using API URL: {context.api_url}")
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
def before_all(context):
    """Set up test environment."""
    context.api_url = os.environ.get("BASE_URL", "http://datamanager:8000")
    print(f"Using API URL: {context.api_url}")
```

🤖 Prompt for AI Agents

In application/datamanager/features/environment.py around lines 4 to 6, the variable name set on the context is `base_url` but the step definitions in health_steps.py expect `api_url`. Rename `context.base_url` to `context.api_url` to ensure consistency and proper variable usage across the test steps.

application/datamanager/Dockerfile (1)
15-15: 💡 Verification agent
🧩 Analysis chain
Port changed from 8000 to 8080
The port change to 8080 aligns with Cloud Run's default port. Note that any existing services communicating with this container will need to be updated accordingly.
🏁 Script executed:
```bash
#!/bin/bash
# Check if there are any references to port 8000 in the codebase that might need updating
rg -g '!Dockerfile' '8000' --context 2 | grep -v '8080'
```

Length of output: 2672
Inconsistent port referenced in test environment
The Dockerfile's exposed port was updated to 8080, but the test setup still defaults to port 8000. Update the BASE_URL default in `application/datamanager/features/environment.py` so tests target the new port:

```diff
--- a/application/datamanager/features/environment.py
+++ b/application/datamanager/features/environment.py
@@
 def before_all(context):
-    context.base_url = os.environ.get("BASE_URL", "http://datamanager:8000")
+    context.base_url = os.environ.get("BASE_URL", "http://datamanager:8080")
```

No other occurrences of port 8000 were found outside lock files.
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
def before_all(context):
    context.base_url = os.environ.get("BASE_URL", "http://datamanager:8080")
```

🤖 Prompt for AI Agents
In application/datamanager/Dockerfile at line 15, the exposed port was changed from 8000 to 8080 to match Cloud Run's default. To fix the inconsistency, update the BASE_URL default port in application/datamanager/features/environment.py from 8000 to 8080 so that the test environment targets the correct port. This ensures all references to the service port are consistent across the codebase.

application/datamanager/src/datamanager/models.py (1)
**5-21**: 🛠️ Refactor suggestion: SummaryDate model uses deprecated validator decorator

The `SummaryDate` class uses the deprecated `@validator` decorator instead of the newer `@field_validator` which is used elsewhere in the file. For consistency and forward compatibility, update to the newer decorator.

```diff
 class SummaryDate(BaseModel):
     date: datetime.date = Field(
         default_factory=lambda: datetime.datetime.utcnow().date()
     )

-    @validator("date", pre=True)
-    def parse_date(cls, value):
+    @field_validator("date", mode="before")
+    @classmethod
+    def parse_date(cls, value):
         if isinstance(value, datetime.date):
             return value
         for fmt in ("%Y-%m-%d", "%Y/%m/%d"):
             try:
                 return datetime.datetime.strptime(value, fmt).date()
             except ValueError:
                 continue
         raise ValueError("Invalid date format: expected YYYY-MM-DD or YYYY/MM/DD")

     model_config = {"json_encoders": {datetime.date: lambda d: d.strftime("%Y/%m/%d")}}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
class SummaryDate(BaseModel):
    date: datetime.date = Field(
        default_factory=lambda: datetime.datetime.utcnow().date()
    )

    @field_validator("date", mode="before")
    @classmethod
    def parse_date(cls, value):
        if isinstance(value, datetime.date):
            return value
        for fmt in ("%Y-%m-%d", "%Y/%m/%d"):
            try:
                return datetime.datetime.strptime(value, fmt).date()
            except ValueError:
                continue
        raise ValueError("Invalid date format: expected YYYY-MM-DD or YYYY/MM/DD")

    model_config = {"json_encoders": {datetime.date: lambda d: d.strftime("%Y/%m/%d")}}
```

🤖 Prompt for AI Agents
In application/datamanager/src/datamanager/models.py between lines 5 and 21, replace the deprecated @validator decorator on the parse_date method with the newer @field_validator decorator to ensure consistency and forward compatibility. Update the decorator usage accordingly, maintaining the same validation logic and parameters.

application/datamanager/features/steps/equity_bars_steps.py (2)
**57-63**: ⚠️ Potential issue: Deletion assertion is a no-op for GCS, and the local path doesn't match.

- When `GCP_GCS_BUCKET` is set the test does `assert True`, so any DELETE response will pass even if data remains.
- The local fallback checks for `equity_bars_{date}.parquet`, but the API writes partitioned files under `equity/bars/year=YYYY/month=MM/day=DD/…`.

Consider:

```python
if os.environ.get("GCP_GCS_BUCKET"):
    client = storage.Client()
    bucket = client.bucket(os.environ["GCP_GCS_BUCKET"])
    blobs = list(bucket.list_blobs(
        prefix=f"equity/bars/year={date.year}/month={date.month}/day={date.day}"
    ))
    assert not blobs, f"Blobs still present for {date_str}"
else:
    expected_dir = Path(f"equity/bars/year={date.year}/month={date.month}/day={date.day}")
    assert not expected_dir.exists(), f"{expected_dir} still exists"
```
In application/datamanager/features/steps/equity_bars_steps.py around lines 57 to 63, the deletion assertion is ineffective because it uses a no-op assert for GCP and checks the wrong local path. Replace the GCP check to instantiate a storage client, list blobs with the correct prefix for the date partition, and assert that no blobs remain. For the local fallback, check the existence of the partitioned directory path matching the date (equity/bars/year=YYYY/month=MM/day=DD) and assert it does not exist.
**12-16**: ⚠️ Potential issue: Loop overwrites the range; only the last row is preserved.

If the `.feature` table supplies multiple ranges you will quietly lose all but the final one:

```python
for row in context.table:
    context.start_date = ...
    context.end_date = ...
```

Either accumulate into a list or assert the table contains exactly one row.

```diff
-for row in context.table:
-    context.start_date = row["start_date"]
-    context.end_date = row["end_date"]
+if len(context.table) != 1:
+    raise AssertionError("Expected a single date range, got %s rows" % len(context.table))
+row = context.table[0]
+context.start_date, context.end_date = row["start_date"], row["end_date"]
```
In application/datamanager/features/steps/equity_bars_steps.py around lines 12 to 16, the loop overwrites context.start_date and context.end_date for each row, so only the last row's values are kept. To fix this, either accumulate all date ranges into a list stored in context (e.g., context.date_ranges) or add an assertion to ensure context.table has exactly one row before assigning start_date and end_date. This prevents silent data loss when multiple rows are present.application/datamanager/src/datamanager/main.py (3)
**48-53**: 🛠️ Refactor suggestion: Potential connection-sharing race condition.

`duckdb.connect()` returns a connection that is not thread-safe. FastAPI's default worker model (uvicorn) serves requests concurrently, so two requests could mutate/read the same connection simultaneously, corrupting state.

Options:

- Open a fresh connection per request in a dependency.
- Use `duckdb.connect(":memory:", read_only=True)` clones.
- Protect calls with an `asyncio.Lock()`.

Recommend moving connection creation into a request-scoped dependency unless there is a strong performance reason not to.
🤖 Prompt for AI Agents
In application/datamanager/src/datamanager/main.py around lines 48 to 53, the current code creates a single DuckDB connection stored in app.state, which is not thread-safe and can cause race conditions under concurrent FastAPI requests. To fix this, refactor the code to create a new DuckDB connection per request by defining a FastAPI dependency that opens and yields a fresh connection for each request, ensuring no shared mutable state. Replace usage of the global connection with this dependency injection to avoid concurrent access issues.
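The request-scoped pattern recommended above can be illustrated without FastAPI or DuckDB. A generator-based sketch using only the standard library, where `FakeConnection` is a purely illustrative stand-in for `duckdb.connect()`:

```python
from contextlib import contextmanager

class FakeConnection:
    """Stand-in for a DuckDB connection, for illustration only."""
    def __init__(self):
        self.closed = False

    def execute(self, sql):
        return f"ran: {sql}"

    def close(self):
        self.closed = True

@contextmanager
def get_connection():
    # Each request gets its own connection, so no cross-request sharing.
    conn = FakeConnection()
    try:
        yield conn
    finally:
        conn.close()

# Usage: analogous to a yield-based FastAPI dependency per request.
with get_connection() as conn:
    result = conn.execute("SELECT 1")
```

The same shape maps directly onto a FastAPI `Depends` with `yield`, which guarantees cleanup after each request.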
**45-59**: ⚠️ Potential issue: Fail fast when GCS credentials are missing.

`DUCKDB_ACCESS_KEY`/`DUCKDB_SECRET` may be `None`, leading to `CREATE SECRET` with literal `"None"` and obscure connection errors. Validate first:

```diff
 DUCKDB_ACCESS_KEY = os.getenv("DUCKDB_ACCESS_KEY")
 DUCKDB_SECRET = os.getenv("DUCKDB_SECRET")
+
+if not DUCKDB_ACCESS_KEY or not DUCKDB_SECRET:
+    raise RuntimeError("DUCKDB_ACCESS_KEY and DUCKDB_SECRET must be set")
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
DUCKDB_ACCESS_KEY = os.getenv("DUCKDB_ACCESS_KEY")
DUCKDB_SECRET = os.getenv("DUCKDB_SECRET")

if not DUCKDB_ACCESS_KEY or not DUCKDB_SECRET:
    raise RuntimeError("DUCKDB_ACCESS_KEY and DUCKDB_SECRET must be set")

app.state.connection = duckdb.connect()
app.state.connection.execute(f"""
    INSTALL httpfs;
    LOAD httpfs;
    SET http_keep_alive=true;
    SET http_timeout=30000;
    CREATE SECRET (
        TYPE GCS,
        KEY_ID '{DUCKDB_ACCESS_KEY}',
        SECRET '{DUCKDB_SECRET}'
    );
""")
```

🤖 Prompt for AI Agents
In application/datamanager/src/datamanager/main.py around lines 45 to 59, the environment variables DUCKDB_ACCESS_KEY and DUCKDB_SECRET are used directly without validation, which can cause the CREATE SECRET command to use "None" as credentials leading to obscure errors. Add a check after retrieving these variables to verify they are not None, and if either is missing, raise an explicit error or exit the application immediately to fail fast and provide clear feedback about missing credentials.
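The fail-fast check above generalizes to any set of required environment variables; a small standard-library sketch of that pattern (a hypothetical helper, not code from this PR — only the `DUCKDB_*` names come from the review):

```python
import os

def require_env(*names: str) -> list[str]:
    """Raise immediately if any required environment variable is unset or empty."""
    missing = [name for name in names if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return [os.environ[name] for name in names]
```

Calling `require_env("DUCKDB_ACCESS_KEY", "DUCKDB_SECRET")` at startup surfaces a clear error instead of an obscure connection failure later.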
**18-35**: ⚠️ Potential issue: Return-type annotation is incorrect.

`bars_query` constructs a SQL string but is annotated as returning `pl.DataFrame`, which misleads IDEs and static type checkers.

```diff
-def bars_query(*, bucket: str, start_date: date, end_date: date) -> pl.DataFrame:
+def bars_query(*, bucket: str, start_date: date, end_date: date) -> str:
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
def bars_query(*, bucket: str, start_date: date, end_date: date) -> str:
    path_pattern = f"gs://{bucket}/equity/bars/*/*/*/*"
    return f"""
        SELECT *
        FROM read_parquet(
            '{path_pattern}',
            HIVE_PARTITIONING=1
        )
        WHERE (year > {start_date.year}
            OR (year = {start_date.year} AND month > {start_date.month})
            OR (year = {start_date.year} AND month = {start_date.month} AND day >= {start_date.day}))
          AND (year < {end_date.year}
            OR (year = {end_date.year} AND month < {end_date.month})
            OR (year = {end_date.year} AND month = {end_date.month} AND day <= {end_date.day}))
    """
```

🤖 Prompt for AI Agents
In application/datamanager/src/datamanager/main.py around lines 18 to 35, the function bars_query is annotated to return pl.DataFrame but actually returns a SQL query string. Change the return type annotation to str to accurately reflect that the function returns a SQL string, not a DataFrame.
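The nested OR chain in the suggested WHERE clause is equivalent to a lexicographic comparison on `(year, month, day)` tuples. A small standalone check of that equivalence (a hypothetical helper, not part of the PR):

```python
from datetime import date

def in_partition_range(year: int, month: int, day: int, start: date, end: date) -> bool:
    """Mirror the SQL OR-chain via lexicographic tuple comparison."""
    key = (year, month, day)
    return (start.year, start.month, start.day) <= key <= (end.year, end.month, end.day)

# Partitions on the boundary dates are included, matching >= / <= in the SQL.
included = in_partition_range(2024, 1, 15, date(2024, 1, 1), date(2024, 2, 1))
```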
…-using-datamanager Update datamanager backfill workflow fixes

Force-pushed from d800417 to 55681d5.
**forstmeier** left a comment:
I think this is good. There's lots of duplicate code I've seen in the PRs so I'm not sold on whatever Git flow/tool this is.
Overview
Changes
- Builds Docker images from `application/*/Dockerfile` using Pulumi
- Adds the `pulumi-docker` dependency

Comments
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Refactor
Chores