Add support of ldap filter with anonymous user #186

Merged
merged 1 commit into from
Mar 21, 2024
50 changes: 33 additions & 17 deletions ldap_auth_provider.py
Expand Up @@ -375,7 +375,9 @@ def parse_config(config) -> "_LdapConfig":

ldap_config = _LdapConfig(
enabled=config.get("enabled", False),
mode=LDAPMode.SIMPLE,
mode=LDAPMode.SEARCH
if config.get("mode", "simple") == "search"
else LDAPMode.SIMPLE,
uri=config["uri"],
start_tls=config.get("start_tls", False),
tls_options=config.get("tls_options"),
Expand Down Expand Up @@ -403,6 +405,8 @@ def parse_config(config) -> "_LdapConfig":
raise ValueError(
"Either bind_password or bind_password_file must be set!"
)

if ldap_config.mode == LDAPMode.SEARCH:
ldap_config.filter = config.get("filter", None)

# verify attribute lookup
Expand Down Expand Up @@ -461,13 +465,16 @@ async def _fetch_root_domain(self) -> str:
server = self._get_server(get_info=ldap3.DSA)

if self.ldap_bind_dn is None or self.ldap_bind_password is None:
raise ValueError("Missing bind DN or bind password")

result, conn = await self._ldap_simple_bind(
server=server,
bind_dn=self.ldap_bind_dn,
password=self.ldap_bind_password,
)
result, conn = await self._ldap_simple_bind(
server=server,
auth_type=ldap3.ANONYMOUS,
)
else:
result, conn = await self._ldap_simple_bind(
server=server,
bind_dn=self.ldap_bind_dn,
password=self.ldap_bind_password,
)

if not result:
logger.warning("Unable to get root domain due to failed LDAP bind")
Expand Down Expand Up @@ -503,7 +510,11 @@ async def _fetch_root_domain(self) -> str:
return self.ldap_root_domain

async def _ldap_simple_bind(
self, server: ldap3.ServerPool, bind_dn: str, password: str
self,
server: ldap3.ServerPool,
bind_dn: Optional[str] = None,
password: Optional[str] = None,
auth_type: str = ldap3.SIMPLE,
) -> Tuple[bool, Optional[ldap3.Connection]]:
"""Attempt a simple bind with the credentials given by the user against
the LDAP server.
Expand All @@ -513,6 +524,8 @@ async def _ldap_simple_bind(
Returns False, None
if an error occured
"""
if (bind_dn is None or password is None) and auth_type == ldap3.SIMPLE:
raise ValueError("Missing bind DN or bind password")

try:
# bind with the the local user's ldap credentials
Expand All @@ -521,7 +534,7 @@ async def _ldap_simple_bind(
server,
bind_dn,
password,
authentication=ldap3.SIMPLE,
authentication=auth_type,
read_only=True,
)
logger.debug("Established LDAP connection in simple bind mode: %s", conn)
Expand Down Expand Up @@ -578,13 +591,16 @@ async def _ldap_authenticated_search(

try:
if self.ldap_bind_dn is None or self.ldap_bind_password is None:
raise ValueError("Missing bind DN or bind password")

result, conn = await self._ldap_simple_bind(
server=server,
bind_dn=self.ldap_bind_dn,
password=self.ldap_bind_password,
)
result, conn = await self._ldap_simple_bind(
server=server,
auth_type=ldap3.ANONYMOUS,
)
else:
result, conn = await self._ldap_simple_bind(
server=server,
bind_dn=self.ldap_bind_dn,
password=self.ldap_bind_password,
)

if not result:
return (False, None, None)
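Review bot: The fallback to an anonymous bind at `if self.ldap_bind_dn is None or self.ldap_bind_password is None:` (the `_fetch_root_domain` hunk, and again in `_ldap_authenticated_search`) fires when only one of the two values is set, so a half-configured bind — DN present but password missing, or the reverse — now silently degrades to an anonymous search where the previous code raised "Missing bind DN or bind password". Anonymous should be selected only when both are unset, e.g. `if self.ldap_bind_dn is None and self.ldap_bind_password is None:` followed by an `elif` branch that keeps the old `raise ValueError("Missing bind DN or bind password")` when exactly one is present, so a misconfiguration fails loudly instead of quietly weakening the directory lookup. The `_ldap_simple_bind` docstring still reads "Attempt a simple bind with the credentials given by the user"; it should also state that with `auth_type=ldap3.ANONYMOUS` the `bind_dn`/`password` arguments are ignored and that the `ValueError` is only raised for `ldap3.SIMPLE`. Both enclosing functions continue past the end of their hunks.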