Mechanics for backfilling missing BigQuery metadata (pypi#17429)
* Add MissingDatasetFile

* Add processed column to MissingDatasetFile

* Modify sync_bigquery_release_files to use MissingDatasetFile

* Run the task every minute

* Make processed column nullable and default to null

* Only process unprocessed files

* Run every 5 minutes instead

* Fix a bug

* Give tests access to the transaction manager

* Commit early via the transaction manager

* Add the objects back into the session

* Squash migrations
di authored Jan 17, 2025
1 parent 5ae94d8 commit eda0e75
Showing 7 changed files with 152 additions and 157 deletions.
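The warehouse/packaging/tasks.py changes are not expanded in this view, so here is a minimal sketch, assembled from the commit bullets and the updated test_sync_rows expectations, of roughly how the reworked sync_bigquery_release_files fits together. The task decorator, the _row_for_file helper, and the exact placement of the early commit are assumptions, not the committed code.

from warehouse import tasks
from warehouse.packaging.models import MissingDatasetFile


@tasks.task(ignore_result=True, acks_late=True)  # decorator assumed, mirroring other warehouse tasks
def sync_bigquery_release_files(request):
    release_files_table = request.registry.settings.get("warehouse.release_files_table")
    if release_files_table is None:
        return

    bigquery = request.find_service(name="gcloud.bigquery")

    # "Only process unprocessed files": a NULL processed flag marks a file
    # whose metadata has not yet been backfilled into BigQuery.
    missing_files = (
        request.db.query(MissingDatasetFile)
        .filter(MissingDatasetFile.processed.is_(None))
        .all()
    )
    if not missing_files:
        return

    # The setting may name several tables separated by whitespace; the test
    # expects one insert_rows_json call per table.
    for table_name in release_files_table.split():
        schema = bigquery.get_table(table_name).schema
        json_rows = [_row_for_file(mf.file, schema) for mf in missing_files]  # hypothetical helper
        bigquery.insert_rows_json(table=table_name, json_rows=json_rows)

    # "Commit early via the transaction manager", then "Add the objects back
    # into the session" so they remain usable in the fresh transaction.
    request.tm.commit()
    request.tm.begin()
    for mf in missing_files:
        request.db.add(mf)

    for mf in missing_files:
        mf.processed = True

When warehouse.release_files_table is configured, includeme registers this task to run every five minutes (crontab(minute="*/5")), which is what the updated test_init.py below asserts.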
4 changes: 2 additions & 2 deletions tests/conftest.py
@@ -668,8 +668,9 @@ def query_recorder(app_config):


@pytest.fixture
def db_request(pyramid_request, db_session):
def db_request(pyramid_request, db_session, tm):
pyramid_request.db = db_session
pyramid_request.tm = tm
pyramid_request.flags = admin.flags.Flags(pyramid_request)
pyramid_request.banned = admin.bans.Bans(pyramid_request)
pyramid_request.organization_access = True
@@ -734,7 +735,6 @@ def tm():
# Create a new transaction manager for dependant test cases
tm = transaction.TransactionManager(explicit=True)
tm.begin()
tm.doom()

yield tm

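For context on the conftest.py change above: dropping tm.doom() and exposing the manager as pyramid_request.tm is what lets code under test commit partway through. A hypothetical test along these lines (not part of the commit, using the suite's existing factories) shows the pattern the fixtures now support:

def test_commit_early(db_request, db_session):
    # ProjectFactory, ReleaseFactory, and FileFactory come from the suite's
    # factory modules, as used elsewhere in these tests.
    project = ProjectFactory.create()
    release = ReleaseFactory.create(project=project)
    file = FileFactory.create(release=release, filename=f"pkg-{release.version}.tar.gz")

    missing = MissingDatasetFile(file_id=file.id)
    db_session.add(missing)

    db_request.tm.commit()   # allowed now that the transaction is no longer doomed
    db_request.tm.begin()    # the manager is explicit, so a new transaction must be started
    db_session.add(missing)  # re-attach the object after the commit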
11 changes: 6 additions & 5 deletions tests/unit/packaging/test_init.py
@@ -27,8 +27,9 @@
)
from warehouse.packaging.models import AlternateRepository, File, Project, Release, Role
from warehouse.packaging.services import project_service_factory
from warehouse.packaging.tasks import ( # sync_bigquery_release_files,
from warehouse.packaging.tasks import (
check_file_cache_tasks_outstanding,
sync_bigquery_release_files,
update_description_html,
)
from warehouse.rate_limiting import IRateLimiter, RateLimit
@@ -169,10 +170,10 @@ def key_factory(keystring, iterate_on=None, if_attr_exists=None):
]

if with_bq_sync:
# assert (
# pretend.call(crontab(minute=0), sync_bigquery_release_files)
# in config.add_periodic_task.calls
# )
assert (
pretend.call(crontab(minute="*/5"), sync_bigquery_release_files)
in config.add_periodic_task.calls
)
pass

assert (
69 changes: 9 additions & 60 deletions tests/unit/packaging/test_tasks.py
@@ -13,7 +13,6 @@
import tempfile

from contextlib import contextmanager
from itertools import product

import pretend
import pytest
@@ -24,7 +23,7 @@
import warehouse.packaging.tasks

from warehouse.accounts.models import WebAuthn
from warehouse.packaging.models import Description
from warehouse.packaging.models import Description, MissingDatasetFile
from warehouse.packaging.tasks import (
check_file_cache_tasks_outstanding,
compute_2fa_metrics,
@@ -734,19 +733,18 @@ def test_sync_rows(
DependencyFactory.create(release=release, kind=2)
DependencyFactory.create(release=release, kind=3)
DependencyFactory.create(release=release, kind=4)
load_config = pretend.call_recorder(lambda *a, **kw: None)
monkeypatch.setattr("warehouse.packaging.tasks.LoadJobConfig", load_config)
missing = MissingDatasetFile(file_id=release_file.id)
db_request.db.add(missing)

query = pretend.stub(
result=pretend.call_recorder(
lambda *a, **kw: [{"md5_digest": release_file2.md5_digest}]
)
)
get_table = pretend.stub(schema=bq_schema)
get_result = pretend.stub(result=lambda: None)
bigquery = pretend.stub(
get_table=pretend.call_recorder(lambda t: get_table),
load_table_from_json=pretend.call_recorder(lambda *a, **kw: get_result),
insert_rows_json=pretend.call_recorder(lambda *a, **kw: None),
query=pretend.call_recorder(lambda q: query),
)

@@ -765,17 +763,11 @@ def find_service(name=None):

assert db_request.find_service.calls == [pretend.call(name="gcloud.bigquery")]
assert bigquery.get_table.calls == expected_get_table_calls
assert bigquery.query.calls == [
pretend.call(query.format(table))
for table in release_files_table.split()
for query in [
"SELECT md5_digest FROM {} WHERE md5_digest LIKE 'ff%'",
"SELECT md5_digest FROM {} WHERE md5_digest LIKE 'fe%'",
]
]
assert bigquery.load_table_from_json.calls == [
assert bigquery.query.calls == []
assert bigquery.insert_rows_json.calls == [
pretend.call(
[
table=table,
json_rows=[
{
"metadata_version": None,
"name": project.name,
@@ -818,54 +810,11 @@ def find_service(name=None):
"blake2_256_digest": release_file.blake2_256_digest,
},
],
table,
job_config=None,
)
for table in release_files_table.split()
]

@pytest.mark.parametrize("bq_schema", [bq_schema])
def test_no_diff(self, db_request, monkeypatch, bq_schema):
project = ProjectFactory.create()
release = ReleaseFactory.create(project=project)
release_file = FileFactory.create(
release=release, filename=f"foobar-{release.version}.tar.gz"
)

query = pretend.stub(
result=pretend.call_recorder(
lambda *a, **kw: [{"md5_digest": release_file.md5_digest}]
)
)
get_table = pretend.stub(schema=bq_schema)
bigquery = pretend.stub(
get_table=pretend.call_recorder(lambda t: get_table),
query=pretend.call_recorder(lambda q: query),
)

@pretend.call_recorder
def find_service(name=None):
if name == "gcloud.bigquery":
return bigquery
raise LookupError

db_request.find_service = find_service
db_request.registry.settings = {
"warehouse.release_files_table": "example.pypi.distributions"
}

sync_bigquery_release_files(db_request)

assert db_request.find_service.calls == [pretend.call(name="gcloud.bigquery")]
assert bigquery.get_table.calls == [pretend.call("example.pypi.distributions")]
assert bigquery.query.calls == [
pretend.call(
"SELECT md5_digest "
"FROM example.pypi.distributions "
f"WHERE md5_digest LIKE '{first}{second}%'",
)
for first, second in product("fedcba9876543210", repeat=2)
]
assert missing.processed

def test_var_is_none(self):
request = pretend.stub(
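The pretend stub in test_sync_rows mirrors the shape of the real google-cloud-bigquery client: the commit drops the old per-prefix "SELECT md5_digest" diffing (query / load_table_from_json) in favor of direct streaming inserts. Against the real client, the calls the test now expects look roughly like this; the table id is the test's placeholder value and the row dict is illustrative:

from google.cloud import bigquery

client = bigquery.Client()
table_name = "example.pypi.distributions"

schema = client.get_table(table_name).schema  # schema of the target table
rows = [{"name": "foobar", "version": "1.0", "md5_digest": "..."}]  # one dict per missing file
errors = client.insert_rows_json(table=table_name, json_rows=rows)
assert not errors  # insert_rows_json returns a list of per-row errors; empty means success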
45 changes: 45 additions & 0 deletions (new file: Alembic migration "Add MissingDatasetFile", revision 77d52a945a5f)
@@ -0,0 +1,45 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Add MissingDatasetFile
Revision ID: 77d52a945a5f
Revises: 12a43f12cc18
Create Date: 2025-01-17 16:56:09.082853
"""

import sqlalchemy as sa

from alembic import op

revision = "77d52a945a5f"
down_revision = "12a43f12cc18"


def upgrade():
op.create_table(
"missing_dataset_files",
sa.Column("file_id", sa.UUID(), nullable=False),
sa.Column("processed", sa.Boolean(), nullable=True),
sa.Column(
"id", sa.UUID(), server_default=sa.text("gen_random_uuid()"), nullable=False
),
sa.ForeignKeyConstraint(
["file_id"],
["release_files.id"],
),
sa.PrimaryKeyConstraint("id"),
)


def downgrade():
op.drop_table("missing_dataset_files")
6 changes: 3 additions & 3 deletions warehouse/packaging/__init__.py
@@ -30,6 +30,7 @@
check_file_cache_tasks_outstanding,
compute_2fa_metrics,
compute_packaging_metrics,
sync_bigquery_release_files,
update_description_html,
)
from warehouse.rate_limiting import IRateLimiter, RateLimit
@@ -197,6 +198,5 @@ def includeme(config):
# Add a periodic task to generate general metrics
config.add_periodic_task(crontab(minute="*/5"), compute_packaging_metrics)

# TODO: restore this
# if config.get_settings().get("warehouse.release_files_table"):
# config.add_periodic_task(crontab(minute=0), sync_bigquery_release_files)
if config.get_settings().get("warehouse.release_files_table"):
config.add_periodic_task(crontab(minute="*/5"), sync_bigquery_release_files)
8 changes: 8 additions & 0 deletions warehouse/packaging/models.py
@@ -1133,3 +1133,11 @@ class AlternateRepository(db.Model):
name: Mapped[str]
url: Mapped[str]
description: Mapped[str]


class MissingDatasetFile(db.Model):
__tablename__ = "missing_dataset_files"

file_id: Mapped[UUID] = mapped_column(ForeignKey("release_files.id"))
file: Mapped[File] = orm.relationship()
processed: Mapped[bool] = mapped_column(default=None, nullable=True)
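Per the commit bullets, processed is nullable and defaults to NULL, so "unprocessed" means processed IS NULL rather than False. A small query sketch (not from the commit) of how the backfill can select those rows:

from sqlalchemy import select
from sqlalchemy.orm import Session

from warehouse.packaging.models import MissingDatasetFile


def unprocessed_dataset_files(session: Session) -> list[MissingDatasetFile]:
    # NULL (not False) is the "never touched" state, per the
    # "Only process unprocessed files" change.
    return list(
        session.scalars(
            select(MissingDatasetFile).where(MissingDatasetFile.processed.is_(None))
        )
    )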
