diff --git a/.github/workflows/create-public-files.yaml b/.github/workflows/create-public-files.yaml
index 831c72e..a832027 100644
--- a/.github/workflows/create-public-files.yaml
+++ b/.github/workflows/create-public-files.yaml
@@ -1,18 +1,11 @@
 ---
 name: create-public-files
-run-name: create-public-files-${{ inputs.version }}
+run-name: create-public-files-${{ inputs.version }}-${{ inputs.mode }}
 
 on:
   workflow_dispatch:
     inputs:
-      # Input values must match those in params.yaml
-      dataset:
-        required: true
-        description: Comma-separated list of datasets
-        default: 'times,points,missing_pairs,metadata'
-        type: string
-
       version:
         required: true
         description: Version of data
@@ -21,20 +14,40 @@ on:
 
       mode:
         required: true
-        description: Comma-separated list of travel modes
-        default: 'auto,bicycle,pedestrian'
+        description: Mode of travel
+        default: 'car'
+        type: choice
+        options:
+          - car
+          - bicycle
+          - foot
+
+      override_years:
+        required: false
+        description: |
+          Comma-separated list of OSM data years to run e.g. 2020,2023.
+          Will run all (see params.yaml) if null
         type: string
 
-      year:
-        required: true
-        description: Comma-separated list of years
-        default: '2020,2021,2022,2023,2024'
+      override_states:
+        required: false
+        description: |
+          Comma-separated list of state FIPS codes to run e.g. 01,06.
+          Will run all (see params.yaml) if null
         type: string
 
-      geography:
-        required: true
-        description: Comma-separated list of Census geographies
-        default: 'state,county,county_subdivision,tract,zcta'
+      override_datasets:
+        required: false
+        description: |
+          Comma-separated list of datasets to publish e.g. times,metadata.
+          Will run all (see params.yaml) if null
+        type: string
+
+      override_geographies:
+        required: false
+        description: |
+          Comma-separated list of geographies to run e.g. county,tract.
+          Will run all (see params.yaml) if null
         type: string
 
 env:
@@ -48,35 +61,52 @@ jobs:
     runs-on: ubuntu-24.04
     outputs:
-      modes: ${{ steps.parse-inputs.outputs.modes }}
-      years: ${{ steps.parse-inputs.outputs.years }}
-      geographies: ${{ steps.parse-inputs.outputs.geographies }}
+      years: ${{ steps.create-year-jobs.outputs.param }}
+      states: ${{ steps.create-state-jobs.outputs.param }}
 
     steps:
-      - name: Parse inputs
-        id: parse-inputs
-        shell: bash
-        run: |
-          echo "modes=$(echo '${{ inputs.mode }}' | jq -R -c 'split(",")')" >> "$GITHUB_OUTPUT"
-          echo "years=$(echo '${{ inputs.year }}' | jq -R -c 'split(",")')" >> "$GITHUB_OUTPUT"
-          echo "geographies=$(echo '${{ inputs.geography }}' | jq -R -c 'split(",")')" >> "$GITHUB_OUTPUT"
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Create year jobs
+        id: create-year-jobs
+        uses: ./.github/actions/parse-gh-input
+        with:
+          param_path: '.input.year'
+          param_override: '${{ inputs.override_years }}'
+
+      - name: Create state jobs
+        id: create-state-jobs
+        uses: ./.github/actions/parse-gh-input
+        with:
+          param_path: '.input.state'
+          param_override: '${{ inputs.override_states }}'
 
   create-files:
     runs-on: ubuntu-24.04
     needs: setup-jobs
-    name: create-file-${{ inputs.version }}-${{ matrix.mode }}-${{ matrix.year }}-${{ matrix.geography }}
     strategy:
       # Don't fail all chunks if one fails
      fail-fast: false
       matrix:
-        mode: ${{ fromJSON(needs.setup-jobs.outputs.modes) }}
         year: ${{ fromJSON(needs.setup-jobs.outputs.years) }}
-        geography: ${{ fromJSON(needs.setup-jobs.outputs.geographies) }}
+        state: ${{ fromJSON(needs.setup-jobs.outputs.states) }}
 
     steps:
       - name: Checkout
         uses: actions/checkout@v4
 
