""" Abstract base classes for the system. The AHUOb
"""
import abc
import pandas as pd
import numpy as np
import copy
import uuid
from marshmallow import Schema, post_load, fields
from marshmallow.exceptions import ValidationError
from .exceptions import InternalNotDefinedError, CollectionLoadError, CollectionValidationError, AdapterChainError
from .registry import register_collection, get_class_from_collection_registry, adapter_path
from .utils import DataFrameDtypeConversion, RecordUtils
import logging
l = logging.getLogger(__name__)
# a place for the registry of internals after they are constructed
class InternalObject(object):
    """A namespace class for instance checking of an internally used model object.

    It is otherwise a normal python object. _Internals are used as a medium for
    serialization and deserialization and their declarations are bound with
    Collections and enforced by Serializers. It can be inherited from or used
    as a Mixin.

    Any keyword arguments passed to the constructor become instance attributes.
    """
    # marker attribute used for duck-type checks elsewhere in the package
    is_binx_internal = True
    # NOTE these are collections. A coll's metaclass hook appends any collection objects here
    registered_colls = set()

    def __init__(self, *args, **kwargs):
        # every kwarg becomes an instance attribute; positional args are accepted but ignored
        self.__dict__.update(kwargs)
class BaseSerializer(Schema):
    """Overrides marshmallow Schema to bind an InternalObject class for dumping/loading.

    Serializers are instantiated by a Collection with its ``internal_class`` and
    are used for loading and validating data. ``numpy_map`` maps marshmallow field
    types to numpy dtypes, which helps ``to_dataframe`` build memory-optimized
    frames.
    """
    # populated by Collection classes at __new__ time (see BaseCollection.__new__)
    registered_colls = set()
    # marshmallow field type -> numpy dtype used when constructing DataFrame columns
    numpy_map = {
        fields.Integer: np.dtype('int'),
        fields.Float: np.dtype('float'),
        fields.Str: np.dtype('str'),
        fields.Date: np.dtype('datetime64[ns]'),
        fields.DateTime: np.dtype('datetime64[ns]'),
        fields.List: np.dtype('O'),
        fields.Bool: np.dtype('bool'),
        fields.Dict: np.dtype('O'),
        fields.Nested: np.dtype('O'),
    }

    def __init__(self, *args, **kwargs):
        """:param internal: (required keyword) the InternalObject subclass produced on load
        :raises InternalNotDefinedError: if ``internal`` is not supplied
        """
        if 'internal' in kwargs:
            self._InternalClass = kwargs.pop('internal')
        else:
            raise InternalNotDefinedError('An InternalObject class must be instantiated with this Collection')
        super().__init__(*args, **kwargs)
        self.dateformat_fields = self._set_dateformat_fields()

    def _set_dateformat_fields(self):
        """Build a mapping of date-formatted column names to strftime format strings.

        Used by Collection.load_data to stringify date columns before validation.
        """
        # XXX this is breaking in 3.0... need to look at Meta object. field level attrs for Date not supported
        dateformat_fields = {}
        for col, field in self.fields.items():
            if isinstance(field, fields.Date):
                if self.opts.dateformat is not None:
                    dateformat_fields[col] = self.opts.dateformat
                else:
                    # default for datetime.date based objects
                    dateformat_fields[col] = '%Y-%m-%d'
            elif isinstance(field, fields.DateTime):
                if self.opts.datetimeformat is not None:
                    dateformat_fields[col] = self.opts.datetimeformat
        return dateformat_fields

    @post_load
    def load_object(self, data, **kwargs):
        """Instantiate the bound InternalObject class from a validated record."""
        return self._InternalClass(**data)

    def get_numpy_fields(self):
        """Return a dict of column name -> numpy dtype based on ``numpy_map``.

        Collections use this to create memory-optimized dataframes; field types
        without a mapping fall back to ``np.dtype('O')``.
        """
        object_dtype = np.dtype('O')
        return {
            name: self.numpy_map.get(type(field), object_dtype)
            for name, field in self._declared_fields.items()
        }
