diff --git a/msal/application.py b/msal/application.py index 57e40980..88e3e00b 100644 --- a/msal/application.py +++ b/msal/application.py @@ -920,6 +920,9 @@ def initiate_auth_code_flow( ): """Initiate an auth code flow. + .. deprecated:: + The response_mode parameter is deprecated and will be removed in a future version. + Later when the response reaches your redirect_uri, you can use :func:`~acquire_token_by_auth_code_flow()` to complete the authentication/authorization. @@ -960,18 +963,10 @@ def initiate_auth_code_flow( New in version 1.15. :param str response_mode: - OPTIONAL. Specifies the method with which response parameters should be returned. - The default value is equivalent to ``query``, which is still secure enough in MSAL Python - (because MSAL Python does not transfer tokens via query parameter in the first place). - For even better security, we recommend using the value ``form_post``. - In "form_post" mode, response parameters - will be encoded as HTML form values that are transmitted via the HTTP POST method and - encoded in the body using the application/x-www-form-urlencoded format. - Valid values can be either "form_post" for HTTP POST to callback URI or - "query" (the default) for HTTP GET with parameters encoded in query string. - More information on possible values - `here ` - and `here ` + .. deprecated:: + This parameter is deprecated and will be removed in a future version. + The response_mode is always set to ``form_post`` for security reasons, + regardless of the value provided. Do not use this parameter. :return: The auth code flow. It is a dict in this form:: @@ -991,6 +986,13 @@ def initiate_auth_code_flow( 3. and then relay this dict and subsequent auth response to :func:`~acquire_token_by_auth_code_flow()`. """ + if response_mode is not None: + import warnings + warnings.warn( + "The 'response_mode' parameter is deprecated and will be removed in a future version. " + "Response mode is always 'form_post' for security reasons.", + DeprecationWarning, + stacklevel=2) client = _ClientWithCcsRoutingInfo( {"authorization_endpoint": self.authority.authorization_endpoint}, self.client_id, diff --git a/msal/oauth2cli/authcode.py b/msal/oauth2cli/authcode.py index ba266223..3d960211 100644 --- a/msal/oauth2cli/authcode.py +++ b/msal/oauth2cli/authcode.py @@ -111,27 +111,76 @@ class _AuthCodeHandler(BaseHTTPRequestHandler): def do_GET(self): # For flexibility, we choose to not check self.path matching redirect_uri #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') - qs = parse_qs(urlparse(self.path).query) - if qs.get('code') or qs.get("error"): # So, it is an auth response + parsed_url = urlparse(self.path) + qs = parse_qs(parsed_url.query) + + if qs.get('code'): # Auth code via GET is a security risk - reject it + logger.error("Received auth code via query string (GET request). " + "This is a security risk. Only form_post (POST) is supported.") + self._send_full_response( + "Authentication method not supported. " + "The application requires response_mode=form_post for security. " + "Please ensure your application registration uses form_post response mode.", + is_ok=False) + elif qs.get("error"): # Errors can come via GET - process them auth_response = _qs2kv(qs) - logger.debug("Got auth response: %s", auth_response) - if self.server.auth_state and self.server.auth_state != auth_response.get("state"): - # OAuth2 successful and error responses contain state when it was used - # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1 - self._send_full_response("State mismatch") # Possibly an attack - else: - template = (self.server.success_template - if "code" in qs else self.server.error_template) - if _is_html(template.template): - safe_data = _escape(auth_response) # Foiling an XSS attack - else: - safe_data = auth_response - self._send_full_response(template.safe_substitute(**safe_data)) - self.server.auth_response = auth_response # Set it now, after the response is likely sent + self._process_auth_response(auth_response) + elif not qs and parsed_url.path == '/': + # GET request to root with no query params - this is likely from clicking OK on eSTS error + # Treat it as an error since success would come via POST + logger.warning("Received GET request to root path with no query parameters - treating as authentication error") + auth_response = { + "error": "authentication_failed", + "error_description": "Please close this window and try again." + } + # Include the original state so state validation doesn't fail + if self.server.auth_state: + auth_response["state"] = self.server.auth_state + self._process_auth_response(auth_response) else: self._send_full_response(self.server.welcome_page) # NOTE: Don't do self.server.shutdown() here. It'll halt the server. + def do_POST(self): + # Handle form_post response mode where auth code is sent via POST body + content_length = int(self.headers.get('Content-Length', 0)) + post_data = self.rfile.read(content_length).decode('utf-8') + try: + from urllib.parse import parse_qs as parse_qs_post + except ImportError: + from urlparse import parse_qs as parse_qs_post + + qs = parse_qs_post(post_data) + if qs.get('code') or qs.get('error'): # So, it is an auth response + auth_response = _qs2kv(qs) + logger.debug("Got auth response via POST: %s", auth_response) + self._process_auth_response(auth_response) + else: + self._send_full_response("Invalid POST request", is_ok=False) + + def _process_auth_response(self, auth_response): + """Process the auth response from either GET or POST request.""" + if self.server.auth_state and self.server.auth_state != auth_response.get("state"): + # OAuth2 successful and error responses contain state when it was used + # https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1 + self._send_full_response("State mismatch") # Possibly an attack + # Don't set auth_response for security, but mark as done to avoid hanging + self.server.done = True + else: + template = (self.server.success_template + if "code" in auth_response else self.server.error_template) + if _is_html(template.template): + safe_data = _escape(auth_response) # Foiling an XSS attack + else: + safe_data = dict(auth_response) # Make a copy to avoid mutating original + # Provide default values for common OAuth2 response fields + # to avoid showing literal placeholder text like "$error_description" + safe_data.setdefault("error", "") + safe_data.setdefault("error_description", "") + safe_data.setdefault("error_uri", "") + self._send_full_response(template.safe_substitute(**safe_data)) + self.server.auth_response = auth_response # Set it now, after the response is likely sent + def _send_full_response(self, body, is_ok=True): self.send_response(200 if is_ok else 400) content_type = 'text/html' if _is_html(body) else 'text/plain' @@ -317,19 +366,22 @@ def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None, auth_uri_callback(_uri) self._server.success_template = Template(success_template or - "Authentication completed. You can close this window now.") + "Authentication complete. You can return to the application. Please close this browser tab.\n\n" + "For your security: Do not share the contents of this page, the address bar, or take screenshots.") self._server.error_template = Template(error_template or - "Authentication failed. $error: $error_description. ($error_uri)") + "Authentication failed. $error: $error_description.\n\n" + "For your security: Do not share the contents of this page, the address bar, or take screenshots.") self._server.timeout = timeout # Otherwise its handle_timeout() won't work self._server.auth_response = {} # Shared with _AuthCodeHandler self._server.auth_state = state # So handler will check it before sending response + self._server.done = False # Flag to indicate completion without setting auth_response while not self._closing: # Otherwise, the handle_request() attempt # would yield noisy ValueError trace # Derived from # https://docs.python.org/2/library/basehttpserver.html#more-examples self._server.handle_request() - if self._server.auth_response: + if self._server.auth_response or self._server.done: break result.update(self._server.auth_response) # Return via writable result param diff --git a/msal/oauth2cli/oauth2.py b/msal/oauth2cli/oauth2.py index ef32ceaa..9975fb0d 100644 --- a/msal/oauth2cli/oauth2.py +++ b/msal/oauth2cli/oauth2.py @@ -176,7 +176,14 @@ def _build_auth_request_params(self, response_type, **kwargs): response_type = self._stringify(response_type) params = {'client_id': self.client_id, 'response_type': response_type} - params.update(kwargs) # Note: None values will override params + # Strictly enforce form_post for security - query string is not allowed + params['response_mode'] = 'form_post' + if 'response_mode' in kwargs and kwargs['response_mode'] != 'form_post': + import warnings + warnings.warn( + "The 'response_mode' parameter will be overridden to 'form_post' for better security.", + UserWarning) + params.update({k: v for k, v in kwargs.items() if k != 'response_mode'}) # Exclude response_mode from kwargs params = {k: v for k, v in params.items() if v is not None} # clean up if params.get('scope'): params['scope'] = self._stringify(params['scope']) @@ -467,6 +474,13 @@ def initiate_auth_code_flow( 3. and then relay this dict and subsequent auth response to :func:`~obtain_token_by_auth_code_flow()`. """ + if "response_mode" in kwargs: + import warnings + warnings.warn( + "The 'response_mode' parameter is deprecated and will be removed in a future version. " + "Response mode is always 'form_post' for security reasons.", + DeprecationWarning, + stacklevel=2) response_type = kwargs.pop("response_type", "code") # Auth Code flow # Must be "code" when you are using Authorization Code Grant. # The "token" for Implicit Grant is not applicable thus not allowed. diff --git a/tests/test_authcode.py b/tests/test_authcode.py index 329cafd8..fd38c294 100644 --- a/tests/test_authcode.py +++ b/tests/test_authcode.py @@ -26,17 +26,159 @@ def test_no_two_concurrent_receivers_can_listen_on_same_port(self): pass def test_template_should_escape_input(self): + """Test that POST request with HTML in error is properly escaped""" with AuthCodeReceiver() as receiver: receiver._scheduled_actions = [( # Injection happens here when the port is known 1, # Delay it until the receiver is activated by get_auth_response() lambda: self.assertEqual( "<tag>foo</tag>", - requests.get("http://localhost:{}?error=foo".format( - receiver.get_port())).text, - "Unsafe data in HTML should be escaped", + requests.post( + "http://localhost:{}".format(receiver.get_port()), + data={"error": "foo"}, + headers={'Content-Type': 'application/x-www-form-urlencoded'} + ).text, ))] receiver.get_auth_response( # Starts server and hang until timeout timeout=3, error_template="$error", ) + def test_get_request_with_auth_code_is_rejected(self): + """Test that GET request with auth code is rejected for security""" + with AuthCodeReceiver() as receiver: + test_code = "test_auth_code_12345" + test_state = "test_state_67890" + receiver._scheduled_actions = [( + 1, + lambda: self._verify_get_rejection( + receiver.get_port(), + code=test_code, + state=test_state + ) + )] + result = receiver.get_auth_response( + timeout=3, + state=test_state, + ) + # Should not receive auth response via GET + self.assertIsNone(result) + + def _verify_get_rejection(self, port, **params): + """Helper to verify GET requests with auth codes are rejected""" + try: + from urllib.parse import urlencode + except ImportError: + from urllib import urlencode + + response = requests.get( + "http://localhost:{}?{}".format(port, urlencode(params)) + ) + # Verify error message about unsupported method + self.assertIn("not supported", response.text.lower()) + self.assertEqual(response.status_code, 400) + + def test_post_request_with_auth_code(self): + """Test that POST request with auth code is handled correctly (form_post response mode)""" + with AuthCodeReceiver() as receiver: + test_code = "test_auth_code_12345" + test_state = "test_state_67890" + receiver._scheduled_actions = [( + 1, + lambda: self._send_post_auth_response( + receiver.get_port(), + code=test_code, + state=test_state + ) + )] + result = receiver.get_auth_response( + timeout=3, + state=test_state, + success_template="Success: Got code $code", + ) + self.assertIsNotNone(result) + self.assertEqual(result.get("code"), test_code) + self.assertEqual(result.get("state"), test_state) + + def test_post_request_with_error(self): + """Test that POST request with error is handled correctly""" + with AuthCodeReceiver() as receiver: + test_error = "access_denied" + test_error_description = "User denied access" + receiver._scheduled_actions = [( + 1, + lambda: self._send_post_auth_response( + receiver.get_port(), + error=test_error, + error_description=test_error_description + ) + )] + result = receiver.get_auth_response( + timeout=3, + error_template="Error: $error - $error_description", + ) + self.assertIsNotNone(result) + self.assertEqual(result.get("error"), test_error) + self.assertEqual(result.get("error_description"), test_error_description) + + def test_post_request_state_mismatch(self): + """Test that POST request with mismatched state is rejected""" + with AuthCodeReceiver() as receiver: + test_code = "test_auth_code_12345" + wrong_state = "wrong_state" + expected_state = "expected_state" + receiver._scheduled_actions = [( + 1, + lambda: self._send_post_auth_response( + receiver.get_port(), + code=test_code, + state=wrong_state + ) + )] + result = receiver.get_auth_response( + timeout=3, + state=expected_state, + ) + # When state mismatches, the response should not be set + self.assertIsNone(result) + + def test_post_request_escapes_html(self): + """Test that POST request with HTML in error is properly escaped""" + with AuthCodeReceiver() as receiver: + malicious_error = "" + receiver._scheduled_actions = [( + 1, + lambda: self._verify_post_escaping(receiver.get_port(), malicious_error) + )] + receiver.get_auth_response( + timeout=3, + error_template="$error", + ) + + def _send_post_auth_response(self, port, **params): + """Helper to send POST request with auth response""" + try: + from urllib.parse import urlencode + except ImportError: + from urllib import urlencode + + response = requests.post( + "http://localhost:{}".format(port), + data=params, + headers={'Content-Type': 'application/x-www-form-urlencoded'} + ) + return response + + def _verify_post_escaping(self, port, malicious_error): + """Helper to verify HTML escaping in POST requests""" + response = self._send_post_auth_response(port, error=malicious_error) + # Verify that the malicious script is escaped + self.assertIn("<script>", response.text) + self.assertNotIn("