From 0b3b224fef430a3b485c73508d1e57e6f17933d0 Mon Sep 17 00:00:00 2001 From: pcworld <0188801@gmail.com> Date: Mon, 21 Feb 2022 15:48:13 +0100 Subject: [PATCH] More error handling --- ldapom/connection.py | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/ldapom/connection.py b/ldapom/connection.py index b89fe72..335a975 100644 --- a/ldapom/connection.py +++ b/ldapom/connection.py @@ -24,13 +24,15 @@ LDAP_OPT_X_TLS_TRY = libldap.LDAP_OPT_X_TLS_TRY -def handle_ldap_error(err): +def handle_ldap_error(err, force=False): """Given an LDAP error code, raise an error if needed. :param err: The error code returned by the library. :type err: int """ if err == libldap.LDAP_SUCCESS: + if force: + raise error.LDAPError('unexpected LDAP_SUCCESS') return error_string = compat._decode_utf8(ffi.string(libldap.ldap_err2string(err))) @@ -124,27 +126,37 @@ def _connect(self): handle_ldap_error(err) self._ld = ld_p[0] + # For simplicity, we use handle_ldap_error for ldap_set_option errors. + # This is not quite correct, but should work good enough if at least the success code is the same. + if libldap.LDAP_SUCCESS != libldap.LDAP_OPT_SUCCESS: + raise RuntimeError('libldap.LDAP_SUCCESS != libldap.LDAP_OPT_SUCCESS, handle_ldap_error does not work with this') + version_p = ffi.new("int *") version_p[0] = libldap.LDAP_VERSION3 - libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_PROTOCOL_VERSION, version_p) + err = libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_PROTOCOL_VERSION, version_p) + handle_ldap_error(err) if self._cacertfile is not None: - libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_X_TLS_CACERTFILE, + err = libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_X_TLS_CACERTFILE, compat._encode_utf8(self._cacertfile)) + handle_ldap_error(err) require_cert_p = ffi.new("int *") require_cert_p[0] = self._require_cert - libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_X_TLS_REQUIRE_CERT, - require_cert_p); + err = libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_X_TLS_REQUIRE_CERT, + require_cert_p) + handle_ldap_error(err) # For TLS options to take effect, a context refresh seems to be needed. newctx_p = ffi.new("int *") newctx_p[0] = 0 - libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_X_TLS_NEWCTX, newctx_p) + err = libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_X_TLS_NEWCTX, newctx_p) + handle_ldap_error(err) timelimit_p = ffi.new("int *") timelimit_p[0] = self._timelimit - libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_TIMELIMIT, timelimit_p) + err = libldap.ldap_set_option(self._ld, libldap.LDAP_OPT_TIMELIMIT, timelimit_p) + handle_ldap_error(err) err = libldap.ldap_simple_bind_s(self._ld, compat._encode_utf8(self._bind_dn), @@ -264,12 +276,17 @@ def _raw_search(self, search_filter=None, retrieve_attributes=None, handle_ldap_error(err) search_result = search_result_p[0] + self._ld.ld_errno = libldap.LDAP_SUCCESS current_entry = libldap.ldap_first_entry(self._ld, search_result) while current_entry != ffi.NULL: - dn = ffi.string(libldap.ldap_get_dn(self._ld, current_entry)) + dn_cstr = libldap.ldap_get_dn(self._ld, current_entry) + if dn_cstr == ffi.NULL: + handle_ldap_error(self._ld.ld_errno, True) + dn = ffi.string(dn_cstr) attribute_dict = {} ber_p = ffi.new("BerElement **") + self._ld.ld_errno = libldap.LDAP_SUCCESS current_attribute = libldap.ldap_first_attribute(self._ld, current_entry, ber_p) while current_attribute != ffi.NULL: @@ -278,17 +295,25 @@ def _raw_search(self, search_filter=None, retrieve_attributes=None, values_p = libldap.ldap_get_values_len(self._ld, current_entry, current_attribute) + if values_p == ffi.NULL: + handle_ldap_error(self._ld.ld_errno, True) for i in range(0, libldap.ldap_count_values_len(values_p)): val = ffi.buffer(values_p[i].bv_val, values_p[i].bv_len)[:] attribute_dict[current_attribute_str].append(val) libldap.ldap_memfree(current_attribute) + self._ld.ld_errno = libldap.LDAP_SUCCESS current_attribute = libldap.ldap_next_attribute(self._ld, current_entry, ber_p[0]) + # handle both ldap_first_attribute and ldap_next_attribute, they return NULL on error + handle_ldap_error(self._ld.ld_errno) libldap.ber_free(ber_p[0], 0) yield (dn, attribute_dict) + self._ld.ld_errno = libldap.LDAP_SUCCESS current_entry = libldap.ldap_next_entry(self._ld, current_entry) + # handle both ldap_first_entry and ldap_next_entry, they return NULL on error + handle_ldap_error(self._ld.ld_errno) libldap.ldap_msgfree(search_result)