  from threading import Timer
- from multiprocessing import current_process
  import re
  import numpy as np
  import pandas as pd
  from abc import (ABC,
                   abstractmethod)
  from typing import List
  from time import perf_counter
+
  from app.scrappers.scrapping_exceptions import (ProcessException,
                                                  ReworkException,
                                                  ScrapException,
@@ -24,7 +24,6 @@ class MeteoScrapper(ABC):
      PROGRESS_TIMER_INTERVAL = 10  # in seconds

      def __init__(self):
-
          self.errors = dict()
          # start time of the job run
          self._start = 0
@@ -34,19 +33,16 @@ def __init__(self):
          self._todo = 0
          # % of jobs processed
          self._progress = 0
-         # speed in %/s
-         self._speed = 0

      def _update(self):
          self._done += 1
          self._progress = round(self._done / self._todo * 100, 0)
-         self._speed = round(self._progress / perf_counter() - self._start, 0)

-     def _print_progress(self, should_stop=False) -> None:
-         print(f"{self.__class__.__name__} ({current_process().pid}) - {self._progress}% - {round(perf_counter() - self._start, 0)}s\n")
+     def _print_progress(self, uc: ScrapperUC, should_stop=False) -> None:
+         print(f"{uc} - {self._progress}% - {round(perf_counter() - self._start, 0)}s\n")

          if not should_stop:
-             timer = Timer(self.PROGRESS_TIMER_INTERVAL, self._print_progress)
+             timer = Timer(self.PROGRESS_TIMER_INTERVAL, self._print_progress, [uc])
              timer.daemon = True
              timer.start()

@@ -107,11 +103,25 @@ def scrap_from_uc(self, uc: ScrapperUC):

          self._todo = sum([1 for _ in uc.to_tps()])
          self._start = perf_counter()
-         self._print_progress()
+         self._print_progress(uc)

          for tp in uc.to_tps():
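+             # try to load the page up to 3 times before recording the
+             # failure and moving on to the next task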
+             html_loading_trials = 3
+             html_data = None
+             while html_data is None and html_loading_trials > 0:
+                 try:
+                     if html_loading_trials != 3:
+                         print("retrying...")
+                     html_data = self._load_html(tp)
+                 except ProcessException:
+                     html_loading_trials -= 1
+
+             if html_data is None:
+                 self.errors[tp.key] = {"url": tp.url,
+                                        "erreur": str(HtmlPageException())}
+                 self._update()
+                 continue

124
              try:
-                 html_data = self._load_html(tp)
                  col_names = self._scrap_columns_names(html_data)
                  values = self._scrap_columns_values(html_data)
                  local_df = self._rework_data(values,
@@ -120,16 +130,17 @@ def scrap_from_uc(self, uc: ScrapperUC):
          except ProcessException as e:

              self.errors[tp.key] = {"url": tp.url,
-                                    "error": str(e)}
+                                    "erreur": str(e)}
              self._update()
              continue

          global_df = pd.concat([global_df, local_df])
          self._update()

      global_df = global_df.sort_values(by="date")
+     global_df = global_df[["date"] + [x for x in global_df.columns if x != "date"]]

-     self._print_progress(should_stop=True)
+     self._print_progress(uc, should_stop=True)

      return global_df

@@ -643,14 +654,133 @@ def _rework_data(self,

  class OgimetHourly(MeteoScrapper):

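+     # Ogimet date cells look like "MM/DD/YYYY"; this pattern is used to
+     # spot where a truncated row ends (see _fill_partial_rows below)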
+     REGEX_FOR_DATES = r'\d+/\d+/\d+'
+
      def _scrap_columns_names(self, table: Element) -> "List[str]":
-         pass
+         try:
+             col_names = [th.text for th in table.find("tr")[0]
+                                                 .find("th")]
+         except IndexError:
+             raise ScrapException()
+
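+         # flatten multi-line headers, then normalise the names: lower case,
+         # explicit units, underscores instead of spaces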
+         col_names = ["_".join(colname.split("\n")) for colname in col_names]
+         col_names = [colname.lower()
+                             .replace("(c)", "°C")
+                             .replace("(mm)", "mm")
+                             .replace(" ", "_")
+                      for colname in col_names]
+
+         specific_index = col_names.index("date")
+         col_names.insert(specific_index + 1, "time")
+
+         return col_names

      def _scrap_columns_values(self, table: Element) -> "List[str]":
-         pass

-     def _rework_data(self, values: "List[str]", columns_names: "List[str]", tp: TaskParameters) -> pd.DataFrame:
-         pass
+         values = [td.text
+                   for tr in table.find("tr")[1:-1]
+                   for td in tr.find("td")]
+
+         return values
+
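+     # reshapes the scraped values into a dataframe, rebuilds the datetime
+     # column, fills in missing datetimes and coerces the numeric columns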
+     def _rework_data(self,
+                      values: "List[str]",
+                      columns_names: "List[str]",
+                      tp: TaskParameters) -> pd.DataFrame:
+
+         n_cols = len(columns_names)
+         values = self._fill_partial_rows(values, n_cols)
+
+         df = pd.DataFrame(np.array(values)
+                             .reshape(-1, n_cols),
+                           columns=columns_names)
+
+         df = df[[x for x in df.columns if x not in ["ww", "w1", "w2"]]]
+
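+         # merge the separate date and time columns into a single datetime key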
+         try:
+             df["datetime"] = df["date"] + ":" + df["time"]
+         except:  # unidentified exception raised occasionally
+             df["datetime"] = []
+
+         df = df.drop(["date", "time"], axis="columns")\
+                .rename(columns={"datetime": "date"})
+
+         df["prec_mm"] = ["" if "--" in x
+                          else "_".join(x.split("\n"))
+                          for x in df["prec_mm"].values]
+
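+         # the day and ndays URL parameters give the period's last day and
+         # its length; rebuild every datetime the page should cover,
+         # counting n_days back from start_day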
+         start_day = [int(x.split("=")[1])
+                      for x in tp.url.split("&")
+                      if x.startswith("day")][0]
+
+         n_days = [int(x.split("=")[1])
+                   for x in tp.url.split("&")
+                   if x.startswith("ndays")][0]
+
+         times = [f"0{x}:00" if x < 10 else f"{x}:00" for x in range(0, 24)]
+
+         expected_dates = [f"{tp.month_as_str}/0{start_day - x}/{tp.year_as_str}" if start_day - x < 10
+                           else f"{tp.month_as_str}/{start_day - x}/{tp.year_as_str}"
+                           for x in range(0, n_days)]
+
+         expected_datetimes = [f"{expected_date}:{time}"
+                               for time in times
+                               for expected_date in expected_dates]
+
+         actual_datetimes = df["date"].values
+         missing_datetimes = [x for x in expected_datetimes if x not in actual_datetimes]
+
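+         # add an empty row for each expected datetime missing from the page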
+         for missing_datetime in missing_datetimes:
+             row = pd.DataFrame(np.array([""] * len(df.columns))
+                                  .reshape(-1, len(df.columns)),
+                                columns=df.columns)
+             row.loc[0, ["date"]] = missing_datetime
+             df = pd.concat([df, row])
+
+         df = df.reset_index(drop=True)
+
+         numeric_columns = [x for x in df.columns if x not in ["date", "ddd", "prec_mm"]]
+         for numeric_column in numeric_columns:
+             df[numeric_column] = pd.to_numeric(df[numeric_column],
+                                                errors="coerce")
+
+         df["date"] = pd.to_datetime(df["date"],
+                                     format="%m/%d/%Y:%H:%M")
+         df = df.sort_values(by="date")
+
+         return df
+
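+     # Ogimet sometimes truncates rows with missing trailing values; pad
+     # each short row with empty strings so the flat list of values can be
+     # reshaped into a table of n_cols columns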
+     @classmethod
+     def _fill_partial_rows(cls,
+                            values: "List[str]",
+                            n_cols: int) -> "List[str]":
+
+         has_complete_lines = len(values) % n_cols == 0
+
+         if len(values) == 0 or has_complete_lines:
+             return values
+
+         done = []
+         while not has_complete_lines:
+
+             row = values[:n_cols]
+             dates = [x
+                      for x in row
+                      if re.search(cls.REGEX_FOR_DATES, x) is not None]
+
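+             # a second date within the slice means the current row was
+             # truncated: cut just before it and pad to n_cols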
+             if len(dates) > 1:
+                 index = row.index(dates[1])
+                 row = row[:index]
+
+             row_length = len(row)
+             row += [""] * (n_cols - row_length)
+             done += row
+             values = values[row_length:]
+
+             has_complete_lines = len(values) == 0 \
+                                  and len(done) % n_cols == 0
+
+         return done


  class WundergroundDaily(MeteoScrapper):