@@ -8,30 +8,52 @@ use std::fs;
88use std:: io:: { self , Write } ;
99use std:: path;
1010use tar:: Archive ;
11+ use thiserror:: Error ;
1112
1213use crate :: entities:: * ;
1314
1415#[ cfg( test) ]
1516use mockall:: automock;
1617
18+ #[ derive( Error , Debug ) ]
19+ pub enum AggregatorHandlerError {
20+ #[ error( "remote server technical error: '{0}'" ) ]
21+ RemoteServerTechnicalError ( String ) ,
22+ #[ error( "remote server logical error: '{0}'" ) ]
23+ RemoteServerLogicalError ( String ) ,
24+ #[ error( "aggregator server unreachable: '{0}'" ) ]
25+ UnreachableAggregatorServer ( String ) ,
26+ #[ error( "json parsing failed: '{0}'" ) ]
27+ JsonParseFailed ( String ) ,
28+ #[ error( "io error:" ) ]
29+ IOError ( #[ from] io:: Error ) ,
30+ }
31+
1732/// AggregatorHandler represents a read interactor with an aggregator
1833#[ cfg_attr( test, automock) ]
1934#[ async_trait]
2035pub trait AggregatorHandler {
2136 /// List snapshots
22- async fn list_snapshots ( & self ) -> Result < Vec < Snapshot > , String > ;
37+ async fn list_snapshots ( & self ) -> Result < Vec < Snapshot > , AggregatorHandlerError > ;
2338
2439 /// Get snapshot details
25- async fn get_snapshot_details ( & self , digest : & str ) -> Result < Snapshot , String > ;
40+ async fn get_snapshot_details ( & self , digest : & str ) -> Result < Snapshot , AggregatorHandlerError > ;
2641
2742 /// Download snapshot
28- async fn download_snapshot ( & self , digest : & str , location : & str ) -> Result < String , String > ;
43+ async fn download_snapshot (
44+ & self ,
45+ digest : & str ,
46+ location : & str ,
47+ ) -> Result < String , AggregatorHandlerError > ;
2948
3049 /// Unpack snapshot
31- async fn unpack_snapshot ( & self , digest : & str ) -> Result < String , String > ;
50+ async fn unpack_snapshot ( & self , digest : & str ) -> Result < String , AggregatorHandlerError > ;
3251
3352 /// Get certificate details
34- async fn get_certificate_details ( & self , certificate_hash : & str ) -> Result < Certificate , String > ;
53+ async fn get_certificate_details (
54+ & self ,
55+ certificate_hash : & str ,
56+ ) -> Result < Certificate , AggregatorHandlerError > ;
3557}
3658
3759/// AggregatorHTTPClient is a http client for an aggregator
@@ -54,62 +76,76 @@ impl AggregatorHTTPClient {
5476#[ async_trait]
5577impl AggregatorHandler for AggregatorHTTPClient {
5678 /// List snapshots
57- async fn list_snapshots ( & self ) -> Result < Vec < Snapshot > , String > {
79+ async fn list_snapshots ( & self ) -> Result < Vec < Snapshot > , AggregatorHandlerError > {
5880 debug ! ( "List snapshots" ) ;
5981 let url = format ! ( "{}/snapshots" , self . aggregator_endpoint) ;
6082 let response = reqwest:: get ( url. clone ( ) ) . await ;
6183 match response {
6284 Ok ( response) => match response. status ( ) {
6385 StatusCode :: OK => match response. json :: < Vec < Snapshot > > ( ) . await {
6486 Ok ( snapshots) => Ok ( snapshots) ,
65- Err ( err) => Err ( err. to_string ( ) ) ,
87+ Err ( err) => Err ( AggregatorHandlerError :: JsonParseFailed ( err. to_string ( ) ) ) ,
6688 } ,
67- status_error => Err ( format ! ( "error {} received" , status_error) ) ,
89+ status_error => Err ( AggregatorHandlerError :: RemoteServerTechnicalError (
90+ status_error. to_string ( ) ,
91+ ) ) ,
6892 } ,
69- Err ( err) => Err ( err. to_string ( ) ) ,
93+ Err ( err) => Err ( AggregatorHandlerError :: UnreachableAggregatorServer (
94+ err. to_string ( ) ,
95+ ) ) ,
7096 }
7197 }
7298
7399 /// Get snapshot details
74- async fn get_snapshot_details ( & self , digest : & str ) -> Result < Snapshot , String > {
100+ async fn get_snapshot_details ( & self , digest : & str ) -> Result < Snapshot , AggregatorHandlerError > {
75101 debug ! ( "Details snapshot {}" , digest) ;
76102 let url = format ! ( "{}/snapshot/{}" , self . aggregator_endpoint, digest) ;
77103 let response = reqwest:: get ( url. clone ( ) ) . await ;
78104 match response {
79105 Ok ( response) => match response. status ( ) {
80106 StatusCode :: OK => match response. json :: < Snapshot > ( ) . await {
81107 Ok ( snapshot) => Ok ( snapshot) ,
82- Err ( err) => Err ( err. to_string ( ) ) ,
108+ Err ( err) => Err ( AggregatorHandlerError :: JsonParseFailed ( err. to_string ( ) ) ) ,
83109 } ,
84- StatusCode :: NOT_FOUND => Err ( "Snapshot not found" . to_string ( ) ) ,
85- status_error => Err ( format ! ( "error {} received" , status_error) ) ,
110+ StatusCode :: NOT_FOUND => Err ( AggregatorHandlerError :: RemoteServerLogicalError (
111+ "snapshot not found" . to_string ( ) ,
112+ ) ) ,
113+ status_error => Err ( AggregatorHandlerError :: RemoteServerTechnicalError (
114+ status_error. to_string ( ) ,
115+ ) ) ,
86116 } ,
87- Err ( err) => Err ( err. to_string ( ) ) ,
117+ Err ( err) => Err ( AggregatorHandlerError :: UnreachableAggregatorServer (
118+ err. to_string ( ) ,
119+ ) ) ,
88120 }
89121 }
90122
91123 /// Download Snapshot
92- async fn download_snapshot ( & self , digest : & str , location : & str ) -> Result < String , String > {
124+ async fn download_snapshot (
125+ & self ,
126+ digest : & str ,
127+ location : & str ,
128+ ) -> Result < String , AggregatorHandlerError > {
93129 debug ! ( "Download snapshot {} from {}" , digest, location) ;
94130 let response = reqwest:: get ( location) . await ;
95131 match response {
96132 Ok ( response) => match response. status ( ) {
97133 StatusCode :: OK => {
98134 let local_path = archive_file_path ( digest, & self . network ) ?;
99- fs:: create_dir_all ( & local_path. parent ( ) . unwrap ( ) )
100- . map_err ( |e| format ! ( "can't create snapshot dir: {}" , e ) ) ?;
101- let mut local_file = fs :: File :: create ( & local_path )
102- . map_err ( |e| format ! ( "can't access snapshot file: {}" , e ) ) ? ;
103- let bytes_total = response
104- . content_length ( )
105- . ok_or_else ( || "can't get content length" . to_string ( ) ) ?;
135+ fs:: create_dir_all ( & local_path. parent ( ) . unwrap ( ) ) ? ;
136+ let mut local_file = fs :: File :: create ( & local_path ) ?;
137+ let bytes_total = response . content_length ( ) . ok_or_else ( || {
138+ AggregatorHandlerError :: RemoteServerTechnicalError (
139+ "can't get content length" . to_string ( ) ,
140+ )
141+ } ) ?;
106142 let mut bytes_downloaded = 0 ;
107143 let mut remote_stream = response. bytes_stream ( ) ;
108144 while let Some ( item) = remote_stream. next ( ) . await {
109- let chunk = item. map_err ( |e| format ! ( "download failed: {}" , e ) ) ? ;
110- local_file
111- . write_all ( & chunk )
112- . map_err ( |e| format ! ( "can't write to snapshot file: {}" , e ) ) ?;
145+ let chunk = item. map_err ( |e| {
146+ AggregatorHandlerError :: RemoteServerTechnicalError ( e . to_string ( ) )
147+ } ) ? ;
148+ local_file . write_all ( & chunk ) ?;
113149 bytes_downloaded += chunk. len ( ) as u64 ;
114150 print ! (
115151 "Downloaded {}% - {}/{} Bytes\r " ,
@@ -121,31 +157,37 @@ impl AggregatorHandler for AggregatorHTTPClient {
121157 }
122158 Ok ( local_path. into_os_string ( ) . into_string ( ) . unwrap ( ) )
123159 }
124- StatusCode :: NOT_FOUND => Err ( "snapshot archive not found" . to_string ( ) ) ,
125- status_error => Err ( format ! ( "error {} received" , status_error) ) ,
160+ StatusCode :: NOT_FOUND => Err ( AggregatorHandlerError :: RemoteServerLogicalError (
161+ "snapshot archive not found" . to_string ( ) ,
162+ ) ) ,
163+ status_error => Err ( AggregatorHandlerError :: RemoteServerTechnicalError (
164+ status_error. to_string ( ) ,
165+ ) ) ,
126166 } ,
127- Err ( err) => Err ( err. to_string ( ) ) ,
167+ Err ( err) => Err ( AggregatorHandlerError :: UnreachableAggregatorServer (
168+ err. to_string ( ) ,
169+ ) ) ,
128170 }
129171 }
130172
131173 /// Unpack snapshot
132- async fn unpack_snapshot ( & self , digest : & str ) -> Result < String , String > {
174+ async fn unpack_snapshot ( & self , digest : & str ) -> Result < String , AggregatorHandlerError > {
133175 debug ! ( "Unpack snapshot {}" , digest) ;
134176 println ! ( "Unpacking snapshot..." ) ;
135177 let local_path = archive_file_path ( digest, & self . network ) ?;
136- let snapshot_file_tar_gz = fs:: File :: open ( local_path. clone ( ) )
137- . map_err ( |e| format ! ( "can't open snapshot file: {}" , e) ) ?;
178+ let snapshot_file_tar_gz = fs:: File :: open ( local_path. clone ( ) ) ?;
138179 let snapshot_file_tar = GzDecoder :: new ( snapshot_file_tar_gz) ;
139180 let unpack_dir_path = local_path. parent ( ) . unwrap ( ) . join ( path:: Path :: new ( "db" ) ) ;
140181 let mut snapshot_archive = Archive :: new ( snapshot_file_tar) ;
141- snapshot_archive
142- . unpack ( & unpack_dir_path)
143- . map_err ( |e| format ! ( "can't unpack snapshot archive: {}" , e) ) ?;
182+ snapshot_archive. unpack ( & unpack_dir_path) ?;
144183 Ok ( unpack_dir_path. into_os_string ( ) . into_string ( ) . unwrap ( ) )
145184 }
146185
147186 /// Get certificate details
148- async fn get_certificate_details ( & self , certificate_hash : & str ) -> Result < Certificate , String > {
187+ async fn get_certificate_details (
188+ & self ,
189+ certificate_hash : & str ,
190+ ) -> Result < Certificate , AggregatorHandlerError > {
149191 debug ! ( "Details certificate {}" , certificate_hash) ;
150192 let url = format ! (
151193 "{}/certificate/{}" ,
@@ -156,24 +198,28 @@ impl AggregatorHandler for AggregatorHTTPClient {
156198 Ok ( response) => match response. status ( ) {
157199 StatusCode :: OK => match response. json :: < Certificate > ( ) . await {
158200 Ok ( certificate) => Ok ( certificate) ,
159- Err ( err) => Err ( err. to_string ( ) ) ,
201+ Err ( err) => Err ( AggregatorHandlerError :: JsonParseFailed ( err. to_string ( ) ) ) ,
160202 } ,
161- StatusCode :: NOT_FOUND => Err ( "Certificate not found" . to_string ( ) ) ,
162- status_error => Err ( format ! ( "error {} received" , status_error) ) ,
203+ StatusCode :: NOT_FOUND => Err ( AggregatorHandlerError :: RemoteServerLogicalError (
204+ "certificate not found" . to_string ( ) ,
205+ ) ) ,
206+ status_error => Err ( AggregatorHandlerError :: RemoteServerTechnicalError (
207+ status_error. to_string ( ) ,
208+ ) ) ,
163209 } ,
164- Err ( err) => Err ( err. to_string ( ) ) ,
210+ Err ( err) => Err ( AggregatorHandlerError :: UnreachableAggregatorServer (
211+ err. to_string ( ) ,
212+ ) ) ,
165213 }
166214 }
167215}
168216
169217/// Computes local archive filepath
170- fn archive_file_path ( digest : & str , network : & str ) -> Result < path:: PathBuf , String > {
171- Ok ( env:: current_dir ( )
172- . map_err ( |e| format ! ( "current dir not available: {}" , e) ) ?
173- . join ( path:: Path :: new ( & format ! (
174- "data/{}/{}/snapshot.archive.tar.gz" ,
175- network, digest
176- ) ) ) )
218+ fn archive_file_path ( digest : & str , network : & str ) -> Result < path:: PathBuf , AggregatorHandlerError > {
219+ Ok ( env:: current_dir ( ) ?. join ( path:: Path :: new ( & format ! (
220+ "data/{}/{}/snapshot.archive.tar.gz" ,
221+ network, digest
222+ ) ) ) )
177223}
178224
179225#[ cfg( test) ]
0 commit comments