# compose a mixed Metaclass that registers and provides an abstract interface
# NOTE(review): CollectionMeta is not defined or imported in this file's visible
# code -- presumably it comes from the registry machinery; confirm it is in scope
# at import time. Also, a metaclass mix conventionally derives from abc.ABCMeta
# rather than abc.ABC; verify abstractmethod enforcement works with this composition.
AbstractCollectionMeta = type('AbstractCollectionMeta', (abc.ABC, CollectionMeta), {})
class AbstractCollection(object, metaclass=AbstractCollectionMeta):
    """Defines an interface for Collection objects. This includes a valid marshmallow
    serializer class, an iterable data list object, and a load_data method with
    validation. Collections are also registered, so this AbstractCollection uses
    AbstractCollectionMeta as a metaclass.
    """

    @abc.abstractmethod
    def get_fully_qualified_class_path(self):
        """Reach into the registry and get the fully qualified class path."""

    @property
    @abc.abstractmethod
    def serializer_class(self):
        """Return a marshmallow serializer class. Used for validation and instantiation."""

    @property
    @abc.abstractmethod
    def internal_class(self):
        """Return the InternalObject class used as the record medium.

        NOTE possibly change to class method
        """

    @property
    @abc.abstractmethod
    def data(self):
        """Return an object-representation of the metadata using the serializer."""

    @abc.abstractmethod
    def load_data(self, records):
        """Use a marshmallow serializer to validate and load the data into an
        object-record representation.
        """

    @abc.abstractmethod
    def to_dataframe(self):
        """Return a dataframe representation of the object. This wraps the data
        property in a pd.DataFrame.
        """

    @abc.abstractmethod
    def to_json(self):
        """Return a json string representation of the data using the serializer."""
