Skip to content

Commit 9b86df4

Browse files
authored
Bg geoip update (#951)
* Add checkin models and initial route * Porting old code to new probe_services endpoint * Add geoip2 dependency * Add geoip tables as dependencies * Ported probe_geoip function * Porting more functions from checkin * Added health checks for clickhouse * First complete version of checkin endpoint port * Fix bad name for metric * Port some checkin integration tests * Add pytest-docker as test dependency * Add docker compose to launch clickhouse db for tests * Making tests work with the new setup * Fixed unhashable settings error * ported script to download geoip DB * Remove exit * Use setting to fetch db dir * Added setting for geoip db dir location * Rename download geoip file * Set download dir as input argument * download db before tests * Black reformat * fix error with bad header parsing * Making tests work * Add settings as a parameter to mock it in tests * Fix bad reader dependency * Skip broken test * Add function to populate db * Add data for tests * Fix fixture loading * Add clickhouse db fixture * Adding integration tests for check in * Removed todo comments * Add missin / at the start of path * Black reformat * Refactor to move prio code into a prio.py file * Add todo comment * Add timer to measure function run time * Improved writing of comment * Improved writing of comment * Added missing alembic migration * Revert "Added missing alembic migration" This reverts commit fb072d2. 
* Add help comment in init.sh * Black reformat * Add APScheduler for background tasks * Add geoip db update job * Changed APScheduler for fast api utils repeat-every * Add repeating task for updating the geoip DB * Add parameter to enable/disable repeating tasks The tests were hanging because of the repeating tasks, so I added a parameter to deactivate them in tests * Change task execution frequency * Remove print * Black reformat * Add TS calculation to function body * Re-raise consistency check exception in download_geoip * Add geoip data dir fixture * Removed unnecessary remove * Changed heavy fastpath query on health check * trigger ci * Reformat query * Changed health check to a less heavy query
1 parent 5dec72a commit 9b86df4

File tree

18 files changed

+1655
-23
lines changed

18 files changed

+1655
-23
lines changed

ooniapi/common/src/common/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,6 @@ class Settings(BaseSettings):
3131
email_source_address: str = "[email protected]"
3232

3333
vpn_credential_refresh_hours: int = 24
34+
35+
# Where the geoip DBs are downloaded to
36+
geoip_db_dir: str = "/var/lib/ooni/geoip"

ooniapi/common/src/common/metrics.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import secrets
22

3-
from typing import Annotated
3+
from typing import Annotated, Optional
4+
import timeit
45

56
from fastapi import FastAPI, Response, Depends, HTTPException, status
67
from fastapi.security import HTTPBasic, HTTPBasicCredentials
@@ -10,6 +11,7 @@
1011
CONTENT_TYPE_LATEST,
1112
CollectorRegistry,
1213
generate_latest,
14+
Histogram,
1315
)
1416

