Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from urllib.parse import urlencode, urlparse, urlunparse
from subprocess import call
from json import loads
import requests

from azure.cli.core._profile import Profile
from azure.cli.core._util import CLIError

def _get_login_token(login_server, only_refresh_token=True, repository=None):
'''Obtains refresh and access tokens for an AAD-enabled registry.
:param str login_server: The registry login server URL to log in to
:param bool only_refresh_token: Whether to ask for only refresh token,
or for both refresh and access tokens
:param str repository: repository for which the access token is requested
'''
profile = Profile()
_, _, tenant = profile.get_login_credentials()
refresh = profile.get_refresh_credentials()
login_server = login_server.rstrip('/')
base_endpoint = 'https://' + login_server

challenge = requests.get(base_endpoint + '/v2/')
if challenge.status_code not in [401] or 'WWW-Authenticate' not in challenge.headers:
raise CLIError('Registry did not issue a challenge.')

authenticate = challenge.headers['WWW-Authenticate']

tokens = authenticate.split(' ', 2)
if len(tokens) < 2 or tokens[0].lower() != 'bearer':
raise CLIError('Registry does not support AAD login.')

params = {y[0]: y[1].strip('"') for y in
(x.strip().split('=', 2) for x in tokens[1].split(','))}
if 'realm' not in params or 'service' not in params:
raise CLIError('Registry does not support AAD login.')

authurl = urlparse(params['realm'])
authhost = urlunparse((authurl[0], authurl[1], '/oauth2/exchange', '', '', ''))

headers = {'Content-Type': 'application/x-www-form-urlencoded'}
if isinstance(refresh, str):
content = {
'service': params['service'],
'user_type': 'user',
'tenant': tenant,
'refresh_token': refresh
}
else:
content = {
'service': params['service'],
'user_type': 'spn',
'tenant': tenant,
'username': refresh[1],
'password': refresh[2]
}

response = requests.post(authhost, urlencode(content), headers=headers)

if response.status_code not in [200]:
raise CLIError(
"Access to registry was denied. Response code: {}".format(response.status_code))

refresh_token = loads(response.content.decode("utf-8"))["refresh_token"]
if only_refresh_token:
return refresh_token, ''

authhost = urlunparse((authurl[0], authurl[1], '/oauth2/token', '', '', ''))
if repository is None:
scope = 'registry:catalog:*'
else:
scope = 'repository:' + repository + ':pull'
content = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'scope': scope,
'service': login_server
}
response = requests.post(authhost, urlencode(content), headers=headers)
access_token = loads(response.content.decode("utf-8"))["access_token"]

return refresh_token, access_token

def _get_login_refresh_token(login_server):
'''Obtains a refresh token from the token server for an AAD-enabled registry.
:param str login_server: The registry login server URL to log in to
'''
refresh_token, _ = _get_login_token(login_server)
return refresh_token

def get_login_access_token(login_server, repository=None):
'''Obtains an access token from the token server for an AAD-enabled registry.
:param str login_server: The registry login server URL to log in to
:param str repository: repository for which the access token is requested
'''
only_refresh_token = False
_, access_token = _get_login_token(login_server, only_refresh_token, repository)
return access_token

def docker_login_to_registry(login_server):
'''Logs in the Docker client to a registry.
:param str login_server: The registry login server URL to log in to
'''
refresh_token = _get_login_refresh_token(login_server)
call(["docker", "login",
"--username", "00000000-0000-0000-0000-000000000000",
"--password", refresh_token,
login_server])
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
get_resource_group_name_by_registry_name,
arm_deploy_template,
random_storage_account_name,
docker_login_to_registry,
get_registry_by_name
)
from ._docker_utils import (
docker_login_to_registry
)

import azure.cli.core.azlogging as azlogging
logger = azlogging.get_az_logger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,46 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from base64 import b64encode
import requests

from requests.utils import to_native_string
from azure.cli.core.prompting import prompt, prompt_pass, NoTTYException
from azure.cli.core._util import CLIError

from ._utils import (
get_registry_by_name
)
from ._docker_utils import (
get_login_access_token
)
from .credential import acr_credential_show

def _basic_auth_str(username, password):
return 'Basic ' + to_native_string(
b64encode(('%s:%s' % (username, password)).encode('latin1')).strip()
)

def _bearer_auth_str(token):
return 'Bearer ' + token

def _obtain_data_from_registry(login_server, path, resultIndex, username, password):
registryEndpoint = 'https://' + login_server
resultList = []
executeNextHttpCall = True

if username is None:
auth = _bearer_auth_str(password)
else:
auth = _basic_auth_str(username, password)

headers = {'Authorization': auth}

while executeNextHttpCall:
executeNextHttpCall = False
response = requests.get(
registryEndpoint + path,
auth=requests.auth.HTTPBasicAuth(
username,
password
)
headers=headers
)

if response.status_code == 200:
Expand All @@ -43,7 +60,8 @@ def _obtain_data_from_registry(login_server, path, resultIndex, username, passwo

return resultList

def _validate_user_credentials(registry_name, path, resultIndex, username=None, password=None):
def _validate_user_credentials(
registry_name, path, resultIndex, username=None, password=None, repository=None):
registry, _ = get_registry_by_name(registry_name)
login_server = registry.login_server #pylint: disable=no-member

Expand All @@ -55,6 +73,12 @@ def _validate_user_credentials(registry_name, path, resultIndex, username=None,
raise CLIError('Please specify both username and password in non-interactive mode.')
return _obtain_data_from_registry(login_server, path, resultIndex, username, password)

try:
access_token = get_login_access_token(login_server, repository)
return _obtain_data_from_registry(login_server, path, resultIndex, None, access_token)
except: #pylint: disable=bare-except
pass

try:
cred = acr_credential_show(registry_name)
username = cred.username
Expand Down Expand Up @@ -89,4 +113,4 @@ def acr_repository_show_tags(registry_name, repository, username=None, password=
:param str password: The password used to log into the container registry
'''
path = '/v2/' + repository + '/tags/list'
return _validate_user_credentials(registry_name, path, 'tags', username, password)
return _validate_user_credentials(registry_name, path, 'tags', username, password, repository)