Refactor script to make public files

dfsnow committed Jan 31, 2025
1 parent 1d70a4e commit 2fa0c24
Showing 4 changed files with 139 additions and 81 deletions.
133 changes: 91 additions & 42 deletions .github/workflows/create-public-files.yaml
@@ -1,18 +1,11 @@
---

name: create-public-files
run-name: create-public-files-${{ inputs.version }}
run-name: create-public-files-${{ inputs.version }}-${{ inputs.mode }}

on:
workflow_dispatch:
inputs:
# Input values must match those in params.yaml
dataset:
required: true
description: Comma-separated list of datasets
default: 'times,points,missing_pairs,metadata'
type: string

version:
required: true
description: Version of data
@@ -21,20 +14,40 @@ on:

mode:
required: true
description: Comma-separated list of travel modes
default: 'auto,bicycle,pedestrian'
description: Mode of travel
default: 'car'
type: choice
options:
- car
- bicycle
- foot

override_years:
required: false
description: |
Comma-separated list of OSM data years to run e.g. 2020,2023.
Will run all (see params.yaml) if null
type: string

year:
required: true
description: Comma-separated list of years
default: '2020,2021,2022,2023,2024'
override_states:
required: false
description: |
Comma-separated state codes to run e.g. 01,06.
Will run all (see params.yaml) if null
type: string

geography:
required: true
description: Comma-separated list of Census geographies
default: 'state,county,county_subdivision,tract,zcta'
override_datasets:
required: false
description: |
Comma-separated list of datasets to publish, e.g. times,metadata.
Will run all (see params.yaml) if null
type: string

override_geographies:
required: false
description: |
Comma-separated geographies to limit run e.g. county,tract.
Will run all (see params.yaml) if null
type: string

env:
@@ -48,35 +61,52 @@ jobs:
runs-on: ubuntu-24.04

outputs:
modes: ${{ steps.parse-inputs.outputs.modes }}
years: ${{ steps.parse-inputs.outputs.years }}
geographies: ${{ steps.parse-inputs.outputs.geographies }}
years: ${{ steps.create-year-jobs.outputs.param }}
states: ${{ steps.create-state-jobs.outputs.param }}

steps:
- name: Parse inputs
id: parse-inputs
shell: bash
run: |
echo "modes=$(echo '${{ inputs.mode }}' | jq -R -c 'split(",")')" >> "$GITHUB_OUTPUT"
echo "years=$(echo '${{ inputs.year }}' | jq -R -c 'split(",")')" >> "$GITHUB_OUTPUT"
echo "geographies=$(echo '${{ inputs.geography }}' | jq -R -c 'split(",")')" >> "$GITHUB_OUTPUT"
- name: Checkout
uses: actions/checkout@v4

- name: Create year jobs
id: create-year-jobs
uses: ./.github/actions/parse-gh-input
with:
param_path: '.input.year'
param_override: '${{ inputs.override_years }}'

- name: Create state jobs
id: create-state-jobs
uses: ./.github/actions/parse-gh-input
with:
param_path: '.input.state'
param_override: '${{ inputs.override_states }}'

create-files:
runs-on: ubuntu-24.04
needs: setup-jobs
name: create-file-${{ inputs.version }}-${{ matrix.mode }}-${{ matrix.year }}-${{ matrix.geography }}
strategy:
# Don't fail all chunks if one fails
fail-fast: false
matrix:
mode: ${{ fromJSON(needs.setup-jobs.outputs.modes) }}
year: ${{ fromJSON(needs.setup-jobs.outputs.years) }}
geography: ${{ fromJSON(needs.setup-jobs.outputs.geographies) }}
state: ${{ fromJSON(needs.setup-jobs.outputs.states) }}

steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup Cloudflare credentials
uses: ./.github/actions/setup-cloudflare-s3
with:
CLOUDFLARE_S3_API_ACCESS_KEY_ID: ${{ secrets.CLOUDFLARE_S3_API_ACCESS_KEY_ID }}
CLOUDFLARE_S3_API_SECRET_ACCESS_KEY: ${{ secrets.CLOUDFLARE_S3_API_SECRET_ACCESS_KEY }}

- name: Remove unnecessary software and increase swap space
uses: ./.github/actions/prep-disk-and-swap
with:
swap_override: 50000

- name: Install uv
uses: astral-sh/setup-uv@v5
with:
@@ -90,25 +120,44 @@ jobs:
id: install-python-dependencies
shell: bash
run: |
sudo apt-get install libgeos-dev
uv python install
uv venv
uv pip install ".[site,data]"
- name: Setup Cloudflare credentials
uses: ./.github/actions/setup-cloudflare-s3
- name: Create file jobs per dataset
id: create-dataset-jobs
uses: ./.github/actions/parse-gh-input
with:
CLOUDFLARE_S3_API_ACCESS_KEY_ID: ${{ secrets.CLOUDFLARE_S3_API_ACCESS_KEY_ID }}
CLOUDFLARE_S3_API_SECRET_ACCESS_KEY: ${{ secrets.CLOUDFLARE_S3_API_SECRET_ACCESS_KEY }}
param_path: '.output.dataset'
param_override: '${{ inputs.override_datasets }}'

- name: Create file jobs per geography
id: create-geo-jobs
uses: ./.github/actions/parse-gh-input
with:
param_path: '.input.census.geography.all'
param_override: '${{ inputs.override_geographies }}'

- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
with:
limit-access-to-actor: true

- name: Create public files
id: create-public-files
working-directory: 'data'
shell: bash
run: |
datasets_parsed=($(echo "${{ inputs.dataset }}" | tr -d ' ' | tr ',' ' '))
for dataset in "${datasets_parsed[@]}"; do
uv run ./src/create_public_files.py \
--dataset "$dataset" --version ${{ inputs.version }} \
--mode ${{ matrix.mode }} --year ${{ matrix.year }} \
--geography ${{ matrix.geography }}
geographies='${{ steps.create-geo-jobs.outputs.param }}'
geographies_array=($(echo "$geographies" | jq -r '.[]'))
datasets='${{ steps.create-dataset-jobs.outputs.param }}'
datasets_array=($(echo "$datasets" | jq -r '.[]'))
for geo in "${geographies_array[@]}"; do
for dataset in "${datasets_array[@]}"; do
uv run ./src/create_public_files.py \
--dataset "$dataset" --version ${{ inputs.version }} \
--mode ${{ inputs.mode }} --year ${{ matrix.year }} \
--geography "$geo" --state ${{ matrix.state }}
done
done
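Note: several steps above call a local composite action, `./.github/actions/parse-gh-input`, which is not part of this diff. Based on how its `param` output is consumed (via `fromJSON` in the job matrix), it presumably resolves a list from `data/params.yaml` at the given `param_path` unless a comma-separated `param_override` is supplied. A rough Python sketch of that resolution logic, with the file path and function name assumed, might look like:

```python
import json
import sys

import yaml  # params.yaml is YAML, so PyYAML is assumed to be available


def resolve_param(param_path: str, param_override: str = "") -> str:
    """Resolve a params.yaml list to a JSON array, honoring a comma-separated override."""
    if param_override.strip():
        # Override wins: split the comma-separated string into a list
        values = [v.strip() for v in param_override.split(",") if v.strip()]
    else:
        # Otherwise walk the dotted path (e.g. '.input.year') through params.yaml
        with open("data/params.yaml") as f:
            node = yaml.safe_load(f)
        for key in param_path.strip(".").split("."):
            node = node[key]
        values = node
    # A JSON array is what fromJSON() in the workflow matrix expects
    return json.dumps(values)


if __name__ == "__main__":
    path = sys.argv[1]
    override = sys.argv[2] if len(sys.argv) > 2 else ""
    print(f"param={resolve_param(path, override)}")
```

Under that reading, `resolve_param('.input.year')` would expand the matrix to every OSM year listed in params.yaml, while passing `override_years: 2020,2023` would restrict it to those two.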
5 changes: 1 addition & 4 deletions .github/workflows/update-data-site.yaml
@@ -4,10 +4,6 @@ name: update-data-site

on:
workflow_dispatch:
workflow_run:
workflows: [create-public-files]
types:
- completed

env:
AWS_DEFAULT_REGION: us-east-1
@@ -36,6 +32,7 @@ jobs:
id: install-python-dependencies
shell: bash
run: |
sudo apt-get install libgeos-dev
uv python install
uv venv
uv pip install ".[site,data]"
7 changes: 7 additions & 0 deletions data/params.yaml
@@ -158,3 +158,10 @@ output:
compression:
type: zstd
level: 12

# List of OpenTimes table names
dataset:
- times
- points
- missing_pairs
- metadata
75 changes: 40 additions & 35 deletions data/src/create_public_files.py
@@ -15,6 +15,7 @@ def create_public_files(
mode: str,
year: str,
geography: str,
state: str,
) -> None:
"""
Janky function to pull data from the S3 output bucket and repartition it
@@ -27,12 +28,11 @@
mode: Travel mode, one of ['walk', 'bicycle', 'car', 'transit'].
year: Year of the data.
geography: Census geography of the data. See params.yaml for list.
state: State of the data.
"""
with open("params.yaml") as file:
params = yaml.safe_load(file)
states = params["input"]["state"]
con = create_duckdb_connection()
logger.info("Successfully connected to DuckDB")

# Check that the input strings are valid
datasets = list(DATASET_DICT[version].keys())
@@ -59,41 +59,40 @@
f"Input geography must be one of: {', '.join(geographies)}"
)

for state in states:
filename = f"{dataset}-{version}-{mode}-{year}-{geography}-{state}"
partitions = "/*" * DATASET_DICT[version][dataset]["partition_levels"]
filename = f"{dataset}-{version}-{mode}-{year}-{geography}-{state}"
partitions = "/*" * DATASET_DICT[version][dataset]["partition_levels"]

con.sql(
f"""
COPY (
SELECT
{', '.join(DATASET_DICT[version][dataset]['public_file_columns'])},
regexp_extract(filename, 'part-(\\d+-\\d+)\\.parquet', 1) AS chunk_id
FROM read_parquet(
'r2://{params['s3']['data_bucket']}/{dataset}{partitions}/*.parquet',
hive_partitioning = true,
hive_types_autocast = false,
filename = true
)
WHERE version = '{version}'
AND mode = '{mode}'
AND year = '{year}'
AND geography = '{geography}'
AND state = '{state}'
ORDER BY {', '.join(DATASET_DICT[version][dataset]['order_by_columns'])}
con.sql(
f"""
COPY (
SELECT
{", ".join(DATASET_DICT[version][dataset]["public_file_columns"])},
regexp_extract(filename, 'part-(\\d+-\\d+)\\.parquet', 1) AS chunk_id
FROM read_parquet(
'r2://{params["s3"]["data_bucket"]}/{dataset}{partitions}/*.parquet',
hive_partitioning = true,
hive_types_autocast = false,
filename = true
)
TO 'r2://{params['s3']['public_bucket']}/{dataset}/version={version}/mode={mode}/year={year}/geography={geography}/state={state}'
(
FORMAT 'parquet',
COMPRESSION '{params['output']['compression']['type']}',
COMPRESSION_LEVEL {params['output']['compression']['level']},
OVERWRITE_OR_IGNORE true,
FILENAME_PATTERN '{filename}-',
FILE_SIZE_BYTES 475000000
);
"""
WHERE version = '{version}'
AND mode = '{mode}'
AND year = '{year}'
AND geography = '{geography}'
AND state = '{state}'
ORDER BY {", ".join(DATASET_DICT[version][dataset]["order_by_columns"])}
)
logger.info(f"Created file: {filename}")
TO 'r2://{params["s3"]["public_bucket"]}/{dataset}/version={version}/mode={mode}/year={year}/geography={geography}/state={state}'
(
FORMAT 'parquet',
COMPRESSION '{params["output"]["compression"]["type"]}',
COMPRESSION_LEVEL {params["output"]["compression"]["level"]},
OVERWRITE_OR_IGNORE true,
FILENAME_PATTERN '{filename}-',
FILE_SIZE_BYTES 475000000
);
"""
)
logger.info(f"Created file: {filename}")

con.close()

@@ -105,9 +104,15 @@ def main() -> None:
parser.add_argument("--mode", required=True, type=str)
parser.add_argument("--year", required=True, type=str)
parser.add_argument("--geography", required=True, type=str)
parser.add_argument("--state", required=True, type=str)
args = parser.parse_args()
create_public_files(
args.dataset, args.version, args.mode, args.year, args.geography
args.dataset,
args.version,
args.mode,
args.year,
args.geography,
args.state,
)


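The script leans on `create_duckdb_connection()` (imported from elsewhere in the repo and unchanged by this commit) to give DuckDB read/write access to the `r2://` buckets. A minimal sketch of what such a helper might look like, using DuckDB's httpfs extension and an R2 secret built from assumed environment variable names, is:

```python
import os

import duckdb


def create_duckdb_connection() -> duckdb.DuckDBPyConnection:
    """Sketch: connect DuckDB to Cloudflare R2 so r2:// paths resolve."""
    con = duckdb.connect()
    # httpfs provides the S3/R2 filesystem used by read_parquet() and COPY ... TO
    con.install_extension("httpfs")
    con.load_extension("httpfs")
    # Environment variable names here are assumptions, not the repo's actual config
    con.sql(f"""
        CREATE SECRET r2_secret (
            TYPE R2,
            KEY_ID '{os.environ["CLOUDFLARE_S3_API_ACCESS_KEY_ID"]}',
            SECRET '{os.environ["CLOUDFLARE_S3_API_SECRET_ACCESS_KEY"]}',
            ACCOUNT_ID '{os.environ["CLOUDFLARE_ACCOUNT_ID"]}'
        );
    """)
    return con
```

With a secret of `TYPE R2` registered, DuckDB resolves `r2://<bucket>/...` URLs directly, which is what both the `read_parquet()` source and the `COPY ... TO` target in the query above rely on.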
