Source code for flask_open_directory.query.base_query

# -*- coding: utf-8 -*-
from typing import Union, Iterable, Any, Tuple
from contextlib import contextmanager
import ldap3

from .query_abc import QueryABC
from ..model import ModelABC
from ..base import OpenDirectoryABC
from .._compat import ContextManager


ConnectionCtx = ContextManager[Union[None, ldap3.Connection]]


def _quote_if_str(val):
    """Helper to quote a value if it's a string.

    """
    if isinstance(val, str):
        return "'{}'".format(val)
    return val


[docs]class BaseQuery(QueryABC): """Implementation of :class:`QueryABC` abstract class. This is used to search an ldap connection and convert the entries into a :class:`ModelABC` subclass. This also adds a helpful context manager :meth:`BaseQuery.connection_ctx`, which is helpful if you do not have a flask application running, it will try to create a connection with the ``open_directory`` set on the instance for the search. """ # used if no search filter is set on an instance. _default_search_filter = '(objectClass=*)' def __init__(self, open_directory: Any=None, model: Any=None, search_base: str=None, search_filter: str=None, connection: ldap3.Connection=None, ldap_attributes: Iterable[str]=None ) -> None: self.search_base = search_base self.search_filter = search_filter self.connection = connection self.ldap_attributes = ldap_attributes self.model = model self.open_directory = open_directory @property def model(self) -> Any: """A :class:`ModelABC` subclass used for search criterea and to in converting returned entries into the desired model. The attribute can be set with either a class or an instance of a class, but we always store a reference to the class. """ return getattr(self, '_model', None) @model.setter def model(self, value) -> None: if value: if isinstance(value, ModelABC): self._model = value.__class__ elif issubclass(value, ModelABC): self._model = value else: raise TypeError() @property def open_directory(self) -> Any: """A :class:`OpenDirectoryABC` subclass used for the connection for the query. """ return getattr(self, '_open_directory', None) @open_directory.setter def open_directory(self, value) -> None: if value is not None: if isinstance(value, OpenDirectoryABC): self._open_directory = value else: raise TypeError(value) @property def search_base(self) -> Union[str, None]: """A string representing where to search. This property will try to derive this value from several places. If it set explicitly, then that will be the preferred return value. If it is not set explicitly, then we will check if there is an :class:`OpenDirectory` set for the instance. If so we will use the ``base_dn`` property from the ``open_directory``. We will further check if a ``model`` has been set for the instance and we will add it's ``query_cn``. """ search_base = getattr(self, '_search_base', None) if search_base is None: od_base = None try: od_base = self.open_directory.base_dn except AttributeError: pass model_cn = None try: model_cn = self.model.query_cn() except AttributeError: pass if model_cn is not None and od_base is not None: if "cn=" not in model_cn: search_base = 'cn={},{}'.format(model_cn, od_base) else: search_base = model_cn + ',{}'.format(od_base) elif od_base is not None: search_base = od_base return search_base @search_base.setter def search_base(self, value) -> None: if value is not None: self._search_base = str(value) @property def search_filter(self) -> str: """The ldap filter string for the query. If not set then we will return the class's ``_default_search_filter`` which is '(objectClass=*)' """ search_filter = getattr(self, '_search_filter', None) if search_filter is not None: return str(search_filter) return self._default_search_filter @search_filter.setter def search_filter(self, value) -> None: if value is not None: self._search_filter = str(value) @property def ldap_attributes(self) -> Union[None, Iterable[str]]: """The ldap attributes to return with the query. These should be a list/tuple of strings that are attributes in the ldap database. If none have been set explicitly on an instance, then we will check if there is a ``model``, and use it's ``ldap_keys`` for this value. """ attrs = getattr(self, '_ldap_attributes', None) if attrs is None: try: attrs = self.model.ldap_keys() except AttributeError: pass if attrs is not None: return tuple(attrs) @ldap_attributes.setter def ldap_attributes(self, value) -> None: if value is not None: if not isinstance(value, str): self._ldap_attributes = [s for s in iter(value) if isinstance(s, str)] else: self._ldap_attributes = [value] @property def connection(self) -> ConnectionCtx: """An :class:`ldap3.Connection` for the query. If this is not set explicitly, then we will see if an ``open_directory`` is set on the instance and return it's ``connection``, which could still be ``None``, if there is no flask application running. If you don't know wheter you are working inside of the application context or not, then it is safer to use the :meth:`BaseQuery.connection_ctx` method. :raises TypeError: If explicitly setting this to something other than an :class:`ldap3.Connection`. """ connection = getattr(self, '_connection', None) if connection is None: try: connection = self.open_directory.connection except AttributeError: pass return connection @connection.setter def connection(self, value): if value: if isinstance(value, ldap3.Connection): self._connection = value else: raise TypeError() @contextmanager
[docs] def connection_ctx(self) -> Union[ldap3.Connection, None]: """A context manager that tries to find a connection to use. If a connection is found on the instance or the ``open_directory`` of an instance, then that will be used. If not it will try to create one using an instances ``open_directory``. If a connection is not able to be found this will yield ``None``, so you should check before using the connection. Example:: >>> with query.connection_ctx() as conn: ... model = query.first(conn) """ if self.connection is not None: yield self.connection elif self.open_directory is not None: with self.open_directory.connection_ctx() as connection: yield connection else: yield
def _query(self, connection) -> Iterable[ldap3.Entry]: """Helper that performs the query and yields :class:`ldap3.Entry`'s """ if connection is not None: self.connection = connection with self.connection_ctx() as conn: if conn is not None: conn.search( self.search_base, self.search_filter, attributes=self.ldap_attributes ) for entry in conn.entries: # check the entries have values. All attributes get returned # as a list, so we check that the list has a length greater # than 0. This is useful, because when using the ``all`` # method, without any filter criteria, the first record is # always empty lists. if any(map(lambda x: len(x) > 0, entry.entry_attributes_as_dict.values())): yield entry # We should probably raise a connection error of some # sort here.
[docs] def first(self, connection: ldap3.Connection=None, convert=True) -> Any: """Query the connection and return the first item found for the current state of the query. This will return ``None`` if a connection was not established or there was no entry to match the filter criteria. :param connection: An optional :class:`ldap3.Connection` to use for the search. :param convert: Whether to convert the :class:`ldap3.Entry` to the ``model`` set on a class. Default is ``True`` """ entry = next(self._query(connection), None) if convert is True and self.model is not None: return self.model.from_entry(entry) return entry
[docs] def all(self, connection: ldap3.Connection=None, convert=True ) -> Tuple[Any]: """Query the connection and return a tuple of all items found for the current state of the query. This will return an empty tuple if a connection was not established or there was no entries to match the filter criteria. :param connection: An optional :class:`ldap3.Connection` to use for the search. :param convert: Whether to convert the :class:`ldap3.Entry`'s to the ``model`` set on a class. Default is ``True`` """ entries = tuple(self._query(connection)) if len(entries) > 0: if convert is True and self.model is not None: return tuple(map(self.model.from_entry, entries)) return entries
def __repr__(self) -> str: rv = '{}('.format(self.__class__.__name__) attrs = map( lambda k: "{}={}".format(k, _quote_if_str(getattr(self, k, None))), ('search_base', 'search_filter', 'ldap_attributes', 'connection', 'open_directory', 'model') ) return rv + ', '.join(attrs) + ')'