Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve UsernamePickerTestCase #9112

Merged
merged 3 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9112.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve `UsernamePickerTestCase`.
120 changes: 2 additions & 118 deletions tests/handlers/test_oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
from typing import Dict, Optional
from urllib.parse import parse_qs, urlencode, urlparse
from typing import Optional
from urllib.parse import parse_qs, urlparse

from mock import ANY, Mock, patch

import pymacaroons

from twisted.web.resource import Resource

from synapse.api.errors import RedirectException
from synapse.handlers.sso import MappingException
from synapse.rest.client.v1 import login
from synapse.rest.synapse.client.pick_username import pick_username_resource
from synapse.server import HomeServer
from synapse.types import UserID

Expand Down Expand Up @@ -855,116 +849,6 @@ def _generate_oidc_session_token(
)


class UsernamePickerTestCase(HomeserverTestCase):
if not HAS_OIDC:
skip = "requires OIDC"

servlets = [login.register_servlets]

def default_config(self):
config = super().default_config()
config["public_baseurl"] = BASE_URL
oidc_config = {
"enabled": True,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"issuer": ISSUER,
"scopes": SCOPES,
"user_mapping_provider": {
"config": {"display_name_template": "{{ user.displayname }}"}
},
}

# Update this config with what's in the default config so that
# override_config works as expected.
oidc_config.update(config.get("oidc_config", {}))
config["oidc_config"] = oidc_config

# whitelist this client URI so we redirect straight to it rather than
# serving a confirmation page
config["sso"] = {"client_whitelist": ["https://whitelisted.client"]}
return config

def create_resource_dict(self) -> Dict[str, Resource]:
d = super().create_resource_dict()
d["/_synapse/client/pick_username"] = pick_username_resource(self.hs)
return d

def test_username_picker(self):
"""Test the happy path of a username picker flow."""
client_redirect_url = "https://whitelisted.client"

# first of all, mock up an OIDC callback to the OidcHandler, which should
# raise a RedirectException
userinfo = {"sub": "tester", "displayname": "Jonny"}
f = self.get_failure(
_make_callback_with_userinfo(
self.hs, userinfo, client_redirect_url=client_redirect_url
),
RedirectException,
)

# check the Location and cookies returned by the RedirectException
self.assertEqual(f.value.location, b"/_synapse/client/pick_username")
cookieheader = f.value.cookies[0]
regex = re.compile(b"^username_mapping_session=([a-zA-Z]+);")
m = regex.search(cookieheader)
if not m:
self.fail("cookie header %s does not match %s" % (cookieheader, regex))

# introspect the sso handler a bit to check that the username mapping session
# looks ok.
session_id = m.group(1).decode("ascii")
username_mapping_sessions = self.hs.get_sso_handler()._username_mapping_sessions
self.assertIn(
session_id, username_mapping_sessions, "session id not found in map"
)
session = username_mapping_sessions[session_id]
self.assertEqual(session.remote_user_id, "tester")
self.assertEqual(session.display_name, "Jonny")
self.assertEqual(session.client_redirect_url, client_redirect_url)

# the expiry time should be about 15 minutes away
expected_expiry = self.clock.time_msec() + (15 * 60 * 1000)
self.assertApproximates(session.expiry_time_ms, expected_expiry, tolerance=1000)

# Now, submit a username to the username picker, which should serve a redirect
# back to the client
submit_path = f.value.location + b"/submit"
content = urlencode({b"username": b"bobby"}).encode("utf8")
chan = self.make_request(
"POST",
path=submit_path,
content=content,
content_is_form=True,
custom_headers=[
("Cookie", cookieheader),
# old versions of twisted don't do form-parsing without a valid
# content-length header.
("Content-Length", str(len(content))),
],
)
self.assertEqual(chan.code, 302, chan.result)
location_headers = chan.headers.getRawHeaders("Location")
# ensure that the returned location starts with the requested redirect URL
self.assertEqual(
location_headers[0][: len(client_redirect_url)], client_redirect_url
)

