1414// You should have received a copy of the GNU General Public License
1515// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
1616
17- use std:: { collections:: HashMap , fmt, sync:: Arc , collections:: BTreeMap } ;
17+ use std:: {
18+ collections:: { BTreeMap , HashMap } ,
19+ fmt,
20+ sync:: Arc ,
21+ time,
22+ } ;
1823use futures:: sync:: mpsc;
1924use parking_lot:: { Mutex , RwLock } ;
2025use serde:: { Serialize , de:: DeserializeOwned } ;
2126use txpool:: { self , Scoring , Readiness } ;
2227
28+ use error:: IntoPoolError ;
2329use listener:: Listener ;
30+ use rotator:: PoolRotator ;
2431use watcher:: Watcher ;
25- use error:: IntoPoolError ;
2632
2733use runtime_primitives:: { generic:: BlockId , traits:: Block as BlockT } ;
2834
@@ -40,16 +46,18 @@ pub type AllExtrinsics<A> = BTreeMap<<<A as ChainApi>::VEx as txpool::VerifiedTr
4046
4147/// Verified extrinsic struct. Wraps original extrinsic and verification info.
4248#[ derive( Debug ) ]
43- pub struct Verified < Ex : :: std :: fmt :: Debug , VEx : txpool :: VerifiedTransaction > {
49+ pub struct Verified < Ex , VEx > {
4450 /// Original extrinsic.
4551 pub original : Ex ,
4652 /// Verification data.
4753 pub verified : VEx ,
54+ /// Pool deadline, after it's reached we remove the extrinsic from the pool.
55+ pub valid_till : time:: Instant ,
4856}
4957
50- impl < Ex , VEx > txpool:: VerifiedTransaction for Verified < Ex , VEx >
51- where
52- Ex : :: std :: fmt:: Debug ,
58+ impl < Ex , VEx > txpool:: VerifiedTransaction for Verified < Ex , VEx >
59+ where
60+ Ex : fmt:: Debug ,
5361 VEx : txpool:: VerifiedTransaction ,
5462{
5563 type Hash = <VEx as txpool:: VerifiedTransaction >:: Hash ;
@@ -118,10 +126,17 @@ pub struct Ready<'a, 'b, B: 'a + ChainApi> {
118126 api : & ' a B ,
119127 at : & ' b BlockId < B :: Block > ,
120128 context : B :: Ready ,
129+ rotator : & ' a PoolRotator < B :: Hash > ,
130+ now : time:: Instant ,
121131}
122132
123133impl < ' a , ' b , B : ChainApi > txpool:: Ready < VerifiedFor < B > > for Ready < ' a , ' b , B > {
124134 fn is_ready ( & mut self , xt : & VerifiedFor < B > ) -> Readiness {
135+ if self . rotator . ban_if_stale ( & self . now , xt) {
136+ debug ! ( target: "extrinsic-pool" , "[{:?}] Banning as stale." , txpool:: VerifiedTransaction :: hash( xt) ) ;
137+ return Readiness :: Stale ;
138+ }
139+
125140 self . api . is_ready ( self . at , & mut self . context , xt)
126141 }
127142}
@@ -155,6 +170,11 @@ impl<T: ChainApi> Scoring<VerifiedFor<T>> for ScoringAdapter<T> {
155170 }
156171}
157172
173+ /// Maximum time the transaction will be kept in the pool.
174+ ///
175+ /// Transactions that don't get included within the limit are removed from the pool.
176+ const POOL_TIME : time:: Duration = time:: Duration :: from_secs ( 60 * 5 ) ;
177+
158178/// Extrinsics pool.
159179pub struct Pool < B : ChainApi > {
160180 api : B ,
@@ -164,6 +184,7 @@ pub struct Pool<B: ChainApi> {
164184 Listener < B :: Hash > ,
165185 > > ,
166186 import_notification_sinks : Mutex < Vec < mpsc:: UnboundedSender < ( ) > > > ,
187+ rotator : PoolRotator < B :: Hash > ,
167188}
168189
169190impl < B : ChainApi > Pool < B > {
@@ -173,6 +194,7 @@ impl<B: ChainApi> Pool<B> {
173194 pool : RwLock :: new ( txpool:: Pool :: new ( Listener :: default ( ) , ScoringAdapter :: < B > ( Default :: default ( ) ) , options) ) ,
174195 import_notification_sinks : Default :: default ( ) ,
175196 api,
197+ rotator : Default :: default ( ) ,
176198 }
177199 }
178200
@@ -206,19 +228,28 @@ impl<B: ChainApi> Pool<B> {
206228 {
207229 xts
208230 . into_iter ( )
209- . map ( |xt| ( self . api . verify_transaction ( at, & xt) , xt) )
231+ . map ( |xt| {
232+ match self . api . verify_transaction ( at, & xt) {
233+ Ok ( ref verified) if self . rotator . is_banned ( txpool:: VerifiedTransaction :: hash ( verified) ) => {
234+ return ( Err ( txpool:: Error :: from ( "Temporarily Banned" . to_owned ( ) ) . into ( ) ) , xt)
235+ } ,
236+ result => ( result, xt) ,
237+ }
238+ } )
210239 . map ( |( v, xt) | {
211- let xt = Verified { original : xt, verified : v? } ;
240+ let xt = Verified {
241+ original : xt,
242+ verified : v?,
243+ valid_till : time:: Instant :: now ( ) + POOL_TIME ,
244+ } ;
212245 Ok ( self . pool . write ( ) . import ( xt) ?)
213246 } )
214247 . collect ( )
215248 }
216249
217250 /// Imports one unverified extrinsic to the pool
218251 pub fn submit_one ( & self , at : & BlockId < B :: Block > , xt : ExtrinsicFor < B > ) -> Result < Arc < VerifiedFor < B > > , B :: Error > {
219- let v = self . api . verify_transaction ( at, & xt) ?;
220- let xt = Verified { original : xt, verified : v } ;
221- Ok ( self . pool . write ( ) . import ( xt) ?)
252+ Ok ( self . submit_at ( at, :: std:: iter:: once ( xt) ) ?. pop ( ) . expect ( "One extrinsic passed; one result returned; qed" ) )
222253 }
223254
224255 /// Import a single extrinsic and starts to watch their progress in the pool.
@@ -244,7 +275,8 @@ impl<B: ChainApi> Pool<B> {
244275 senders : Option < & [ <B :: VEx as txpool:: VerifiedTransaction >:: Sender ] > ,
245276 ) -> usize
246277 {
247- let ready = Ready { api : & self . api , context : self . api . ready ( ) , at } ;
278+ self . rotator . clear_timeouts ( & time:: Instant :: now ( ) ) ;
279+ let ready = self . ready ( at) ;
248280 self . pool . write ( ) . cull ( senders, ready)
249281 }
250282
@@ -284,9 +316,9 @@ impl<B: ChainApi> Pool<B> {
284316 pub fn pending < F , T > ( & self , at : & BlockId < B :: Block > , f : F ) -> T where
285317 F : FnOnce ( txpool:: PendingIterator < VerifiedFor < B > , Ready < B > , ScoringAdapter < B > , Listener < B :: Hash > > ) -> T ,
286318 {
287- let ready = Ready { api : & self . api , context : self . api . ready ( ) , at } ;
319+ let ready = self . ready ( at ) ;
288320 f ( self . pool . read ( ) . pending ( ready) )
289- }
321+ }
290322
291323 /// Retry to import all verified transactions from given sender.
292324 pub fn retry_verification ( & self , at : & BlockId < B :: Block > , sender : <B :: VEx as txpool:: VerifiedTransaction >:: Sender ) -> Result < ( ) , B :: Error > {
@@ -326,6 +358,16 @@ impl<B: ChainApi> Pool<B> {
326358 map
327359 } )
328360 }
361+
362+ fn ready < ' a , ' b > ( & ' a self , at : & ' b BlockId < B :: Block > ) -> Ready < ' a , ' b , B > {
363+ Ready {
364+ api : & self . api ,
365+ rotator : & self . rotator ,
366+ context : self . api . ready ( ) ,
367+ at,
368+ now : time:: Instant :: now ( ) ,
369+ }
370+ }
329371}
330372
331373 /// A Readiness implementation that returns `Ready` for all transactions.
@@ -337,7 +379,7 @@ impl<VEx> txpool::Ready<VEx> for AlwaysReady {
337379}
338380
339381#[ cfg( test) ]
340- mod tests {
382+ pub mod tests {
341383 use txpool;
342384 use super :: { VerifiedFor , ExtrinsicFor } ;
343385 use std:: collections:: HashMap ;
@@ -353,9 +395,9 @@ mod tests {
353395
354396 #[ derive( Clone , Debug ) ]
355397 pub struct VerifiedTransaction {
356- hash : Hash ,
357- sender : AccountId ,
358- nonce : u64 ,
398+ pub hash : Hash ,
399+ pub sender : AccountId ,
400+ pub nonce : u64 ,
359401 }
360402
361403 impl txpool:: VerifiedTransaction for VerifiedTransaction {
@@ -419,7 +461,7 @@ mod tests {
419461
420462 result
421463 }
422-
464+
423465 fn ready ( & self ) -> Self :: Ready {
424466 HashMap :: default ( )
425467 }
0 commit comments