add laposte data (#72)
* add laposte data

* fix bug when writing data from csv to duckdb

* remove duckdb client close
NicolasDuchenne authored Feb 26, 2025
1 parent de39d30 commit 4c48b9e
Showing 9 changed files with 52 additions and 186 deletions.
2 changes: 1 addition & 1 deletion dbt_/models/staging/communes/_communes_models.yml
@@ -4,4 +4,4 @@ models:
- name: stg_communes__cog
description: "Liste des communes et leurs unités de distribution chargé depuis le site de l'insee"
- name: stg_communes__laposte
description: "Liste des communes et leurs unités de distribution chargé depuis open data gouv"
description: "Liste des communes et leurs unités de distribution chargé depuis le site de la poste https://datanova.laposte.fr/datasets/laposte-hexasmal"
22 changes: 8 additions & 14 deletions dbt_/models/staging/communes/stg_communes__laposte.sql
@@ -1,16 +1,10 @@
SELECT
TYPECOM::VARCHAR as typecom,
COM::VARCHAR AS com,
REG::SMALLINT as reg,
DEP::VARCHAR as dep,
CTCD::VARCHAR as ctcd,
ARR::VARCHAR as arr,
ARR::VARCHAR as arr,
TNCC::SMALLINT as tncc,
NCC::VARCHAR as ncc,
NCCENR::VARCHAR as nccenr,
LIBELLE::VARCHAR as libelle,
CAN::VARCHAR as can,
COMPARENT::VARCHAR as comparent,
de_partition::SMALLINT as de_partition
code_commune_insee::VARCHAR as code_commune_insee,
nom_de_la_commune::VARCHAR AS nom_de_la_commune,
code_postal::VARCHAR as code_postal,
libelle_d_acheminement::VARCHAR as libelle_d_acheminement,
ligne_5::VARCHAR as ligne_5,
_geopoint::VARCHAR as _geopoint,
de_partition::SMALLINT as de_partition,
de_ingestion_date::VARCHAR as de_ingestion_date
FROM {{ source('communes', 'laposte_communes') }}
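A quick way to sanity-check the raw table behind this staging model from Python. This is a minimal sketch under stated assumptions: the warehouse file name ("database.duckdb") is assumed, and the raw table name "laposte_communes" comes from the dbt source reference above.

```python
# Hedged sanity check of the raw La Poste table feeding the staging model.
# Assumptions: the pipeline writes to a local "database.duckdb" file and the
# raw table is named "laposte_communes", per source('communes', 'laposte_communes').
import duckdb

conn = duckdb.connect("database.duckdb", read_only=True)
rows = conn.execute(
    """
    SELECT code_commune_insee, code_postal, libelle_d_acheminement
    FROM laposte_communes
    LIMIT 5;
    """
).fetchall()
print(rows)
conn.close()
```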
15 changes: 9 additions & 6 deletions pipelines/tasks/build_database.py
@@ -18,8 +18,10 @@
import logging
from typing import List

from pipelines.tasks.client.datagouv_client import COGDataset, DataGouvClient
from pipelines.tasks.client.insee_client import InseeClient
from pipelines.tasks.client.datagouv_client import DataGouvClient
from pipelines.tasks.client.https_to_duckdb_client import HTTPSToDuckDBClient
from pipelines.tasks.config.config_insee import get_insee_config
from pipelines.tasks.config.config_laposte import get_laposte_config

logger = logging.getLogger(__name__)

@@ -38,14 +40,15 @@ def execute(
:param drop_tables: Whether to drop edc tables in the database before data insertion.
"""
# Build database
insee = InseeClient()
insee.process_datasets()
laposte = COGDataset()
laposte.process_datasets()

data_gouv_client = DataGouvClient()
data_gouv_client.process_edc_datasets(
refresh_type=refresh_type,
custom_years=custom_years,
drop_tables=drop_tables,
check_update=check_update,
)
insee_client = HTTPSToDuckDBClient(get_insee_config())
insee_client.process_datasets()
laposte = HTTPSToDuckDBClient(get_laposte_config())
laposte.process_datasets()
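The rewrite above swaps the dataset-specific InseeClient and COGDataset for a single config-driven HTTPSToDuckDBClient, so onboarding another source reduces to writing one more config function. The sketch below is hypothetical: every name and value is illustrative, and the exact keys consumed are inferred from the fields visible elsewhere in this diff.

```python
# Hypothetical config for a third dataset, mirroring the shape of
# get_insee_config() and get_laposte_config(); all values are illustrative.
def get_example_config() -> dict:
    return {
        "source": {
            "base_url": "https://example.org/files/",  # assumed host
            "id": "communes_extra.csv",                # path appended to base_url
            "datetime": "20240220",                    # compact YYYYMMDD stamp, as in both configs
        },
        "file": {
            "file_name": "example_communes_2024.csv",  # cached download name
            "table_name": "example_communes",          # DuckDB table to (re)create
        },
    }

# Wiring it in then follows the same two lines as above:
# example = HTTPSToDuckDBClient(get_example_config())
# example.process_datasets()
```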
58 changes: 11 additions & 47 deletions pipelines/tasks/client/datagouv_client.py
@@ -16,7 +16,6 @@
tqdm_common,
)
from pipelines.tasks.config.config_edc import get_edc_config
from pipelines.tasks.config.config_laposte import get_cog_config


class DataGouvClient(HTTPSClient):
@@ -73,7 +72,8 @@ def _get_edc_dataset_years_to_update(self, years: list) -> list:
"Check that EDC dataset are up to date according to www.data.gouv.fr"
)

conn = DuckDBClient().conn
duckdb_client = DuckDBClient()
conn = duckdb_client.conn

for year in years:
logger.info(f" Check EDC dataset datetime for {year}")
@@ -149,7 +149,7 @@ def _get_edc_dataset_years_to_update(self, years: list) -> list:
else:
logger.info(" All EDC dataset are already up to date")

conn.close()
# duckdb_client.close() # commented out: closing the connection here makes a later call fail with duckdb.duckdb.ConnectionException: Connection Error: Connection already closed!
return update_years

def process_yearly_edc_data(self, year):
@@ -171,7 +171,7 @@
extract_file(zip_file=zip_file, extract_folder=extract_folder)

logger.info(" Creating or updating tables in the database...")
duckcb_client = DuckDBClient()
duckdb_client = DuckDBClient()

files = self.config_edc["files"]

@@ -187,13 +187,13 @@
year=year,
),
)
if duckcb_client.check_table_existence(
if duckdb_client.check_table_existence(
table_name=file_info["table_name"]
):
duckcb_client.delete_from_table(
duckdb_client.delete_from_table(
table_name=file_info["table_name"],
filters=[
duckcb_client.SQLFilters(
duckdb_client.SQLFilters(
colname="de_partition",
filter_value=year,
coltype="INTEGER",
@@ -206,7 +206,7 @@
else:
ingest_type = "CREATE"

duckcb_client.ingest_from_csv(
duckdb_client.ingest_from_csv(
ingest_type=ingest_type,
table_name=file_info["table_name"],
de_partition=year,
@@ -215,7 +215,7 @@
)
pbar.update(1)

duckcb_client.close()
# duckdb_client.close()

logger.info(" Cleaning up cache...")
clear_cache()
@@ -277,7 +277,7 @@ def process_edc_datasets(
]

duckdb_client.drop_tables(table_names=tables_names)
duckdb_client.close()
# duckdb_client.close()

logger.info(
f"Launching processing of EDC datasets for years: {years_to_update}"
@@ -287,41 +287,5 @@
self.process_yearly_edc_data(year=year)

logger.info("Cleaning up cache...")
clear_cache(recreate_folder=False)
clear_cache()
return True


class COGDataset:
"""Dataset pour le Code Officiel Géographique (COG)
Chaque année, l'Insee met à disposition sur son site (insee.fr) le code officiel géographique
qui rassemble les codes et libellés des communes, des cantons, des arrondissements, des départements,
des régions et des pays et territoires étrangers au 1er janvier.
Source : https://www.data.gouv.fr/fr/datasets/code-officiel-geographique-cog/
"""

def __init__(self):
self.datagouv = DataGouvClient()
self.config = get_cog_config()

def process_datasets(self):
"""Process the COG datasets"""
# Process data
logger.info("Launching processing of laposte COG datasets")

# download dataset
self.datagouv.download_dataset_to_file(
dataset_id=self.config["source"]["id"],
filepath=Path(CACHE_FOLDER, self.config["file"]["file_name"]),
)

# create table in the database
duckdb_client = DuckDBClient()
duckdb_client.drop_tables(table_names=[self.config["file"]["table_name"]])
duckdb_client.ingest_from_csv(
ingest_type="CREATE",
table_name=self.config["file"]["table_name"],
de_partition=self.config["source"]["datetime"][:4],
dataset_datetime=self.config["source"]["datetime"],
filepath=Path(CACHE_FOLDER, self.config["file"]["file_name"]),
)
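The inline comment above explains why every duckdb_client.close() in this file is now commented out: a shared connection ends up closed twice. The failure is easy to reproduce in isolation; here is a minimal sketch, assuming DuckDBClient instances hand out references to one shared duckdb connection.

```python
# Minimal reproduction of the error quoted in the comment above, assuming
# DuckDBClient instances share a single underlying duckdb connection.
import duckdb

conn = duckdb.connect()   # in-memory database, for illustration only
handle = conn             # second reference, as two DuckDBClient instances would hold
handle.close()            # first close succeeds
conn.execute("SELECT 1")  # raises duckdb.ConnectionException:
                          # "Connection Error: Connection already closed!"
```

Leaving the connection open until the process exits avoids the crash; a longer-term fix would be a single owner responsible for closing.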
22 changes: 10 additions & 12 deletions pipelines/tasks/client/duckdb_client.py
@@ -115,19 +115,17 @@ def ingest_from_csv(
query = f"CREATE TABLE {table_name} AS "
else:
raise ValueError("ingest_type parameter needs to be INSERT or CREATE")

query = (
query
+ f"""
SELECT
*,
CAST({de_partition} AS INTEGER) AS de_partition,
current_date AS de_ingestion_date,
{dataset_datetime} AS de_dataset_datetime
FROM read_csv('{filepath}', header=true, delim=',');
"""
query_select = """
SELECT
*,
CAST(? AS INTEGER) AS de_partition,
current_date AS de_ingestion_date,
? AS de_dataset_datetime
FROM read_csv(?, header=true, delim=',');
"""
self.conn.execute(
query + query_select, (de_partition, dataset_datetime, str(filepath))
)
self.conn.execute(query)
return True

def close(self):
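This hunk is the bug fix named in the commit message: de_partition, dataset_datetime, and the CSV path are now bound as prepared-statement parameters instead of being interpolated into the SQL string. Only the table name stays in the f-string, since identifiers cannot be bound as parameters. A standalone sketch of the same pattern, with an illustrative table and file:

```python
# Standalone sketch of the parameterized ingestion above; the table name and
# CSV path are illustrative. DuckDB accepts '?' placeholders for scalar values
# and, as the diff relies on, for the read_csv() file argument.
import duckdb

conn = duckdb.connect()  # in-memory database for illustration
conn.execute(
    """
    CREATE TABLE example_table AS
    SELECT
        *,
        CAST(? AS INTEGER) AS de_partition,
        current_date AS de_ingestion_date,
        ? AS de_dataset_datetime
    FROM read_csv(?, header=true, delim=',');
    """,
    (2024, "20240220", "example.csv"),  # example.csv must exist when this runs
)
```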
pipelines/tasks/client/{insee_client.py → https_to_duckdb_client.py} (renamed)
@@ -1,3 +1,4 @@
import os
from pathlib import Path

from pipelines.tasks.client.duckdb_client import DuckDBClient
@@ -6,21 +7,19 @@
CACHE_FOLDER,
logger,
)
from pipelines.tasks.config.config_insee import get_insee_config


class InseeClient(HTTPSClient):
def __init__(
self, base_url="https://www.insee.fr/fr/statistiques/fichier/7766585/"
):
super().__init__(base_url)
self.config = get_insee_config()
class HTTPSToDuckDBClient(HTTPSClient):
def __init__(self, config):
super().__init__(config["source"]["base_url"])
self.config = config

def process_datasets(self):
"""Process the COG datasets"""
# Process data
logger.info("Launching processing of Insee communes")

os.makedirs(CACHE_FOLDER, exist_ok=True)
self.download_file_from_https(
path=self.config["source"]["id"],
filepath=Path(CACHE_FOLDER, self.config["file"]["file_name"]),
@@ -36,3 +35,4 @@ def process_datasets(self):
dataset_datetime=self.config["source"]["datetime"],
filepath=Path(CACHE_FOLDER, self.config["file"]["file_name"]),
)
# duckdb_client.close()
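Net effect of the rename: insee_client.py becomes a generic https_to_duckdb_client.py whose behavior comes entirely from the injected config, so both datasets are processed identically. A usage sketch mirroring the calls in build_database.py above:

```python
# One generic client, two configs; mirrors build_database.py in this commit.
from pipelines.tasks.client.https_to_duckdb_client import HTTPSToDuckDBClient
from pipelines.tasks.config.config_insee import get_insee_config
from pipelines.tasks.config.config_laposte import get_laposte_config

for get_config in (get_insee_config, get_laposte_config):
    HTTPSToDuckDBClient(get_config()).process_datasets()
```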
3 changes: 2 additions & 1 deletion pipelines/tasks/config/config_insee.py
@@ -2,8 +2,9 @@ def get_insee_config() -> dict:
"""Configuration for La Poste dataset"""
return {
"source": {
"base_url": "https://www.insee.fr/fr/statistiques/fichier/7766585/",
"id": "v_commune_2024.csv",
"datetime": "2024-02-20",
"datetime": "20240220",
},
"file": {
"file_name": "insee_communes_2024.csv",
7 changes: 4 additions & 3 deletions pipelines/tasks/config/config_laposte.py
@@ -1,9 +1,10 @@
def get_cog_config() -> dict:
def get_laposte_config() -> dict:
"""Configuration for La Poste dataset"""
return {
"source": {
"id": "8262de72-138f-4596-ad2f-10079e5f4d7c", # ID du fichier v_communes_2024.csv
"datetime": "2024-02-20",
"base_url": "https://datanova.laposte.fr/data-fair/api/v1/datasets/laposte-hexasmal/metadata-attachments/",
"id": "base-officielle-codes-postaux.csv",
"datetime": "20240220",
},
"file": {
"file_name": "laposte_communes_2024.csv",
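The La Poste source now points at a datanova attachment instead of a data.gouv resource ID. Judging from the constructor and download call shown earlier, the client presumably joins base_url and id by simple concatenation (an assumption, since HTTPSClient itself is not shown in this diff):

```python
# Assembling the download URL from this config; plain concatenation is an
# assumption inferred from HTTPSToDuckDBClient.__init__ and download_file_from_https.
base_url = (
    "https://datanova.laposte.fr/data-fair/api/v1/datasets/"
    "laposte-hexasmal/metadata-attachments/"
)
file_id = "base-officielle-codes-postaux.csv"
print(base_url + file_id)
# https://datanova.laposte.fr/data-fair/api/v1/datasets/laposte-hexasmal/metadata-attachments/base-officielle-codes-postaux.csv
```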
95 changes: 0 additions & 95 deletions pipelines/utils/utils.py
@@ -1,13 +1,9 @@
import logging
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse

import duckdb
import requests

from pipelines.tasks.config.common import DUCKDB_FILE
from pipelines.tasks.config.config_edc import get_edc_config

logger = logging.getLogger(__name__)

@@ -47,94 +43,3 @@ def extract_dataset_datetime(url: str) -> str:
parsed_url = urlparse(metadata.get("location"))
path_parts = parsed_url.path.strip("/").split("/")
return path_parts[-2]


def get_edc_dataset_years_to_update(years: list) -> list:
"""
Return the list of EDC dataset's years that are no longer up to date
compared to the site www.data.gouv.fr
:param years: list of years to check
:return: list of years that are no longer up to date
"""
update_years = []

logger.info("Check that EDC dataset are up to date according to www.data.gouv.fr")

conn = duckdb.connect(DUCKDB_FILE)

for year in years:
logger.info(f" Check EDC dataset datetime for {year}")

edc_config = get_edc_config()
data_url = (
edc_config["source"]["base_url"]
+ edc_config["source"]["yearly_files_infos"][year]["id"]
)
files = edc_config["files"]

for file_info in files.values():
# Check database presence
query = """
SELECT COUNT(*)
FROM information_schema.tables
WHERE table_name = ?
;
"""
conn.execute(query, (file_info["table_name"],))
if conn.fetchone()[0] == 1:
# Check dataset year is present in the database
query = f"""
SELECT EXISTS (
SELECT 1
FROM {file_info["table_name"]}
WHERE de_partition = CAST(? as INTEGER)
);
"""
conn.execute(query, (year,))
if conn.fetchone()[0]:
# get dataset datetime
query = f"""
SELECT de_dataset_datetime
FROM {file_info["table_name"]}
WHERE de_partition = CAST(? as INTEGER)
;
"""
conn.execute(query, (year,))
current_dataset_datetime = conn.fetchone()[0]
logger.info(
f" Database - EDC dataset datetime: {current_dataset_datetime}"
)

format_str = "%Y%m%d-%H%M%S"
last_data_gouv_dataset_datetime = extract_dataset_datetime(data_url)
logger.info(
f" Datagouv - EDC dataset datetime: "
f"{last_data_gouv_dataset_datetime}"
)

last_data_gouv_dataset_datetime = datetime.strptime(
last_data_gouv_dataset_datetime, format_str
)
current_dataset_datetime = datetime.strptime(
current_dataset_datetime, format_str
)

if last_data_gouv_dataset_datetime > current_dataset_datetime:
update_years.append(year)
else:
logger.info(f" {year} doesn't exist in the database")
update_years.append(year)
else:
# EDC table will be created with process_edc_datasets
logger.info(" Database doesn't exists")
update_years.append(year)
# Only one check of a file is needed because the update is done for the whole
break

if update_years:
logger.info(f" EDC dataset update is necessary for {update_years}")
else:
logger.info(" All EDC dataset are already up to date")

conn.close()
return update_years
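The helper removed above now lives in DataGouvClient._get_edc_dataset_years_to_update (see the earlier hunk); the surviving extract_dataset_datetime keeps returning the second-to-last path segment of the redirect location. A hedged illustration with a made-up URL; the real layout is an assumption, though the %Y%m%d-%H%M%S stamp format matches the strptime parsing in the removed code:

```python
from urllib.parse import urlparse

# Illustration of the path_parts[-2] extraction; the URL below is made up,
# but its "%Y%m%d-%H%M%S" segment matches the format parsed by the removed code.
location = "https://static.data.gouv.fr/resources/example/20240220-120000/data.csv"
path_parts = urlparse(location).path.strip("/").split("/")
print(path_parts[-2])  # 20240220-120000
```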
