1
1
import functools
2
2
import os
3
3
import shutil
4
- import subprocess
5
- from datetime import datetime
6
4
from pathlib import Path
7
- from subprocess import CalledProcessError
8
5
from typing import Optional
9
6
10
- from dateutil .parser import parse
11
- from requests import HTTPError
7
+ from git import Repo , InvalidGitRepositoryError , GitCommandError
12
8
13
9
14
10
class GitUtils :
15
- _cwd_folder = None
16
-
17
- @classmethod
18
- def set_cwd_folder (cls , cwd : Optional [Path ] = None ):
19
- cls ._cwd_folder = cwd
20
-
21
11
@classmethod
22
12
@functools .lru_cache ()
23
- def exec (cls , cmd : str ) -> str :
24
- return subprocess .run (
25
- args = cmd .split (" " ),
26
- stdout = subprocess .PIPE ,
27
- stderr = subprocess .DEVNULL ,
28
- cwd = cls ._cwd_folder
29
- ).stdout .decode ("utf-8" ).strip ()
30
-
31
- @classmethod
32
- def version (cls ) -> str :
33
- return cls .exec ("git --version" )
34
-
35
- @classmethod
36
- def is_enable (cls ) -> bool :
37
- return cls .version () is not None
38
-
39
- @classmethod
40
- def branch_all (cls ) -> str :
41
- return cls .exec ("git branch --all" )
42
-
43
- @classmethod
44
- def current_branch (cls ) -> str :
45
- for out in cls .branch_all ().splitlines ():
46
- if not out .startswith ("*" ):
47
- continue
48
-
49
- out = out .strip ().split (maxsplit = 1 )
50
- return out [- 1 ]
51
-
52
- @classmethod
53
- def commit_id (cls ) -> str :
54
- return cls .exec ("git rev-parse --short HEAD" )
55
-
56
- @classmethod
57
- def commit_count (cls ) -> int :
58
- return int (cls .exec ("git rev-list --count HEAD" ))
59
-
60
- @classmethod
61
- def has_tag (cls , name : str ) -> bool :
62
- return cls .exec (f"git tag -l { name } " ) != ""
13
+ def current_branch (cls , repo_dir : Path ) -> Optional [str ]:
14
+ try :
15
+ return Repo (repo_dir ).active_branch .name
16
+ except InvalidGitRepositoryError :
17
+ return None
63
18
64
19
@classmethod
65
20
def clone_and_zip (cls , url : str , out : Path ) -> float :
66
- out .parent .mkdir (parents = True , exist_ok = True )
67
21
repo_dir = out .with_suffix ("" )
68
-
69
- if not cls .is_enable ():
70
- raise RuntimeError ("git command not found" )
22
+ if repo_dir .exists ():
23
+ shutil .rmtree (repo_dir )
71
24
72
25
try :
73
- subprocess .run (
74
- args = ["git" , "clone" , url , repo_dir .as_posix (), "--depth=1" ],
75
- cwd = out .parent .as_posix (),
76
- stderr = subprocess .DEVNULL ,
77
- check = True
78
- )
79
- except CalledProcessError :
26
+ repo = Repo .clone_from (url , repo_dir )
27
+ last_committed = float (repo .commit ().committed_date )
28
+ except GitCommandError :
80
29
shutil .rmtree (repo_dir , ignore_errors = True )
81
- raise HTTPError (f"clone failed: { url } " )
82
-
83
- try :
84
- result = subprocess .run (
85
- ["git" , "log" , "--format=%cd" ],
86
- stdout = subprocess .PIPE ,
87
- stderr = subprocess .DEVNULL ,
88
- cwd = repo_dir .as_posix ()
89
- ).stdout .decode ("utf-8" )
90
-
91
- last_committed = parse (result ).timestamp ()
92
- except CalledProcessError :
93
- last_committed = datetime .now ().timestamp ()
30
+ raise GitCommandError (f"clone failed: { url } " )
94
31
95
- for path in repo_dir .rglob ( "*" ):
32
+ for path in repo_dir .iterdir ( ):
96
33
if path .name .startswith (".git" ):
97
34
if path .is_dir ():
98
35
shutil .rmtree (path , ignore_errors = True )
@@ -103,7 +40,10 @@ def clone_and_zip(cls, url: str, out: Path) -> float:
103
40
104
41
os .utime (path , (last_committed , last_committed ))
105
42
106
- shutil .make_archive (repo_dir .as_posix (), format = "zip" , root_dir = repo_dir )
107
- shutil .rmtree (repo_dir )
43
+ try :
44
+ shutil .make_archive (repo_dir .as_posix (), format = "zip" , root_dir = repo_dir )
45
+ shutil .rmtree (repo_dir )
46
+ except FileNotFoundError :
47
+ raise FileNotFoundError (f"archive failed: { repo_dir .as_posix ()} " )
108
48
109
49
return last_committed
0 commit comments