# fish the login token out of the returned redirect uri
parts = urlparse(location_headers[0])
query = parse_qs(parts.query)
login_token = query["loginToken"][0]

# finally, submit the matrix login token to the login API, which gives us our
# matrix access token, mxid, and device id.
chan = self.make_request(
"POST", "/login", content={"type": "m.login.token", "token": login_token},
)
self.assertEqual(chan.code, 200, chan.result)
self.assertEqual(chan.json_body["user_id"], "@bobby:test")


async def _make_callback_with_userinfo(
hs: HomeServer, userinfo: dict, client_redirect_url: str = "http://client/redirect"
) -> None:
Expand Down
105 changes: 104 additions & 1 deletion tests/rest/client/v1/test_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import urllib.parse
from html.parser import HTMLParser
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import parse_qs, urlencode, urlparse

from mock import Mock

Expand All @@ -30,13 +31,14 @@
from synapse.rest.client.v2_alpha import devices, register
from synapse.rest.client.v2_alpha.account import WhoamiRestServlet
from synapse.rest.synapse.client.pick_idp import PickIdpResource
from synapse.rest.synapse.client.pick_username import pick_username_resource
from synapse.types import create_requester

from tests import unittest
from tests.handlers.test_oidc import HAS_OIDC
from tests.handlers.test_saml import has_saml2
from tests.rest.client.v1.utils import TEST_OIDC_AUTH_ENDPOINT, TEST_OIDC_CONFIG
from tests.unittest import override_config, skip_unless
from tests.unittest import HomeserverTestCase, override_config, skip_unless

try:
import jwt
Expand Down Expand Up @@ -1060,3 +1062,104 @@ def test_login_appservice_no_token(self):
channel = self.make_request(b"POST", LOGIN_URL, params)

self.assertEquals(channel.result["code"], b"401", channel.result)


@skip_unless(HAS_OIDC, "requires OIDC")
class UsernamePickerTestCase(HomeserverTestCase):
"""Tests for the username picker flow of SSO login"""

servlets = [login.register_servlets]

def default_config(self):
config = super().default_config()
config["public_baseurl"] = BASE_URL

config["oidc_config"] = {}
config["oidc_config"].update(TEST_OIDC_CONFIG)
config["oidc_config"]["user_mapping_provider"] = {
"config": {"display_name_template": "{{ user.displayname }}"}
}

# whitelist this client URI so we redirect straight to it rather than
# serving a confirmation page
config["sso"] = {"client_whitelist": ["https://whitelisted.client"]}
return config

def create_resource_dict(self) -> Dict[str, Resource]:
from synapse.rest.oidc import OIDCResource

d = super().create_resource_dict()
d["/_synapse/client/pick_username"] = pick_username_resource(self.hs)
d["/_synapse/oidc"] = OIDCResource(self.hs)
return d

def test_username_picker(self):
"""Test the happy path of a username picker flow."""
client_redirect_url = "https://whitelisted.client"

# do the start of the login flow
channel = self.helper.auth_via_oidc(
{"sub": "tester", "displayname": "Jonny"}, client_redirect_url
)

# that should redirect to the username picker
self.assertEqual(channel.code, 302, channel.result)
picker_url = channel.headers.getRawHeaders("Location")[0]
self.assertEqual(picker_url, "/_synapse/client/pick_username")

# ... with a username_mapping_session cookie
cookies = {} # type: Dict[str,str]
channel.extract_cookies(cookies)
self.assertIn("username_mapping_session", cookies)
session_id = cookies["username_mapping_session"]

