Skip to content

Commit

Permalink
Merge pull request #332 from arXiv/develop
Browse files Browse the repository at this point in the history
deploy aggregate hourly downloads logging changes
  • Loading branch information
kyokukou authored Oct 21, 2024
2 parents bb4edc4 + ca31260 commit 17b6381
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 27 deletions.
11 changes: 8 additions & 3 deletions .github/workflows/pullreqeust_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ jobs:
# TODO The types are in bad shape and need to be fixed
# run: poetry run mypy --exclude "test*" -p arxiv

- name: Install Chrome Driver
- name: Install Firefox ESR and test driver
run: |
sudo apt-get update
sudo apt-get install -y chromium-browser chromium-chromedriver
sudo add-apt-repository -y ppa:mozillateam/ppa
sudo apt update
sudo apt install -y firefox-esr
wget https://github.com/mozilla/geckodriver/releases/latest/download/geckodriver-v0.35.0-linux64.tar.gz
tar -xvzf geckodriver-v0.35.0-linux64.tar.gz
sudo mv geckodriver /usr/local/bin/
sudo chmod +x /usr/local/bin/geckodriver
- name: Run other tests
# These tests are split out because their coverage is low
Expand Down
25 changes: 11 additions & 14 deletions arxiv/auth/openid/tests/test_keycloak.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,23 @@
import pytest
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
import time

@pytest.fixture(scope="module")
def web_driver() -> webdriver.Chrome:
# Set up the Selenium WebDriver
# You'd need
# sudo apt-get update
# sudo apt-get install -y chromium-browser chromium-chromedriver

options = Options()
options.binary_location = "/usr/bin/chromium-browser"
options.add_argument("--headless")
options.add_argument("--disable-gpu")
service = Service(executable_path="/usr/bin/chromedriver")
_web_driver = webdriver.Chrome(service=service, options=options)
_web_driver.implicitly_wait(10) # Wait for elements to be ready
options.headless = True
options.binary_location = "/usr/bin/firefox-esr"
options.add_argument('--headless')

service = Service(executable_path="/usr/local/bin/geckodriver")
_web_driver = webdriver.Firefox(service=service, options=options)
_web_driver.implicitly_wait(10) # Wait for elements to be ready
yield _web_driver
_web_driver.quit() # Close the browser window after tests
_web_driver.quit() # Close the browser window after tests


@pytest.fixture(scope="module")
def toy_flask():
Expand Down
2 changes: 1 addition & 1 deletion arxiv/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class Settings(BaseSettings):
LATEXML_DB_URI: Optional[str] = DEFAULT_LATEXML_DB
ECHO_SQL: bool = False
CLASSIC_DB_TRANSACTION_ISOLATION_LEVEL: Optional[IsolationLevel] = None
LATEXML_DB_TRANSACTION_ISOLATION_LEVEL: Optional[IsolationLevel] = "READ UNCOMMITTED"
LATEXML_DB_TRANSACTION_ISOLATION_LEVEL: Optional[IsolationLevel] = None

LATEXML_DB_QUERY_TIMEOUT: int = 5
"""Maximium seconds any query to the latxml DB can run.
Expand Down
26 changes: 17 additions & 9 deletions gcp/cloud_functions/aggregate_hourly_downloads/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def aggregate_hourly_downloads(cloud_event: CloudEvent):
download_data: List[DownloadData]=[] #not a dictionary because no unique keys
problem_rows: List[Tuple[Any], Exception]=[]
problem_row_count=0
time_periods=[]
for row in download_result:
try:
d_type = "src" if row['download_type'] == "e-print" else row['download_type'] #combine e-print and src downloads
Expand All @@ -188,11 +189,17 @@ def aggregate_hourly_downloads(cloud_event: CloudEvent):
problem_row_count+=1
problem_rows.append((tuple(row), e)) if len(problem_rows) < 20 else None
continue #dont count this download
if problem_row_count>0:
logging.warning(f"Problem processing {problem_row_count} rows")
logging.debug(f"Selection of problem row errors: {problem_rows}")
time_period=row['start_dttm'].replace(minute=0, second=0, microsecond=0)
if time_period not in time_periods:
time_periods.append(time_period)

logging.info(f"fetched {len(download_data)} rows, unique paper ids: {len(paper_ids)}")
time_period_str= ', '.join([date.strftime('%Y-%m-%d %H:%M:%S') for date in time_periods])
if problem_row_count>30:
logging.warning(f"{time_period_str}: Problem processing {problem_row_count} rows")
logging.debug(f"{time_period_str}: Selection of problem row errors: {problem_rows}")

fetched_count=len(download_data)
unique_id_count=len(paper_ids)

if len(paper_ids) ==0:
logging.critical("No data retrieved from BigQuery")
Expand All @@ -201,14 +208,15 @@ def aggregate_hourly_downloads(cloud_event: CloudEvent):
#find categories for all the papers
paper_categories=get_paper_categories(paper_ids)
if len(paper_categories) ==0:
logging.critical("No category data retrieved from database")
logging.critical(f"{time_period_str}: No category data retrieved from database")
return #this will prevent retries (is that good?)

#aggregate download data
aggregated_data=aggregate_data(download_data, paper_categories)

#write all_data to tables
insert_into_database(aggregated_data, write_table)
add_count, update_count=insert_into_database(aggregated_data, write_table)
logging.info(f"{time_period_str}: SUCCESS! fetched rows: {fetched_count}, unique_ids: {unique_id_count}, unprocessable rows: {problem_row_count}, rows added: {add_count}, rows updated: {update_count}")


def get_paper_categories(paper_ids: Set[str])-> Dict[str, PaperCategories]:
Expand Down Expand Up @@ -272,13 +280,14 @@ def aggregate_data(download_data: List[DownloadData], paper_categories: Dict[str

return all_data

def insert_into_database(aggregated_data: Dict[DownloadKey, DownloadCounts], db_uri: str):
def insert_into_database(aggregated_data: Dict[DownloadKey, DownloadCounts], db_uri: str)->Tuple[int, int]:
"""adds the data from an hour of downloads into the database
uses bulk insert and update statements to increase efficiency
first compiles all the keys for the data we would like to add and checks for their presence in the database
present items are added to run update for, and removed from the aggregated dictionary
remaining items are inserted
data with duplicate keys will be overwritten to allow for reruns with updates
returns the number of rows added and updated
"""
#set up table
Base = declarative_base()
Expand Down Expand Up @@ -369,5 +378,4 @@ class HourlyDownloadData(Base):
session.bulk_update_mappings(HourlyDownloadData, update_data[i:i+MAX_QUERY_TO_WRITE])
session.commit()
session.close()
logging.info(f"added {len(data_to_insert)} rows, updated {len(update_data)} rows")
return
return (len(data_to_insert), len(update_data))

0 comments on commit 17b6381

Please sign in to comment.