14
14
import gzip
15
15
import logging
16
16
import os
17
+ import sys
18
+
17
19
from pathlib import Path
18
20
from typing import Callable , Iterator , TextIO
19
21
from Bio import bgzf
20
22
21
- log = logging .getLogger (__name__ )
23
+ _LOG = logging .getLogger (__name__ )
22
24
23
25
24
26
def is_compressed (file : str | Path ) -> bool :
@@ -85,9 +87,9 @@ def open_output(path: str | Path, mode: str = 'wt') -> Iterator[TextIO]:
85
87
86
88
Raises
87
89
------
88
- FileExistsError
90
+ 3 = FileExistsError
89
91
Raised if the output file already exists.
90
- PermissionError
92
+ 11 = PermissionError
91
93
Raised if the calling process does not have adequate access rights to
92
94
write to the output file.
93
95
"""
@@ -100,7 +102,8 @@ def open_output(path: str | Path, mode: str = 'wt') -> Iterator[TextIO]:
100
102
# bgzf is old code and doesn't use "xt" mode, annoyingly. This manual check should suffice.
101
103
if mode == "xt" :
102
104
if output_path .exists ():
103
- raise FileExistsError (f"file '{ path } ' already exists" )
105
+ _LOG .error (f"file '{ path } ' already exists" )
106
+ sys .exit (3 )
104
107
else :
105
108
mode = "wt"
106
109
open_ = bgzf .open
@@ -125,26 +128,28 @@ def validate_input_path(path: str | Path):
125
128
126
129
Raises
127
130
------
128
- FileNotFoundError
131
+ 5 = FileNotFoundError
129
132
Raised if the input file does not exist or is not a file.
130
- RuntimeError
133
+ 7 = RuntimeError
131
134
Raised if the input file is empty.
132
- PermissionError
135
+ 9 = PermissionError
133
136
Raised if the calling process has no read access to the file.
134
137
"""
135
138
path = Path (path )
136
139
mssg = ''
137
140
138
141
if not path .is_file ():
139
142
mssg += f"Path '{ path } ' does not exist or not a file"
140
- raise FileNotFoundError (mssg )
143
+ _LOG .error (mssg )
144
+ sys .exit (5 )
141
145
stats = path .stat ()
142
146
if stats .st_size == 0 :
143
147
mssg += f"File '{ path } ' is empty"
144
- raise RuntimeError (mssg )
148
+ _LOG .error (mssg )
149
+ sys .exit (7 )
145
150
if not os .access (path , os .R_OK ):
146
151
mssg += f"cannot read from '{ path } ': access denied"
147
- raise PermissionError ( mssg )
152
+ _LOG . error ( 9 )
148
153
149
154
150
155
def validate_output_path (path : str | Path , is_file : bool = True , overwrite : bool = False ):
@@ -161,18 +166,20 @@ def validate_output_path(path: str | Path, is_file: bool = True, overwrite: bool
161
166
162
167
Raises
163
168
------
164
- FileExistsError
169
+ 3 = FileExistsError
165
170
Raised if path is a file and already exists.
166
- PermissionError
171
+ 11 = PermissionError
167
172
Raised if the calling process does not have adequate access rights to.
168
173
"""
169
174
path = Path (path )
170
175
if is_file :
171
176
if path .is_file () and not overwrite :
172
- raise FileExistsError (f"file '{ path } ' already exists" )
177
+ _LOG .error (f"file '{ path } ' already exists" )
178
+ sys .exit (3 )
173
179
else :
174
180
if path .is_dir ():
175
181
if not os .access (path , os .W_OK ):
176
- raise PermissionError (f"cannot write to '{ path } ', access denied" )
182
+ _LOG .error (f"cannot write to '{ path } ', access denied" )
183
+ sys .exit (11 )
177
184
else :
178
185
path .parent .mkdir (parents = True , exist_ok = True )
0 commit comments