|
13 | 13 | from io import StringIO
|
14 | 14 |
|
15 | 15 | from ansible.compat.tests.mock import patch, MagicMock
|
| 16 | +from ansible.errors import AnsibleConnectionFailure |
| 17 | +from ansible.module_utils._text import to_bytes |
16 | 18 | from ansible.playbook.play_context import PlayContext
|
17 | 19 | from ansible.plugins.loader import connection_loader
|
18 | 20 |
|
@@ -219,3 +221,148 @@ def test_set_options(self, play, options, direct, expected, kerb):
|
219 | 221 | % (attr, actual, expected)
|
220 | 222 |
|
221 | 223 | module_patcher.stop()
|
| 224 | + |
| 225 | + |
| 226 | +class TestWinRMKerbAuth(object): |
| 227 | + |
| 228 | + DATA = ( |
| 229 | + # default |
| 230 | + ({"_extras": {}}, (["kinit", "user@domain"],), False), |
| 231 | + ({"_extras": {}}, ("kinit", ["user@domain"],), True), |
| 232 | + |
| 233 | + # override kinit path from options |
| 234 | + ({"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'}, |
| 235 | + (["kinit2", "user@domain"],), False), |
| 236 | + ({"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'}, |
| 237 | + ("kinit2", ["user@domain"],), True), |
| 238 | + |
| 239 | + # we expect the -f flag when delegation is set |
| 240 | + ({"_extras": {'ansible_winrm_kerberos_delegation': True}}, |
| 241 | + (["kinit", "-f", "user@domain"],), False), |
| 242 | + ({"_extras": {'ansible_winrm_kerberos_delegation': True}}, |
| 243 | + ("kinit", ["-f", "user@domain"],), True), |
| 244 | + ) |
| 245 | + |
| 246 | + # pylint bug: https://github.com/PyCQA/pylint/issues/511 |
| 247 | + # pylint: disable=undefined-variable |
| 248 | + @pytest.mark.parametrize('options, expected, pexpect', |
| 249 | + ((o, e, p) for o, e, p in DATA)) |
| 250 | + def test_kinit_success(self, options, expected, pexpect): |
| 251 | + def mock_popen_communicate(input=None, timeout=None): |
| 252 | + return b"", b"" |
| 253 | + |
| 254 | + mock_pexpect = None |
| 255 | + if pexpect: |
| 256 | + mock_pexpect = MagicMock() |
| 257 | + mock_pexpect.spawn.return_value.exitstatus = 0 |
| 258 | + |
| 259 | + mock_subprocess = MagicMock() |
| 260 | + mock_subprocess.Popen.return_value.communicate = mock_popen_communicate |
| 261 | + mock_subprocess.Popen.return_value.returncode = 0 |
| 262 | + |
| 263 | + modules = { |
| 264 | + 'pexpect': mock_pexpect, |
| 265 | + 'subprocess': mock_subprocess, |
| 266 | + } |
| 267 | + |
| 268 | + with patch.dict(sys.modules, modules): |
| 269 | + pc = PlayContext() |
| 270 | + new_stdin = StringIO() |
| 271 | + |
| 272 | + connection_loader._module_cache = {} |
| 273 | + conn = connection_loader.get('winrm', pc, new_stdin) |
| 274 | + conn.set_options(var_options=options) |
| 275 | + |
| 276 | + conn._kerb_auth("user@domain", "pass") |
| 277 | + if pexpect: |
| 278 | + assert len(mock_pexpect.method_calls) == 1 |
| 279 | + assert mock_pexpect.method_calls[0][1] == expected |
| 280 | + actual_env = mock_pexpect.method_calls[0][2]['env'] |
| 281 | + else: |
| 282 | + assert len(mock_subprocess.method_calls) == 1 |
| 283 | + assert mock_subprocess.method_calls[0][1] == expected |
| 284 | + actual_env = mock_subprocess.method_calls[0][2]['env'] |
| 285 | + |
| 286 | + assert list(actual_env.keys()) == ['KRB5CCNAME'] |
| 287 | + assert actual_env['KRB5CCNAME'].startswith("FILE:/") |
| 288 | + |
| 289 | + # pylint bug: https://github.com/PyCQA/pylint/issues/511 |
| 290 | + # pylint: disable=undefined-variable |
| 291 | + @pytest.mark.parametrize('use_pexpect', (False, True),) |
| 292 | + def test_kinit_with_missing_executable(self, use_pexpect): |
| 293 | + expected_err = "[Errno 2] No such file or directory: " \ |
| 294 | + "'/fake/kinit': '/fake/kinit'" |
| 295 | + mock_subprocess = MagicMock() |
| 296 | + mock_subprocess.Popen = MagicMock(side_effect=OSError(expected_err)) |
| 297 | + |
| 298 | + mock_pexpect = None |
| 299 | + if use_pexpect: |
| 300 | + expected_err = "The command was not found or was not " \ |
| 301 | + "executable: /fake/kinit" |
| 302 | + |
| 303 | + mock_pexpect = MagicMock() |
| 304 | + mock_pexpect.ExceptionPexpect = Exception |
| 305 | + mock_pexpect.spawn = MagicMock(side_effect=Exception(expected_err)) |
| 306 | + |
| 307 | + modules = { |
| 308 | + 'pexpect': mock_pexpect, |
| 309 | + 'subprocess': mock_subprocess, |
| 310 | + } |
| 311 | + |
| 312 | + with patch.dict(sys.modules, modules): |
| 313 | + pc = PlayContext() |
| 314 | + new_stdin = StringIO() |
| 315 | + |
| 316 | + connection_loader._module_cache = {} |
| 317 | + conn = connection_loader.get('winrm', pc, new_stdin) |
| 318 | + options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"} |
| 319 | + conn.set_options(var_options=options) |
| 320 | + |
| 321 | + with pytest.raises(AnsibleConnectionFailure) as err: |
| 322 | + conn._kerb_auth("user@domain", "pass") |
| 323 | + assert str(err.value) == "Kerberos auth failure when calling " \ |
| 324 | + "kinit cmd '/fake/kinit': %s" % expected_err |
| 325 | + |
| 326 | + # pylint bug: https://github.com/PyCQA/pylint/issues/511 |
| 327 | + # pylint: disable=undefined-variable |
| 328 | + @pytest.mark.parametrize('use_pexpect', (False, True),) |
| 329 | + def test_kinit_error(self, use_pexpect): |
| 330 | + mechanism = "subprocess" |
| 331 | + expected_err = "kinit: krb5_parse_name: " \ |
| 332 | + "Configuration file does not specify default realm" |
| 333 | + |
| 334 | + def mock_popen_communicate(input=None, timeout=None): |
| 335 | + return b"", to_bytes(expected_err) |
| 336 | + |
| 337 | + mock_subprocess = MagicMock() |
| 338 | + mock_subprocess.Popen.return_value.communicate = mock_popen_communicate |
| 339 | + mock_subprocess.Popen.return_value.returncode = 1 |
| 340 | + |
| 341 | + mock_pexpect = None |
| 342 | + if use_pexpect: |
| 343 | + mechanism = "pexpect" |
| 344 | + expected_err = "Configuration file does not specify default realm" |
| 345 | + |
| 346 | + mock_pexpect = MagicMock() |
| 347 | + mock_pexpect.spawn.return_value.expect = MagicMock(side_effect=OSError) |
| 348 | + mock_pexpect.spawn.return_value.read.return_value = to_bytes(expected_err) |
| 349 | + mock_pexpect.spawn.return_value.exitstatus = 1 |
| 350 | + |
| 351 | + modules = { |
| 352 | + 'pexpect': mock_pexpect, |
| 353 | + 'subprocess': mock_subprocess, |
| 354 | + } |
| 355 | + |
| 356 | + with patch.dict(sys.modules, modules): |
| 357 | + pc = PlayContext() |
| 358 | + new_stdin = StringIO() |
| 359 | + |
| 360 | + connection_loader._module_cache = {} |
| 361 | + conn = connection_loader.get('winrm', pc, new_stdin) |
| 362 | + conn.set_options(var_options={"_extras": {}}) |
| 363 | + |
| 364 | + with pytest.raises(AnsibleConnectionFailure) as err: |
| 365 | + conn._kerb_auth("invaliduser", "pass") |
| 366 | + |
| 367 | + assert str(err.value) == "Kerberos auth failure for principal " \ |
| 368 | + "invaliduser with %s: %s" % (mechanism, expected_err) |
0 commit comments