Skip to content

Commit

Permalink
feat: make session, base_url and realm_name available on the client
Browse files Browse the repository at this point in the history
  • Loading branch information
derlin committed Mar 21, 2024
1 parent 32e1254 commit cfdc0b5
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 12 deletions.
12 changes: 12 additions & 0 deletions mantelo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ def __init__(
append_slash=False,
)

@property
def session(self):
return self._store["session"]

@property
def base_url(self):
return self._store["base_url"]

@property
def realm_name(self):
return self._store["base_url"].split("/realms/")[1]

@classmethod
def create(
cls,
Expand Down
37 changes: 30 additions & 7 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ def test_init(openid_connection_password):
realm_name=constants.TEST_REALM,
auth=BearerAuth(openid_connection_password.token),
session=session,
headers={"foo": "bar"},
)

s = adm._store["session"]
assert s == session
assert s.headers.get("foo") == "bar"
assert adm.session == session
assert adm.realm_name == constants.TEST_REALM
assert (
adm.base_url
== f"{constants.TEST_SERVER_URL}/admin/realms/{constants.TEST_REALM}"
)

resp = [u["username"] for u in adm.users.get()]
assert constants.TEST_USER in resp
Expand All @@ -29,20 +31,34 @@ def test_init(openid_connection_password):
def test_create(openid_connection_sa):
adm = KeycloakAdmin.create(openid_connection_sa)

assert adm.session == openid_connection_sa.session
assert adm.realm_name == openid_connection_sa.realm_name
assert (
adm.base_url
== f"{openid_connection_sa.server_url}/admin/realms/{openid_connection_sa.realm_name}"
)
resp = [u["username"] for u in adm.users.get()]
assert constants.TEST_USER in resp


@pytest.mark.integration
def test_password_connection():
@pytest.mark.parametrize("with_custom_session", [True, False])
def test_password_connection(with_custom_session):
session = requests.Session() if with_custom_session else None

adm = KeycloakAdmin.from_credentials(
server_url=constants.TEST_SERVER_URL,
realm_name=constants.TEST_REALM,
client_id=constants.ADMIN_CLI_CLIENT,
username=constants.TEST_USER,
password=constants.TEST_PASSWORD,
session=session,
)

assert adm.session == adm.session.auth.token_getter.__self__.session
if session:
assert adm.session == session

resp = [u["username"] for u in adm.users.get()]
assert constants.TEST_USER in resp

Expand Down Expand Up @@ -109,15 +125,22 @@ def test_exceptions(openid_connection_password, status, op):
assert isinstance(ex.response, requests.Response)


@pytest.mark.integration
def test_headers(openid_connection_password):
headers = {"foo": "foo", "bar": "bar"}

adm = KeycloakAdmin.create(
connection=openid_connection_password,
)
adm.session.headers.update(headers)

# Trigger an exception to get the request object
with pytest.raises(HttpException) as excinfo:
adm.non_existant.get()

session_headers = adm._store["session"].headers
hs = excinfo.value.response.request.headers
for k, v in headers.items():
assert session_headers.get(k) == v
assert hs.get(k) == v


def test_resource_private(openid_connection_password):
Expand Down
94 changes: 89 additions & 5 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import pytest

from attrs import evolve
from mantelo.connection import Token, AuthenticationException
from mantelo.connection import (
Token,
AuthenticationException,
ServiceAccountConnection,
UsernamePasswordConnection,
)
from datetime import datetime, timedelta, timezone
import requests


def test_token_from_dict():
Expand Down Expand Up @@ -94,6 +100,84 @@ def test_token_has_refresh_token(ok, token, now_delta):
assert res == ok


def test_userpasswordconnection_init():
conn = UsernamePasswordConnection(
server_url="https://kc.test",
realm_name="test",
client_id="foo-client",
username="u",
password="p",
)
assert (
conn.auth_url
== "https://kc.test/realms/test/protocol/openid-connect/token"
)
assert conn._token_exchange_data() == {
"scope": "openid",
"grant_type": "password",
"client_id": "foo-client",
"username": "u",
"password": "p",
}
assert isinstance(conn.session, requests.Session)
assert conn.refresh_timeout == timedelta(seconds=30)


def test_serviceaccountconnection_init():
conn = ServiceAccountConnection(
server_url="https://kc.test",
realm_name="test",
client_id="foo-client",
client_secret="s3cr3t",
)
assert (
conn.auth_url
== "https://kc.test/realms/test/protocol/openid-connect/token"
)
assert conn._token_exchange_data() == {
"scope": "openid",
"grant_type": "client_credentials",
"client_id": "foo-client",
"client_secret": "s3cr3t",
}
assert isinstance(conn.session, requests.Session)
assert conn.refresh_timeout == timedelta(seconds=30)


def test_openidconnection_default_values():
args = dict(
server_url="https://kc.test",
realm_name="test",
client_id="foo-client",
client_secret="s3cr3t",
)

# Defaults
conn = ServiceAccountConnection(**args)
assert isinstance(conn.session, requests.Session)
assert conn.refresh_timeout == timedelta(seconds=30)

# Override defaults
session = requests.Session()
timeout = timedelta(seconds=120)
conn = ServiceAccountConnection(
**args,
session=session,
refresh_timeout=timeout,
)
assert conn.session == session
assert conn.refresh_timeout == timeout

# None values fallback to the default values
conn = ServiceAccountConnection(
**args,
session=None,
refresh_timeout=None,
)
assert isinstance(conn.session, requests.Session)
assert conn.refresh_timeout == timedelta(seconds=30)


@pytest.mark.integration
def test_openid_token(openid_connection_password):
assert not openid_connection_password._token
Expand All @@ -120,7 +204,9 @@ def test_openid_token(openid_connection_password):
created_at=datetime.now(timezone.utc) - timedelta(days=1),
)
assert openid_connection_password.token() != access_token
assert openid_connection_password._token.expires_at > datetime.now(timezone.utc)
assert openid_connection_password._token.expires_at > datetime.now(
timezone.utc
)


@pytest.mark.integration
Expand All @@ -135,9 +221,7 @@ def test_openid_refresh_token(openid_connection_password):
openid_connection_password.password = "invalid"
openid_connection_password._fetch_token()
assert openid_connection_password._token
assert (
openid_connection_password._token.expires_at > token.expires_at
)
assert openid_connection_password._token.expires_at > token.expires_at


@pytest.mark.integration
Expand Down

0 comments on commit cfdc0b5

Please sign in to comment.