class BaseCollection(AbstractCollection):
    """Implements the default AbstractCollection behavior.

    Subclasses will mostly just need to define a custom Serializer and
    InternalObject pair.

    :param data: the data being passed into the serializer; can be a dataframe
        or a list of records. If None the collection starts empty.
    :param ma_kwargs: extra keyword arguments forwarded to the serializer.
    """
    serializer_class = BaseSerializer  # must be overridden with a valid marshmallow schema and _Internal
    internal_class = InternalObject

    def __new__(cls, *args, **kwargs):
        # register this concrete Collection class on both its serializer and internal class
        cls.serializer_class.registered_colls.add(cls)
        cls.internal_class.registered_colls.add(cls)
        inst = super(BaseCollection, cls).__new__(cls)  # changed here 0.4 to allow args to be passed into __init__
        return inst

    def __init__(self, data=None, **ma_kwargs):
        self._data = []
        self._idx = 0  # legacy iteration cursor; see __next__
        self._serializer = self.serializer_class(internal=self.__class__.internal_class, **ma_kwargs)
        if data is not None:
            self.load_data(data)
        self.__collection_id = uuid.uuid4().hex

    @classmethod
    def get_fully_qualified_class_path(cls):
        """Return the fully qualified class name for collection_registry lookup."""
        return cls.__module__ + '.' + cls.__name__

    @classmethod
    def get_registry_entry(cls):
        """Return the complete registry entry for this class."""
        return get_class_from_collection_registry(cls.get_fully_qualified_class_path())

    @property
    def serializer(self):
        """The marshmallow serializer instance used for validation and instantiation."""
        return self._serializer

    @property
    def data(self):
        """An object-record representation of the loaded data produced by the serializer."""
        if len(self._data) == 0:
            return self._data
        return self.serializer.dump(self._data, many=True)  # changed to update ma v3

    @property
    def internal(self):
        """The InternalObject class bound to this collection."""
        return self.__class__.internal_class

    @property
    def collection_id(self):
        """A unique hex id assigned to this instance at construction."""
        return self.__collection_id

    def __iter__(self):
        # Return an independent iterator over the loaded records. The previous
        # implementation shared a single cursor on self, which broke nested
        # iteration; the legacy cursor is still reset for direct __next__ callers.
        self._idx = 0
        return iter(self._data)

    def __next__(self):
        # legacy manual-cursor protocol kept for backward compatibility;
        # prefer iterating the collection directly
        self._idx += 1
        if self._idx > len(self._data):
            raise StopIteration
        return self._data[self._idx - 1]

    def __len__(self):
        return len(self._data)

    def __getitem__(self, i):
        return self._data[i]

    def __add__(self, other):
        """Concatenate two collections of the same class into a new instance."""
        if isinstance(other, self.__class__):
            combined = self.data + other.data
            new_inst = self.__class__()
            new_inst.load_data(combined)
            return new_inst
        raise TypeError('Only Collections of the same class can be concatenated')

    @classmethod
    def _resolve_adapter_chain(cls, input_collection, accumulate, **adapter_context):
        """Resolve the adapter chain from input_collection's class to this class.

        The adapter context accumulates over each call, which ensures kwargs
        needed by a given adapter are guaranteed to reach it. Returns the final
        AdapterOutputContainer with accumulated context, or None if there are no
        adapters in the chain.

        :raises AdapterChainError: if any adapter in the chain fails; the partial
            context is attached to the raised exception as ``context``.
        """
        adapters = adapter_path(input_collection.__class__, cls)
        if len(adapters) == 0:  # no adapter path between the two collection classes
            return
        try:
            current_context = adapter_context  # starting point; instances, modified below
            current_input = input_collection  # NOTE an instance with data to transform, not a class
            adapter_output = None
            for i, adapter_class in enumerate(adapters):
                # when accumulating, stash a copy of each intermediate collection
                # in the context keyed by its class name
                if accumulate and i > 0:
                    coll_id = current_input.__class__.__name__
                    current_context[coll_id] = copy.copy(current_input)
                current_adapter = adapter_class()  # push the current input and context through each adapter
                adapter_output = current_adapter(current_input, **current_context)
                current_context = {**current_context, **adapter_output.context}  # NOTE this will fail on py3.4
                current_input = adapter_output.collection
        except Exception as err:
            e = AdapterChainError('An error occurred within the adapter chain')
            e.context = current_context
            raise e from err
        adapter_output._context = current_context  # set final context
        return adapter_output

    def _dataframe_with_dtypes(self, data):
        """Convert records to a DataFrame whose dtypes come from the serializer."""
        rutil = RecordUtils()
        dfutil = DataFrameDtypeConversion()
        try:
            col_data = rutil.records_to_columns(data)
        except IndexError:  # no records -> empty frame
            return pd.DataFrame()
        dtype_map = self.serializer.get_numpy_fields()
        # iterate columns and construct a dictionary of pd.Series with the correct dtype
        df_data = {}  # pd.Series keyed by column name
        for col, dtype in dtype_map.items():
            try:
                if dtype == np.dtype('int') and any(c is None for c in col_data[col]):
                    dtype = None  # NOTE lets pandas coerce int to float when there are NaNs
                df_data[col] = pd.Series(col_data[col], dtype=dtype)
            except KeyError:
                # non-required field absent from the records; skip the column
                l.warning('Creating df without non-required field {}'.format(col))
        df = pd.DataFrame(df_data)
        return dfutil.df_none_to_nan(df)

    def _clean_dataframe(self, df):
        """Clean a DataFrame and stringify date columns, returning records."""
        formatfields = self.serializer.dateformat_fields
        util = DataFrameDtypeConversion()
        df = util.df_nan_to_none(df)
        if len(formatfields) > 0:
            df = util.date_to_string(formatfields, df)
        return df.to_dict('records')

    def _clean_records(self, records):
        """Stringify date fields on a list of records prior to validation."""
        formatfields = self.serializer.dateformat_fields
        util = RecordUtils()
        if len(formatfields) > 0:
            records = util.date_to_string(formatfields, records)
        return records

    def load_data(self, records, raise_on_empty=False):
        """Validate and load records (a list of dicts or a DataFrame) into the collection.

        #TODO -- create a drop_duplicates option and use pandas to drop the dupes

        :param records: list of record dicts or a pd.DataFrame
        :param raise_on_empty: when True, an empty input is an error
        :raises CollectionValidationError: on serializer validation failure
        :raises CollectionLoadError: on any other load failure (including empty
            input when raise_on_empty is True)
        """
        try:
            if raise_on_empty and len(records) == 0:
                raise ValueError('An empty set of records was passed to load_data')
            if isinstance(records, pd.DataFrame):
                records = self._clean_dataframe(records)
            else:
                records = self._clean_records(records)
            # NOTE changing this to handle tuples in marsh 2.x
            valid = self.serializer.load(records, many=True)
            self._data += valid
        except TypeError as err:
            raise CollectionLoadError('A Serializer must be instantiated with valid fields') from err
        except ValidationError as err:
            l.error(err.messages)
            raise CollectionValidationError('A ValidationError occurred while trying to load {}'.format(self.__class__.__name__)) from err
        except Exception as err:
            raise CollectionLoadError('An error occurred while loading and validating records') from err

    @classmethod
    def adapt(cls, input_collection, accumulate=False, **adapter_context):
        """Adapt the input collection instance into a collection of this type by
        resolving the adapter chain for the input collection. Any kwargs passed
        in are handed over to the resolver. Example::

            colla = CollectionA()
            colla.load_data(some_data)
            collb, context = CollectionB.adapt(colla, some_var=42, some_other_var=66)

        Returns a tuple of (new collection instance, accumulated context).

        :raises TypeError: if input_collection is not a BaseCollection
        :raises AdapterChainError: if no adapter chain exists for the input
        """
        if not issubclass(input_collection.__class__, BaseCollection):
            raise TypeError('The input to adapt must be a Collection')
        adapted = cls._resolve_adapter_chain(input_collection, accumulate, **adapter_context)
        if adapted is not None:
            return adapted.collection, adapted.context
        raise AdapterChainError('The input_collection {} could not be found on the adapter chain for {}'.format(
            input_collection.__class__.__name__, cls.__name__))

    def to_dataframe(self):
        """Return a pd.DataFrame of the data, using dtypes from the serializer.

        Wraps the ``data`` property; columns mapped to datetime dtypes are
        converted where possible.
        """
        return self._dataframe_with_dtypes(self.data)

    def to_json(self):
        """Return a json string representation of the data using the serializer."""
        return self.serializer.dumps(self._data, many=True)