+      - name: Setup Cloudflare credentials
+        uses: ./.github/actions/setup-cloudflare-s3
+        with:
+          CLOUDFLARE_S3_API_ACCESS_KEY_ID: ${{ secrets.CLOUDFLARE_S3_API_ACCESS_KEY_ID }}
+          CLOUDFLARE_S3_API_SECRET_ACCESS_KEY: ${{ secrets.CLOUDFLARE_S3_API_SECRET_ACCESS_KEY }}
+
+      - name: Remove unnecessary software and increase swap space
+        uses: ./.github/actions/prep-disk-and-swap
+        with:
+          swap_override: 50000
+
       - name: Install uv
         uses: astral-sh/setup-uv@v5
         with:
@@ -90,25 +120,44 @@ jobs:
         id: install-python-dependencies
         shell: bash
        run: |
+          sudo apt-get install -y libgeos-dev
           uv python install
           uv venv
           uv pip install ".[site,data]"
 
-      - name: Setup Cloudflare credentials
-        uses: ./.github/actions/setup-cloudflare-s3
-        with:
-          CLOUDFLARE_S3_API_ACCESS_KEY_ID: ${{ secrets.CLOUDFLARE_S3_API_ACCESS_KEY_ID }}
-          CLOUDFLARE_S3_API_SECRET_ACCESS_KEY: ${{ secrets.CLOUDFLARE_S3_API_SECRET_ACCESS_KEY }}
+      - name: Create file jobs per dataset
+        id: create-dataset-jobs
+        uses: ./.github/actions/parse-gh-input
+        with:
+          param_path: '.output.dataset'
+          param_override: '${{ inputs.override_datasets }}'
+
+      - name: Create file jobs per geography
+        id: create-geo-jobs
+        uses: ./.github/actions/parse-gh-input
+        with:
+          param_path: '.input.census.geography.all'
+          param_override: '${{ inputs.override_geographies }}'
 
       - name: Create public files
-        id: create-public-files
         working-directory: 'data'
         shell: bash
         run: |