1517
security = HTTPBasic()
@@ -40,3 +42,51 @@ def metrics(
4042

4143
endpoint = "/metrics"
4244
app.get(endpoint, include_in_schema=True, tags=None)(metrics)
45+
46+
47+
FUNCTION_TIME = Histogram(
48+
"fn_execution_time_seconds",
49+
"Function execution time in seconds",
50+
labelnames=["name", "status"],
51+
)
52+
53+
54+
def timer(func, timer_name: Optional[str] = None):
    """Measure function execution time in seconds.

    Each call of the returned wrapper records one observation in the
    `FUNCTION_TIME` histogram, labelled with the timer name and the execution
    status ("success" when `func` returns, "error" when it raises). Exceptions
    raised by `func` are propagated unchanged after the observation is made.

    The function name is used as the timer name unless `timer_name` is given,
    e.g. `timed_add = timer(add, timer_name="sum")`. Example:

    ```
    @timer
    def add(x,y):
        return x + y
    ```

    Args:
        func (Callable): function to time
        timer_name (Optional[str], optional): Alternative name for this timer, uses function name if
        not provided. Defaults to None.

    Returns:
        Callable: wrapper with the same name, docstring and signature metadata
        as `func`.
    """
    # Imported locally so this block does not depend on a file-level import.
    import functools

    name = timer_name or func.__name__

    # functools.wraps keeps __name__/__doc__/__wrapped__ of the original so
    # that introspection (logging, signature inspection) still sees `func`.
    @functools.wraps(func)
    def timed_function(*args, **kwargs):
        start_time = timeit.default_timer()
        success = False

        try:
            result = func(*args, **kwargs)
            success = True
            return result
        finally:
            # Runs on both the return and the exception path.
            status = "success" if success else "error"
            FUNCTION_TIME.labels(name=name, status=status).observe(
                timeit.default_timer() - start_time
            )

    return timed_function

ooniapi/services/oonimeasurements/src/oonimeasurements/main.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,10 @@ async def health(
8585
errors = []
8686

8787
try:
88-
query = """SELECT *
89-
FROM fastpath FINAL
88+
query = """
89+
SELECT COUNT()
90+
FROM fastpath
91+
WHERE measurement_start_time < NOW() AND measurement_start_time > NOW() - INTERVAL 3 HOUR
9092
"""
9193
query_click(db=db, query=query, query_params={})
9294
except Exception as exc:

ooniapi/services/ooniprobe/pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ dependencies = [
2424
"prometheus-fastapi-instrumentator ~= 6.1.0",
2525
"prometheus-client",
2626
"pem ~= 23.1.0",
27+
"geoip2 ~= 5.0.1",
28+
"fastapi-utils[all] ~= 0.8.0"
2729
]
2830

2931
readme = "README.md"
@@ -67,6 +69,7 @@ dependencies = [
6769
"pytest-postgresql",
6870
"pytest-asyncio",
6971
"freezegun",
72+
"pytest-docker",
7073
]
7174
path = ".venv/"
7275

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
1-
from functools import lru_cache
2-
from typing import Annotated
1+
from typing import Annotated, TypeAlias
2+
from pathlib import Path
33

44
from fastapi import Depends
55

6+
import geoip2.database
7+
68
from sqlalchemy import create_engine
79
from sqlalchemy.orm import sessionmaker
810

11+
from clickhouse_driver import Client as Clickhouse
12+
913
from .common.config import Settings
1014
from .common.dependencies import get_settings
1115

1216

13-
def get_postgresql_session(settings: Annotated[Settings, Depends(get_settings)]):
17+
SettingsDep: TypeAlias = Annotated[Settings, Depends(get_settings)]
18+
19+
20+
def get_postgresql_session(settings: SettingsDep):
1421
engine = create_engine(settings.postgresql_url)
1522
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
1623

@@ -19,3 +26,30 @@ def get_postgresql_session(settings: Annotated[Settings, Depends(get_settings)])
1926
yield db
2027
finally:
2128
db.close()
29+
30+
31+
def get_cc_reader(settings: SettingsDep):
    """Open the `cc.mmdb` database located in `settings.geoip_db_dir`.

    A new `geoip2.database.Reader` is created on every call.
    NOTE(review): the reader is never closed here - confirm the caller or the
    garbage collector is expected to release it.
    """
    return geoip2.database.Reader(Path(settings.geoip_db_dir) / "cc.mmdb")
34+
35+
36+
CCReaderDep = Annotated[geoip2.database.Reader, Depends(get_cc_reader)]
37+
38+
39+
def get_asn_reader(settings: SettingsDep):
    """Open the `asn.mmdb` database located in `settings.geoip_db_dir`.

    A new `geoip2.database.Reader` is created on every call.
    NOTE(review): the reader is never closed here - confirm the caller or the
    garbage collector is expected to release it.
    """
    return geoip2.database.Reader(Path(settings.geoip_db_dir) / "asn.mmdb")
42+
43+
44+
ASNReaderDep = Annotated[geoip2.database.Reader, Depends(get_asn_reader)]
45+
46+
47+
def get_clickhouse_session(settings: SettingsDep):
    """Yield a ClickHouse client built from `settings.clickhouse_url`.

    The client is disconnected once the consumer of the generator is done,
    whether it finished normally or raised.
    """
    client = Clickhouse.from_url(settings.clickhouse_url)
    try:
        yield client
    finally:
        client.disconnect()
53+
54+
55+
ClickhouseDep = Annotated[Clickhouse, Depends(get_clickhouse_session)]
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
"""
2+
Updates asn.mmdb and cc.mmdb in the GeoIP database directory passed to `try_update`
3+
4+
"""
5+
6+
import sys
7+
import gzip
8+
import timeit
9+
import shutil
10+
import logging
11+
12+
import geoip2.database
13+
from pathlib import Path
14+
from datetime import datetime, timezone
15+
from urllib.error import HTTPError
16+
from urllib.request import urlopen, Request
17+
18+
from prometheus_client import metrics
19+
20+
21+
class Metrics:
    # Prometheus metrics published by the GeoIP updater. Metric names are
    # unchanged; only the help texts were corrected so that each one describes
    # the database (ASN vs CC) it actually belongs to.

    # Node count / build epoch of the ASN database that last passed the check
    GEOIP_ASN_NODE_CNT = metrics.Gauge("geoip_asn_node_cnt", "Geoip ASN node count")
    GEOIP_ASN_EPOCH = metrics.Gauge("geoip_asn_epoch", "Geoip current ASN epoch")
    # Node count / build epoch of the CC database that last passed the check
    GEOIP_CC_NODE_CNT = metrics.Gauge("geoip_cc_node_cnt", "Geoip CC node count")
    GEOIP_CC_EPOCH = metrics.Gauge("geoip_cc_epoch", "Geoip current CC epoch")
    # Incremented whenever a freshly downloaded database fails validation
    GEOIP_CHECKFAIL = metrics.Counter(
        "ooni_geoip_checkfail", "How many times the geoip consistency check failed"
    )
    # Incremented after both databases were downloaded and installed
    GEOIP_UPDATED = metrics.Counter(
        "ooni_geoip_updated", "How many times was the geoip database updated"
    )
    # Wall-clock seconds spent downloading, unpacking and checking one database
    GEOIP_DOWNLOAD_TIME = metrics.Histogram(
        "geoip_download_time", "How long it takes to download the DB"
    )
35+
36+
log = logging.getLogger("ooni_download_geoip")
37+
38+
log.addHandler(logging.StreamHandler(sys.stdout))
39+
log.setLevel(logging.DEBUG)
40+
41+
42+
def get_request(url):
    """Open `url` and return the response object produced by `urlopen`.

    The request carries a fixed "ooni-downloader" User-Agent header.
    """
    # We need to set the user-agent otherwise db-ip gives us a 403
    return urlopen(Request(url, headers={"User-Agent": "ooni-downloader"}))
47+
48+
49+
def is_already_updated(db_dir: Path, ts: str) -> bool:
    """Tell whether the timestamp stored in `db_dir/geoipdbts` equals `ts`.

    Returns False when the marker file does not exist.
    """
    marker = db_dir / "geoipdbts"
    try:
        stored_ts = marker.read_text()
    except FileNotFoundError:
        return False
    return stored_ts == ts
57+
58+
59+
def is_latest_available(url: str) -> bool:
    """Check whether `url` can currently be fetched.

    Args:
        url: address of the database archive to probe.

    Returns:
        True when the server answers with status 200. False when the request
        fails with an HTTP error (404 is logged as "not updated yet", any
        other code as unexpected).
    """
    log.info(f"fetching {url}")
    try:
        # Close the response right away: only the status code is needed.
        with get_request(url) as resp:
            return resp.status == 200
    except HTTPError as err:
        # The status must be read from the exception: when the request raises,
        # no response object was ever bound.
        if err.code == 404:
            log.info(f"{url} hasn't been updated yet")
            return False
        log.info(f"unexpected status code '{err.code}' in {url}")
        return False
70+
71+
72+
def check_geoip_db(path: Path) -> None:
    """Sanity-check a GeoIP database file and publish its metadata gauges.

    The file name decides how the database is probed: a name containing "asn"
    is queried with an ASN lookup, otherwise a name containing "cc" is queried
    with a country lookup. In both cases 8.8.8.8 is looked up, and the node
    count and build epoch from the reader metadata are written to the
    matching `Metrics` gauges.

    Raises:
        AssertionError: if the file name matches neither kind, or the lookup
            returns None.
    """
    filename = path.name
    is_asn = "asn" in filename
    is_cc = "cc" in filename
    assert is_cc or is_asn, "invalid path"

    with geoip2.database.Reader(str(path)) as reader:
        # "asn" takes precedence when both markers appear in the name.
        if is_asn:
            lookup = reader.asn
            node_gauge = Metrics.GEOIP_ASN_NODE_CNT
            epoch_gauge = Metrics.GEOIP_ASN_EPOCH
        elif is_cc:
            lookup = reader.country
            node_gauge = Metrics.GEOIP_CC_NODE_CNT
            epoch_gauge = Metrics.GEOIP_CC_EPOCH
        else:
            return

        record = lookup("8.8.8.8")
        assert record is not None, "database file is invalid"
        meta = reader.metadata()
        node_gauge.set(meta.node_count)
        epoch_gauge.set(meta.build_epoch)
89+
90+
91+
def download_geoip(db_dir: Path, url: str, filename: str) -> None:
    """Download, unpack, validate and install one GeoIP database.

    The gzip archive at `url` is stored as `<filename>.gz.tmp` inside
    `db_dir`, decompressed to `<filename>.tmp`, validated with
    `check_geoip_db` and finally renamed to `<filename>`. The elapsed time is
    recorded in `Metrics.GEOIP_DOWNLOAD_TIME` on success only.

    Args:
        db_dir: directory holding the databases; must already exist.
        url: location of the gzip-compressed database.
        filename: final file name, e.g. "asn.mmdb" or "cc.mmdb".

    Raises:
        Exception: whatever the download, decompression or consistency check
            raises is propagated. A failed check is additionally logged and
            counted in `Metrics.GEOIP_CHECKFAIL`.
    """
    start_time = timeit.default_timer()  # Start timer
    log.info(f"Updating geoip database for {url} ({filename})")

    tmp_gz_out = db_dir / f"{filename}.gz.tmp"
    tmp_out = db_dir / f"{filename}.tmp"

    try:
        with get_request(url) as resp, tmp_gz_out.open("wb") as out_file:
            shutil.copyfileobj(resp, out_file)
        with gzip.open(str(tmp_gz_out)) as in_file, tmp_out.open("wb") as out_file:
            shutil.copyfileobj(in_file, out_file)
        tmp_gz_out.unlink()

        try:
            check_geoip_db(tmp_out)
        except Exception as exc:
            log.error(f"consistenty check on the geoip DB failed: {exc}")
            Metrics.GEOIP_CHECKFAIL.inc()
            raise

        tmp_out.rename(db_dir / filename)
    finally:
        # Do not leave partial or rejected files behind when any step fails.
        # On success both paths are already gone, hence missing_ok.
        tmp_gz_out.unlink(missing_ok=True)
        tmp_out.unlink(missing_ok=True)

    endtime = timeit.default_timer()  # End timer
    Metrics.GEOIP_DOWNLOAD_TIME.observe(endtime - start_time)
116+
117+
118+
def update_geoip(db_dir: Path, ts: str, asn_url: str, cc_url: str) -> None:
    """Download both GeoIP databases and record `ts` as the installed version.

    Creates `db_dir` if needed, fetches the ASN database first and the CC
    database second, then writes `ts` to `db_dir/geoipdbts` and increments
    `Metrics.GEOIP_UPDATED`. Any exception from a download aborts the update
    before the timestamp is written.
    """
    db_dir.mkdir(parents=True, exist_ok=True)

    for source_url, target_name in ((asn_url, "asn.mmdb"), (cc_url, "cc.mmdb")):
        download_geoip(db_dir, source_url, target_name)

    (db_dir / "geoipdbts").write_text(ts)

    log.info("Updated GeoIP databases")
    Metrics.GEOIP_UPDATED.inc()
128+
129+
130+
def try_update(db_dir: str):
    """Update the GeoIP databases in `db_dir` if a newer release can be fetched.

    The release tag is the current UTC year and month. Nothing is downloaded
    when that tag is already recorded in `db_dir`, or when either of the two
    archives for the tag is not available yet.
    """
    target_dir = Path(db_dir)

    ts = datetime.now(timezone.utc).strftime("%Y-%m")
    asn_url = f"https://download.db-ip.com/free/dbip-asn-lite-{ts}.mmdb.gz"
    cc_url = f"https://download.db-ip.com/free/dbip-country-lite-{ts}.mmdb.gz"

    if is_already_updated(target_dir, ts):
        log.debug("Database already updated. Exiting.")
    elif not (is_latest_available(asn_url) and is_latest_available(cc_url)):
        # The CC archive is only probed when the ASN archive is available.
        log.debug("Update not available yet. Exiting.")
    else:
        update_geoip(target_dir, ts, asn_url, cc_url)

0 commit comments

Comments
 (0)