class AbstractCollectionBuilder(abc.ABC):
    """An interface for the CollectionBuilder.

    A build method takes a subclass of BaseSerializer and creates a Collection
    class dynamically. Its use is optional but is designed to cut down on class
    declarations if the user is making many generic Collection implementations.
    """

    @abc.abstractmethod
    def build(self, serializer):
        """Build and return a collection class for the given serializer."""
class CollectionBuilder(AbstractCollectionBuilder):
    """A factory class that constructs Collection objects dynamically, providing a
    default namespace for binx.registry and the adapter chain.

    :param name: optional root name for the built classes. NOTE in v0.3.0 the name
        can instead be set in build(); kept here for backwards compatibility.
    :param unique_fields: NOTE placeholder... future builds will be able to declare
        unique constraints here.
    """

    def __init__(self, name=None, unique_fields=None):
        self.name = name
        # BUGFIX: this previously discarded the argument (always assigned None)
        self.unique_fields = unique_fields

    def _make_dynamic_class(self, name, args, base_class=InternalObject):
        """A factory method for making classes dynamically. The default base_class
        is InternalObject. NOTE ``args`` is an iterable of allowed attribute names.
        """
        def __init__(self, **kwargs):
            base_class.__init__(self)
            for k, v in kwargs.items():
                # reject any kwarg not declared on the serializer
                if k not in args:
                    raise TypeError("Argument {} not valid for {}".format(k, self.__class__.__name__))
                setattr(self, k, v)
        return type(name, (base_class,), {'__init__': __init__})

    def _make_collection_class(self, name, serializer_class, internal_class, base_class=BaseCollection):
        """Make a Collection class by assigning the two required class attributes."""
        class_attrs = {'serializer_class': serializer_class, 'internal_class': internal_class}
        return type(name, (base_class,), class_attrs)

    def _parse_names(self, name):
        """Derive the Collection and Internal class names from the user-provided root name."""
        return name + 'Collection', name + 'Internal'

    def _get_declared_fields(self, serializer_class):
        """Introspect the declared fields on the serializer class and return their names."""
        return list(vars(serializer_class)['_declared_fields'].keys())

    def _build_internal(self, name, serializer_class):
        """Construct the internal object class for the collection.

        Returns a subclass of InternalObject. This is used internally by build()
        but may also be called directly.
        """
        args = self._get_declared_fields(serializer_class)
        return self._make_dynamic_class(name, args, base_class=InternalObject)

    def _get_name_from_serializer_class(self, serializer_class):
        """Parse the serializer class name into a root name.

        Strips 'Serializer' and 'Schema' from the class name, leaving the "root"
        name given to the dynamically created objects.
        """
        return serializer_class.__name__.replace('Serializer', '').replace('Schema', '')

    def build(self, serializer_class, name=None, internal_only=False):
        """Dynamically create and return a Collection class given a serializer.

        If internal_only is True then only the internal class is returned; useful
        when declaratively defining collections and overriding base behavior.

        Name resolution: the name given at __init__ wins, then the build kwarg,
        then a name derived from the serializer_class.
        """
        if self.name:
            name = self.name
        if name is None:
            name = self._get_name_from_serializer_class(serializer_class)
        coll_name, internal_name = self._parse_names(name)
        internal_class = self._build_internal(internal_name, serializer_class)
        if internal_only:
            return internal_class
        return self._make_collection_class(coll_name, serializer_class, internal_class, base_class=BaseCollection)