1
1
"""Module implements logic for rhcos image download"""
2
+ import subprocess
2
3
from subprocess import Popen , PIPE
3
4
from shutil import copyfileobj
4
5
from pathlib import Path
5
6
from typing import Tuple
6
7
from tempfile import NamedTemporaryFile
7
8
9
+
8
10
import gzip
9
11
import re
10
12
import logging
16
18
GITHUB_URL = "https://raw.githubusercontent.com/openshift/installer/{commit}/data/data/rhcos.json"
17
19
18
20
19
- def get_commit (installer : str ) -> str :
21
+ class CoreOsException (Exception ):
22
+ """CoreOsException represents error while executing installer in older version
23
+ """
24
+ def __init__ (self , * args , ** kwargs ):
25
+ super ().__init__ (self , * args , * kwargs )
26
+
27
+
28
+ def _get_coreos_json (installer : str ) -> str :
29
+ json_data = {}
30
+ with Popen ([installer , "coreos" , "print-stream-json" ], stdout = PIPE ,
31
+ stderr = subprocess .DEVNULL , universal_newlines = True ) as proc :
32
+ proc .wait ()
33
+ if proc .returncode != 0 :
34
+ raise CoreOsException ("Installer doesn't support coreos subcommand" )
35
+ json_data = json .loads (proc .stdout .read ())
36
+ json_part = json_data ["architectures" ]["x86_64" ]["artifacts" ]["openstack" ]
37
+ release_str = json_part ["release" ]
38
+ json_part = json_part ["formats" ]["qcow2.gz" ]
39
+ return json_part .get ("disk" , json_part )["location" ], release_str
40
+
41
+
42
+ def get_commit (installer : str ) -> Tuple [str , str ]:
20
43
"""Function extracts source commit from installer,
21
44
in order to find associated rhcos image"""
22
45
version_str = ""
@@ -28,16 +51,26 @@ def get_commit(installer: str) -> str:
28
51
return commits [0 ]
29
52
30
53
31
- def get_url (installer : str ) -> Tuple [str , str ]:
32
- """Function builds url to rhcos image and version of
33
- rhcos iamge."""
54
+ def _get_old_url (installer : str ) -> Tuple [str , str ]:
34
55
commit = get_commit (installer )
35
56
gh_data_link = GITHUB_URL .format (commit = commit )
36
57
rhcos_json = requests .get (gh_data_link , allow_redirects = True )
37
58
rhcos_data = json .loads (rhcos_json .content )
38
59
return rhcos_data ['baseURI' ] + rhcos_data ['images' ]['openstack' ]['path' ], rhcos_data ['buildid' ]
39
60
40
61
62
+ def get_url (installer : str ) -> Tuple [str , str ]:
63
+ """Function builds url to rhcos image and version of
64
+ rhcos iamge."""
65
+ url , version = None , None
66
+ try :
67
+ url , version = _get_coreos_json (installer )
68
+ except CoreOsException as ex :
69
+ logging .debug (ex )
70
+ url , version = _get_old_url (installer )
71
+ return url , version
72
+
73
+
41
74
def _extract_gzip (buff : NamedTemporaryFile , target : str ) -> Path :
42
75
result = None
43
76
with gzip .open (buff .name ) as zip_file :
0 commit comments