1
1
from __future__ import annotations
2
2
3
- from datetime import datetime , timedelta # noqa: H301
3
+ import contextlib
4
+ import datetime as dtm
4
5
import functools
5
6
import operator
6
7
import typing as t
17
18
_SORT_KEY_START = operator .attrgetter (_F_START )
18
19
19
20
21
+ def _now (tz : dtm .timezone = dtm .UTC ) -> dtm .datetime :
22
+ return dtm .datetime .now (tz = tz )
23
+
24
+
25
+ def utcnow () -> dtm .datetime :
26
+ return dtm .datetime .now (tz = dtm .UTC )
27
+
28
+
20
29
def _jumping_sequence (length : int ) -> t .Generator [int , None , None ]:
21
30
middle , tail = divmod (length , 2 )
22
31
for left , right in zip (range (middle - 1 , - 1 , - 1 ),
@@ -27,66 +36,72 @@ def _jumping_sequence(length: int) -> t.Generator[int, None, None]:
27
36
yield length - 1
28
37
29
38
30
- def Tuple (start : datetime , end : datetime ) -> _T_DT_PAIR :
39
+ def Tuple (start : dtm . datetime , end : dtm . datetime ) -> _T_DT_PAIR :
31
40
return start , end
32
41
33
42
34
43
class PeriodProto (t .Protocol ):
35
- start : datetime
36
- end : datetime
44
+ start : dtm . datetime
45
+ end : dtm . datetime
37
46
38
47
39
- _T_DT_PAIR = t .Tuple [datetime , datetime ]
48
+ _T_DT_PAIR = t .Tuple [dtm . datetime , dtm . datetime ]
40
49
41
- _T_FACTORY = t .Callable [[datetime , datetime ], t .Any ]
42
- _T_FACTORY_RESULT = t .Union [PeriodProto , t .Tuple [datetime , datetime ]]
43
- _T_FACTORY_RESULT_OPT = t .Union [PeriodProto , t .Tuple [datetime , datetime ], None ]
50
+ _T_FACTORY = t .Callable [[dtm . datetime , dtm . datetime ], t .Any ]
51
+ _T_FACTORY_RESULT = t .Union [PeriodProto , t .Tuple [dtm . datetime , dtm . datetime ]]
52
+ _T_FACTORY_RESULT_OPT = t .Union [PeriodProto , t .Tuple [dtm . datetime , dtm . datetime ], None ]
44
53
45
54
46
55
class Period :
47
56
48
- start : datetime
49
- end : datetime
57
+ start : dtm . datetime
58
+ end : dtm . datetime
50
59
51
60
__slots__ = (_F_START , _F_END , _F__DURATION )
52
61
53
- def __init__ (self , start : datetime , end : datetime ):
62
+ def __init__ (self , start : dtm . datetime , end : dtm . datetime ):
54
63
validate_edges (start , end )
55
64
object .__setattr__ (self , _F_START , start )
56
65
object .__setattr__ (self , _F_END , end )
57
66
58
- def __set_duration (self ) -> timedelta :
67
+ def __set_duration (self ) -> dtm . timedelta :
59
68
duration = self .end - self .start
60
69
object .__setattr__ (self , _F__DURATION , duration )
61
70
return duration
62
71
63
72
@property
64
- def duration (self ) -> timedelta :
73
+ def duration (self ) -> dtm . timedelta :
65
74
try :
66
75
return getattr (self , _F__DURATION )
67
76
except AttributeError :
68
77
return self .__set_duration ()
69
78
70
79
@classmethod
71
- def load_edges (cls , start : datetime , end : datetime ) -> Period :
80
+ def load_edges (cls , start : dtm . datetime , end : dtm . datetime ) -> Period :
72
81
"""Unsafe load Period from edges without edge validation"""
73
82
inst = cls .__new__ (cls )
74
83
object .__setattr__ (inst , _F_START , start )
75
84
object .__setattr__ (inst , _F_END , end )
76
85
return inst
77
86
78
87
@classmethod
79
- def from_start (cls , start : datetime , duration : timedelta ) -> Period :
88
+ def from_start (cls , start : dtm . datetime , duration : dtm . timedelta ) -> Period :
80
89
"""Make a Period from start and duration"""
81
90
82
91
return cls (start , start + duration )
83
92
84
93
@classmethod
85
- def from_end (cls , end : datetime , duration : timedelta ) -> Period :
94
+ def from_end (cls , end : dtm . datetime , duration : dtm . timedelta ) -> Period :
86
95
"""Make a Period from end and duration"""
87
96
88
97
return cls (end - duration , end )
89
98
99
+ @classmethod
100
+ def record (cls , start : dtm .datetime ) -> Period :
101
+ """Make a Period from start and now()"""
102
+
103
+ return cls (start , _now (tz = t .cast (dtm .timezone , start .tzinfo )))
104
+
90
105
def __setattr__ (self , key : str , value : t .Any ) -> None :
91
106
raise NotImplementedError ("method not allowed" )
92
107
@@ -121,8 +136,8 @@ def __deepcopy__(self, memo): # TODO(d.burmistrov)
121
136
return memo [self ]
122
137
123
138
def replace (self ,
124
- start : t .Optional [datetime ] = None ,
125
- end : t .Optional [datetime ] = None ,
139
+ start : t .Optional [dtm . datetime ] = None ,
140
+ end : t .Optional [dtm . datetime ] = None ,
126
141
) -> Period :
127
142
"""Return Period with new specified fields."""
128
143
@@ -144,12 +159,12 @@ def as_args(self) -> _T_DT_PAIR:
144
159
def as_tuple (self ): # TOD
145
160
return as_tuple (self )
146
161
147
- def as_kwargs (self ) -> dict [str , datetime ]:
162
+ def as_kwargs (self ) -> dict [str , dtm . datetime ]:
148
163
"""Return a dictionary of edges"""
149
164
150
165
return dict (start = self .start , end = self .end )
151
166
152
- def as_dict (self ) -> dict [str , datetime | timedelta ]:
167
+ def as_dict (self ) -> dict [str , dtm . datetime | dtm . timedelta ]:
153
168
"""Return a dictionary of edges and durations"""
154
169
155
170
return dict (start = self .start , end = self .end , duration = self .duration )
@@ -243,7 +258,7 @@ def ascend_start(*periods: PeriodProto,
243
258
244
259
# validation
245
260
246
- def validate_edges (start : datetime , end : datetime ) -> None :
261
+ def validate_edges (start : dtm . datetime , end : dtm . datetime ) -> None :
247
262
f"""Validate period edges
248
263
249
264
Exception will be raised for invalid data.
@@ -256,9 +271,9 @@ def validate_edges(start: datetime, end: datetime) -> None:
256
271
"""
257
272
258
273
# types
259
- if not isinstance (start , datetime ):
274
+ if not isinstance (start , dtm . datetime ):
260
275
raise TypeError (f"'{ _F_START } ' must be datetime: '{ type (start )} '" )
261
- elif not isinstance (end , datetime ):
276
+ elif not isinstance (end , dtm . datetime ):
262
277
raise TypeError (f"'{ _F_END } ' must be datetime: '{ type (end )} '" )
263
278
264
279
# timezones
@@ -292,14 +307,14 @@ def validate_period(period: PeriodProto) -> None:
292
307
293
308
# ~set proto
294
309
295
- def contains (period : PeriodProto , item : datetime | PeriodProto ) -> bool :
310
+ def contains (period : PeriodProto , item : dtm . datetime | PeriodProto ) -> bool :
296
311
"""Report whether period contains another period or timestamp
297
312
298
313
:param period: period-like object
299
314
:param item: timestamp or period-like object
300
315
"""
301
316
302
- if isinstance (item , datetime ):
317
+ if isinstance (item , dtm . datetime ):
303
318
return period .start <= item <= period .end
304
319
305
320
return (period .start <= item .start ) and (item .end <= period .end )
@@ -445,10 +460,10 @@ def difference(period: PeriodProto,
445
460
# I. "p + timedelta"
446
461
# II. "p1 + p2"
447
462
def add (period : PeriodProto ,
448
- other : PeriodProto | timedelta ,
463
+ other : PeriodProto | dtm . timedelta ,
449
464
factory : _T_FACTORY = Period ,
450
465
) -> _T_FACTORY_RESULT_OPT :
451
- if not isinstance (other , timedelta ):
466
+ if not isinstance (other , dtm . timedelta ):
452
467
return join (period , other , factory = factory )
453
468
454
469
secs = other .total_seconds ()
@@ -466,10 +481,10 @@ def add(period: PeriodProto,
466
481
# I. "p - timedelta"
467
482
# II. "p1 - p2"
468
483
def sub (period : PeriodProto ,
469
- other : PeriodProto | timedelta ,
484
+ other : PeriodProto | dtm . timedelta ,
470
485
factory : _T_FACTORY = Period ,
471
486
) -> _T_FACTORY_RESULT_OPT :
472
- if isinstance (other , timedelta ):
487
+ if isinstance (other , dtm . timedelta ):
473
488
return add (period , - other , factory = factory )
474
489
475
490
# TODO(d.burmistrov): extract this to `cut(period, other, *others, ...)`
@@ -496,24 +511,24 @@ def mul(period: PeriodProto, factor: int | float, factory: _T_FACTORY = Period,
496
511
return factory (start , period .end )
497
512
498
513
499
- def floordiv (period : PeriodProto , other : timedelta | int ,
500
- ) -> timedelta | int :
501
- if not isinstance (other , (timedelta , int )):
514
+ def floordiv (period : PeriodProto , other : dtm . timedelta | int ,
515
+ ) -> dtm . timedelta | int :
516
+ if not isinstance (other , (dtm . timedelta , int )):
502
517
raise NotImplementedError ()
503
518
504
519
return (period .end - period .start ) // other
505
520
506
521
507
- def mod (period : PeriodProto , other : timedelta ) -> timedelta :
508
- if not isinstance (other , timedelta ):
522
+ def mod (period : PeriodProto , other : dtm . timedelta ) -> dtm . timedelta :
523
+ if not isinstance (other , dtm . timedelta ):
509
524
raise NotImplementedError ()
510
525
511
526
return (period .end - period .start ) % other
512
527
513
528
514
- def truediv (period : PeriodProto , other : timedelta | int | float ,
515
- ) -> timedelta | float :
516
- if not isinstance (other , (timedelta , int , float )):
529
+ def truediv (period : PeriodProto , other : dtm . timedelta | int | float ,
530
+ ) -> dtm . timedelta | float :
531
+ if not isinstance (other , (dtm . timedelta , int , float )):
517
532
raise NotImplementedError ()
518
533
519
534
return (period .end - period .start ) / other
@@ -550,24 +565,24 @@ def eq(period: PeriodProto, other: PeriodProto, *others: PeriodProto) -> bool:
550
565
551
566
552
567
def lshift (period : PeriodProto ,
553
- delta : timedelta ,
568
+ delta : dtm . timedelta ,
554
569
factory : _T_FACTORY = Period ,
555
570
) -> _T_FACTORY_RESULT :
556
571
"""Shift left right by timedelta (p << delta)"""
557
572
558
- if not isinstance (delta , timedelta ):
573
+ if not isinstance (delta , dtm . timedelta ):
559
574
raise NotImplementedError ()
560
575
561
576
return factory (period .start - delta , period .end - delta )
562
577
563
578
564
579
def rshift (period : PeriodProto ,
565
- delta : timedelta ,
580
+ delta : dtm . timedelta ,
566
581
factory : _T_FACTORY = Period ,
567
582
) -> _T_FACTORY_RESULT :
568
583
"""Shift period right by timedelta (p >> delta)"""
569
584
570
- if not isinstance (delta , timedelta ):
585
+ if not isinstance (delta , dtm . timedelta ):
571
586
raise NotImplementedError ()
572
587
573
588
return factory (period .start + delta , period .end + delta )
@@ -577,7 +592,7 @@ def rshift(period: PeriodProto,
577
592
578
593
# TODO(d.burmistrov): jumping search + check ISO spec for sep alphabets
579
594
def fromisoformat (s : str , sep : str = _SEP , factory : _T_FACTORY = Period ):
580
- conv = datetime .fromisoformat
595
+ conv = dtm . datetime .fromisoformat
581
596
start , _ , end = s .partition (sep )
582
597
return factory (conv (start ), conv (end ))
583
598
@@ -587,7 +602,7 @@ def isoformat(obj: PeriodProto,
587
602
dt_sep = _DT_SEP ,
588
603
timespec = _TIMESPEC ,
589
604
sep : str = _SEP ) -> str :
590
- conv = functools .partial (datetime .isoformat ,
605
+ conv = functools .partial (dtm . datetime .isoformat ,
591
606
sep = dt_sep , timespec = timespec )
592
607
return f"{ conv (obj .start )} { sep } { conv (obj .end )} "
593
608
@@ -616,8 +631,8 @@ def strptime(period_string: str,
616
631
continue
617
632
618
633
try :
619
- start = datetime .strptime (period_string [:i ], date_format )
620
- end = datetime .strptime (period_string [j :], date_format )
634
+ start = dtm . datetime .strptime (period_string [:i ], date_format )
635
+ end = dtm . datetime .strptime (period_string [j :], date_format )
621
636
except ValueError :
622
637
continue
623
638
else :
@@ -649,7 +664,17 @@ def as_tuple(period: PeriodProto) -> _T_DT_PAIR:
649
664
return period .start , period .end
650
665
651
666
652
- def as_dict (period : PeriodProto ) -> dict [str , datetime ]:
667
+ def as_dict (period : PeriodProto ) -> dict [str , dtm . datetime ]:
653
668
"""Return a dictionary of edges"""
654
669
655
670
return dict (start = period .start , end = period .end )
671
+
672
+
673
+ @contextlib .contextmanager
674
+ def timer ():
675
+ box = [utcnow ()]
676
+ try :
677
+ yield box
678
+ finally :
679
+ box .append (utcnow ())
680
+ box .append (Period .load_edges (* box ))
0 commit comments