# introspect the sso handler a bit to check that the username mapping session
# looks ok.
username_mapping_sessions = self.hs.get_sso_handler()._username_mapping_sessions
self.assertIn(
session_id, username_mapping_sessions, "session id not found in map",
)
session = username_mapping_sessions[session_id]
self.assertEqual(session.remote_user_id, "tester")
self.assertEqual(session.display_name, "Jonny")
self.assertEqual(session.client_redirect_url, client_redirect_url)

# the expiry time should be about 15 minutes away
expected_expiry = self.clock.time_msec() + (15 * 60 * 1000)
self.assertApproximates(session.expiry_time_ms, expected_expiry, tolerance=1000)

# Now, submit a username to the username picker, which should serve a redirect
# back to the client
submit_path = picker_url + "/submit"
content = urlencode({b"username": b"bobby"}).encode("utf8")
chan = self.make_request(
"POST",
path=submit_path,
content=content,
content_is_form=True,
custom_headers=[
("Cookie", "username_mapping_session=" + session_id),
# old versions of twisted don't do form-parsing without a valid
# content-length header.
("Content-Length", str(len(content))),
],
)
self.assertEqual(chan.code, 302, chan.result)
location_headers = chan.headers.getRawHeaders("Location")
# ensure that the returned location starts with the requested redirect URL
self.assertEqual(
location_headers[0][: len(client_redirect_url)], client_redirect_url
)

# fish the login token out of the returned redirect uri
parts = urlparse(location_headers[0])
query = parse_qs(parts.query)
login_token = query["loginToken"][0]

# finally, submit the matrix login token to the login API, which gives us our
# matrix access token, mxid, and device id.
chan = self.make_request(
"POST", "/login", content={"type": "m.login.token", "token": login_token},
)
self.assertEqual(chan.code, 200, chan.result)
self.assertEqual(chan.json_body["user_id"], "@bobby:test")
11 changes: 6 additions & 5 deletions tests/rest/client/v1/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ def login_via_oidc(self, remote_user_id: str) -> JsonDict:
the normal places.
"""
client_redirect_url = "https://x"
channel = self.auth_via_oidc(remote_user_id, client_redirect_url)
channel = self.auth_via_oidc({"sub": remote_user_id}, client_redirect_url)

# expect a confirmation page
assert channel.code == 200
assert channel.code == 200, channel.result

# fish the matrix login token out of the body of the confirmation page
m = re.search(
Expand All @@ -390,7 +390,7 @@ def login_via_oidc(self, remote_user_id: str) -> JsonDict:

def auth_via_oidc(
self,
remote_user_id: str,
user_info_dict: JsonDict,
client_redirect_url: Optional[str] = None,
ui_auth_session_id: Optional[str] = None,
) -> FakeChannel:
Expand All @@ -411,7 +411,8 @@ def auth_via_oidc(
the normal places.

Args:
remote_user_id: the remote id that the OIDC provider should present
user_info_dict: the remote userinfo that the OIDC provider should present.
Typically this should be '{"sub": "<remote user id>"}'.
client_redirect_url: for a login flow, the client redirect URL to pass to
the login redirect endpoint
ui_auth_session_id: if set, we will perform a UI Auth flow. The session id
Expand Down Expand Up @@ -457,7 +458,7 @@ def auth_via_oidc(
# a dummy OIDC access token
("https://issuer.test/token", {"access_token": "TEST"}),
# and then one to the user_info endpoint, which returns our remote user id.
("https://issuer.test/userinfo", {"sub": remote_user_id}),
("https://issuer.test/userinfo", user_info_dict),
]

async def mock_req(method: str, uri: str, data=None, headers=None):
Expand Down
2 changes: 1 addition & 1 deletion tests/rest/client/v2_alpha/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def test_ui_auth_via_sso(self):
# run the UIA-via-SSO flow
session_id = channel.json_body["session"]
channel = self.helper.auth_via_oidc(
remote_user_id=remote_user_id, ui_auth_session_id=session_id
{"sub": remote_user_id}, ui_auth_session_id=session_id
)

# that should serve a confirmation page
Expand Down