-          datasets_parsed=($(echo "${{ inputs.dataset }}" | tr -d ' ' | tr ',' ' '))
-          for dataset in "${datasets_parsed[@]}"; do
-            uv run ./src/create_public_files.py \
-              --dataset "$dataset" --version ${{ inputs.version }} \
-              --mode ${{ matrix.mode }} --year ${{ matrix.year }} \
-              --geography ${{ matrix.geography }}
+          # Parse the JSON arrays emitted by parse-gh-input into bash arrays
+          geographies='${{ steps.create-geo-jobs.outputs.param }}'
+          geographies_array=($(echo "$geographies" | jq -r '.[]'))
"$geographies" | jq -r '.[]')) + datasets='${{ steps.create-dataset-jobs.outputs.param }}' + datasets_array=($(echo "$datasets" | jq -r '.[]')) + + for geo in "${geographies_array[@]}"; do + for dataset in "${datasets_array[@]}"; do + uv run ./src/create_public_files.py \ + --dataset "$dataset" --version ${{ inputs.version }} \ + --mode ${{ inputs.mode }} --year ${{ matrix.year }} \ + --geography "$geo" --state ${{ matrix.state }} + done done diff --git a/.github/workflows/update-data-site.yaml b/.github/workflows/update-data-site.yaml index de1103e..71d7e12 100644 --- a/.github/workflows/update-data-site.yaml +++ b/.github/workflows/update-data-site.yaml @@ -4,10 +4,6 @@ name: update-data-site on: workflow_dispatch: - workflow_run: - workflows: [create-public-files] - types: - - completed env: AWS_DEFAULT_REGION: us-east-1 @@ -36,6 +32,7 @@ jobs: id: install-python-dependencies shell: bash run: | + sudo apt-get install libgeos-dev uv python install uv venv uv pip install ".[site,data]" diff --git a/data/params.yaml b/data/params.yaml index d195340..098e158 100644 --- a/data/params.yaml +++ b/data/params.yaml @@ -158,3 +158,10 @@ output: compression: type: zstd level: 12 + + # List of OpenTimes table names + dataset: + - times + - points + - missing_pairs + - metadata diff --git a/data/src/create_public_files.py b/data/src/create_public_files.py index b09a721..ba010d3 100644 --- a/data/src/create_public_files.py +++ b/data/src/create_public_files.py @@ -15,6 +15,7 @@ def create_public_files( mode: str, year: str, geography: str, + state: str, ) -> None: """ Janky function to pull data from the S3 output bucket and repartition it @@ -27,12 +28,11 @@ def create_public_files( mode: Travel mode, one of ['walk', 'bicycle', 'car', 'transit']. year: Year of the data. geography: Census geography of the data. See params.yaml for list. + state: State of the data. 
""" with open("params.yaml") as file: params = yaml.safe_load(file) - states = params["input"]["state"] con = create_duckdb_connection() - logger.info("Successfully connected to DuckDB") # Check that the input strings are valid datasets = list(DATASET_DICT[version].keys()) @@ -59,41 +59,40 @@ def create_public_files( f"Input geography must be one of: {', '.join(geographies)}" ) - for state in states: - filename = f"{dataset}-{version}-{mode}-{year}-{geography}-{state}" - partitions = "/*" * DATASET_DICT[version][dataset]["partition_levels"] + filename = f"{dataset}-{version}-{mode}-{year}-{geography}-{state}" + partitions = "/*" * DATASET_DICT[version][dataset]["partition_levels"] - con.sql( - f""" - COPY ( - SELECT - {', '.join(DATASET_DICT[version][dataset]['public_file_columns'])}, - regexp_extract(filename, 'part-(\\d+-\\d+)\\.parquet', 1) AS chunk_id - FROM read_parquet( - 'r2://{params['s3']['data_bucket']}/{dataset}{partitions}/*.parquet', - hive_partitioning = true, - hive_types_autocast = false, - filename = true - ) - WHERE version = '{version}' - AND mode = '{mode}' - AND year = '{year}' - AND geography = '{geography}' - AND state = '{state}' - ORDER BY {', '.join(DATASET_DICT[version][dataset]['order_by_columns'])} + con.sql( + f""" + COPY ( + SELECT + {", ".join(DATASET_DICT[version][dataset]["public_file_columns"])}, + regexp_extract(filename, 'part-(\\d+-\\d+)\\.parquet', 1) AS chunk_id + FROM read_parquet( + 'r2://{params["s3"]["data_bucket"]}/{dataset}{partitions}/*.parquet', + hive_partitioning = true, + hive_types_autocast = false, + filename = true ) - TO 'r2://{params['s3']['public_bucket']}/{dataset}/version={version}/mode={mode}/year={year}/geography={geography}/state={state}' - ( - FORMAT 'parquet', - COMPRESSION '{params['output']['compression']['type']}', - COMPRESSION_LEVEL {params['output']['compression']['level']}, - OVERWRITE_OR_IGNORE true, - FILENAME_PATTERN '{filename}-', - FILE_SIZE_BYTES 475000000 - ); - """ + WHERE version = '{version}' + AND mode = '{mode}' + AND year = '{year}' + AND geography = '{geography}' + AND state = '{state}' + ORDER BY {", ".join(DATASET_DICT[version][dataset]["order_by_columns"])} ) - logger.info(f"Created file: {filename}") + TO 'r2://{params["s3"]["public_bucket"]}/{dataset}/version={version}/mode={mode}/year={year}/geography={geography}/state={state}' + ( + FORMAT 'parquet', + COMPRESSION '{params["output"]["compression"]["type"]}', + COMPRESSION_LEVEL {params["output"]["compression"]["level"]}, + OVERWRITE_OR_IGNORE true, + FILENAME_PATTERN '{filename}-', + FILE_SIZE_BYTES 475000000 + ); + """ + ) + logger.info(f"Created file: {filename}") con.close() @@ -105,9 +104,15 @@ def main() -> None: parser.add_argument("--mode", required=True, type=str) parser.add_argument("--year", required=True, type=str) parser.add_argument("--geography", required=True, type=str) + parser.add_argument("--state", required=True, type=str) args = parser.parse_args() create_public_files( - args.dataset, args.version, args.mode, args.year, args.geography + args.dataset, + args.version, + args.mode, + args.year, + args.geography, + args.state, )