# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
import numpy as np
import logging
import copy
from collections import defaultdict
from openfisca_core.parameters import ParameterNodeAtInstant, VectorialParameterNodeAtInstant, ALLOWED_PARAM_TYPES
from openfisca_core.taxscales import AbstractTaxScale
log = logging.getLogger(__name__)
class Tracer(object):
    """
    A tracer that records simulation steps to enable exploring calculation steps in details.

    .. py:attribute:: requested_calculations

        ``set`` containing calculations that have been directly requested by the client.

        Value example:

        >>> {'income_tax<2017-01>', 'basic_income<2017-01>'}

    .. py:attribute:: stack

        ``list`` of the calculations that have started, but have not finished. The first item is one of the :attr:`requested_calculations`, and each other item is a dependency of the one preceding it. Note that after a calculation is finished, :attr:`stack` is always ``[]``.

        Value example:

        >>> ['income_tax<2017-01>', 'global_income<2017-01>', 'salary<2017-01>']

    .. py:attribute:: trace

        ``dict`` containing, for each calculation, its result and its immediate dependencies.

        Value example:

        .. code-block:: python

            {
                'income_tax<2017-01>': {
                    'dependencies': ['global_income<2017-01>', 'nb_children<2017-01>'],
                    'parameters': {'taxes.income_tax_rate<2015-01>': 0.15, ...},
                    'value': 600
                    },
                'global_income<2017-01>': {...}
                }

    .. py:attribute:: usage_stats

        ``dict`` containing, for each variable computed, the number of times the variable was requested.

        Value example:

        .. code-block:: python

            {
                'salary': {
                    'nb_requests': 17
                    },
                'global_income': {
                    'nb_requests': 1
                    }
                }
    """

    def __init__(self):
        self.requested_calculations = set()
        self.stack = []
        self.trace = {}
        self.usage_stats = defaultdict(lambda: {"nb_requests": 0})
        # Chronological list of ``(key, depth)`` pairs, one per started calculation.
        self._computation_log = []
        # Cache of the min/max/avg summaries built by ``_get_aggregate``, by key.
        self._aggregates = {}

    def clone(self):
        """
        Return a new :class:`Tracer` holding copies of this tracer's state.

        ``trace``, ``usage_stats`` and the aggregates cache are deep-copied; the other containers are shallow-copied.
        """
        new = Tracer()
        new.requested_calculations = copy.copy(self.requested_calculations)
        new.stack = copy.copy(self.stack)
        new.trace = copy.deepcopy(self.trace)
        new._computation_log = copy.copy(self._computation_log)
        new.usage_stats = copy.deepcopy(self.usage_stats)
        new._aggregates = copy.deepcopy(self._aggregates)

        return new

    @staticmethod
    def _get_key(variable_name, period, **parameters):
        """
        Build the string identifying a calculation.

        The key is ``name<period>``, followed by one ``<param>`` group per item of the ``extra_params`` keyword argument when it is given and non-empty.
        """
        if parameters.get('extra_params'):
            return "{}<{}><{}>".format(variable_name, period, '><'.join(map(str, parameters['extra_params'])))
        return "{}<{}>".format(variable_name, period)

    def record_calculation_start(self, variable_name, period, **parameters):
        """
        Record that OpenFisca started computing a variable.

        :param str variable_name: Name of the variable starting to be computed
        :param Period period: Period for which the variable is being computed
        :param list parameters: Parameter with which the variable is being computed
        """
        key = self._get_key(variable_name, period, **parameters)

        if self.stack:  # The variable is a dependency of another variable
            parent = self.stack[-1]
            self.trace[parent]['dependencies'].append(key)
        else:  # The variable has been requested by the client
            self.requested_calculations.add(key)

        if not self.trace.get(key):
            self.trace[key] = {'dependencies': [], 'parameters': {}}

        self.stack.append(key)
        # The depth is measured after pushing, so a requested calculation is logged at depth 1.
        self._computation_log.append((key, len(self.stack)))
        self.usage_stats[variable_name]['nb_requests'] += 1

    def record_calculation_parameter_access(self, parameter_name, period, value):
        """
        Record that the calculation on top of the stack read a parameter.

        Tax scales are stored as the result of their ``to_dict()`` method and numpy arrays as lists; any other value is stored unchanged.

        :param str parameter_name: Name of the parameter that was read
        :param period: Instant for which the parameter was read, used to build the ``name<period>`` key
        :param value: Value of the parameter
        """
        if isinstance(value, AbstractTaxScale):
            value = value.to_dict()
        if isinstance(value, np.ndarray):
            value = value.tolist()

        parent = self.stack[-1]
        parameter_key = '{}<{}>'.format(
            parameter_name,
            period
            )
        self.trace[parent]['parameters'][parameter_key] = value

    def record_calculation_end(self, variable_name, period, result, **parameters):
        """
        Record that OpenFisca finished computing a variable.

        :param str variable_name: Name of the variable starting to be computed
        :param Period period: Period for which the variable is being computed
        :param numpy.ndarray result: Result of the computation
        :param list parameters: Parameter with which the variable is being computed

        :raises ValueError: if the calculation being ended is not the last one that was started
        """
        key = self._get_key(variable_name, period, **parameters)
        expected_key = self.stack.pop()

        if not key == expected_key:
            # NOTE(review): the message is encoded to bytes, presumably for Python 2 compatibility; on Python 3 it is displayed as b'...'.
            raise ValueError(
                "Something went wrong with the simulation tracer: result of '{0}' was expected, got results for '{1}' instead. This does not make sense as the last variable we started computing was '{0}'."
                .format(expected_key, key).encode('utf-8')
                )
        self.trace[key]['value'] = result

    def record_calculation_abortion(self, variable_name, period, **parameters):
        """
        Record that OpenFisca aborted computing a variable. This removes all trace of this computation.

        :param str variable_name: Name of the variable starting to be computed
        :param Period period: Period for which the variable is being computed
        :param list parameters: Parameter with which the variable is being computed

        :raises ValueError: if the calculation being aborted is not the last one that was started
        """
        key = self._get_key(variable_name, period, **parameters)
        expected_key = self.stack.pop()

        if not key == expected_key:
            # NOTE(review): the message is encoded to bytes, presumably for Python 2 compatibility; on Python 3 it is displayed as b'...'.
            raise ValueError(
                "Something went wrong with the simulation tracer: calculation of '{1}' was aborted, whereas the last variable we started computing was '{0}'."
                .format(expected_key, key).encode('utf-8')
                )

        if self.stack:
            parent = self.stack[-1]
            self.trace[parent]['dependencies'].remove(key)
        # The entry in ``_computation_log`` is kept: ``_print_node`` reports keys missing from ``trace`` as aborted.
        del self.trace[key]

    def _get_aggregate(self, key):
        """
        Return a ``dict`` with the ``min``, ``max`` and ``avg`` of the value recorded for ``key``.

        The summary is cached in ``_aggregates``. ``avg`` is ``nan`` when ``np.mean`` raises a ``TypeError`` on the value.
        """
        if self._aggregates.get(key):
            return self._aggregates.get(key)

        value = self.trace[key]['value']

        try:
            aggregated_value = {
                'min': np.min(value),
                'max': np.max(value),
                }
        except TypeError:  # Much less efficient, but works for strings
            aggregated_value = {
                'min': min(value),
                'max': max(value),
                }

        try:
            aggregated_value['avg'] = np.mean(value)
        except TypeError:
            aggregated_value['avg'] = np.nan

        self._aggregates[key] = aggregated_value
        return aggregated_value

    def _print_node(self, key, depth, aggregate):
        """
        Print one ``key >> value`` line, indented according to ``depth``.

        The value is the recorded result, its min/max/avg summary when ``aggregate`` is true, or a notice when ``key`` has no entry in ``trace``.
        """

        def print_line(depth, node, value):
            print("{}{} >> {}".format(' ' * depth, node, value))

        if not self.trace.get(key):
            return print_line(depth, key, "Calculation aborted due to a circular dependency")

        if not aggregate:
            return print_line(depth, key, self.trace[key]['value'])

        return print_line(depth, key, self._get_aggregate(key))

    def print_trace(self, variable_name, period, extra_params = None, max_depth = 1, aggregate = False, ignore_zero = False):
        """
        Print value, the dependencies, and the dependencies values of the variable for the given period (and possibly the given set of extra parameters).

        A calculation that has no entry in :attr:`trace` is printed as aborted, and its dependencies are not explored.

        :param str variable_name: Name of the variable to investigate
        :param Period period: Period to investigate
        :param list extra_params: Set of extra parameters
        :param int max_depth: Maximum level of recursion
        :param bool aggregate: See :any:`print_computation_log`
        :param bool ignore_zero: If ``True``, don't print dependencies if their value is 0
        """
        key = self._get_key(variable_name, period, extra_params = extra_params)

        def _print_details(key, depth):
            entry = self.trace.get(key)
            if not entry:
                # No recorded value or dependencies to read: let _print_node report
                # the missing calculation instead of failing with a KeyError below.
                self._print_node(key, depth, aggregate)
                return
            if depth > 0 and ignore_zero and np.all(entry['value'] == 0):
                return
            self._print_node(key, depth, aggregate)
            if depth < max_depth:
                for dependency in entry['dependencies']:
                    _print_details(dependency, depth + 1)

        _print_details(key, 0)

    def print_computation_log(self, aggregate = False):
        """
        Print the computation log of a simulation.

        If ``aggregate`` is ``False`` (default), print the value of each computed vector.

        If ``aggregate`` is ``True``, only print the minimum, maximum, and average value of each computed vector.
        This mode is more suited for simulations on a large population.
        """
        for node, depth in self._computation_log:
            self._print_node(node, depth, aggregate)
class TracingParameterNodeAtInstant(object):
    """
    Wrapper around a parameter node at a given instant that reports parameter accesses to a tracer.

    Attribute and item accesses are forwarded to the wrapped node. Child nodes are returned wrapped in turn; values that are numpy arrays or instances of ``ALLOWED_PARAM_TYPES`` are recorded through ``tracer.record_calculation_parameter_access`` before being returned.
    """

    def __init__(self, parameter_node_at_instant, tracer):
        """
        :param parameter_node_at_instant: The node to wrap
        :param tracer: The object on which ``record_calculation_parameter_access`` is called
        """
        self.parameter_node_at_instant = parameter_node_at_instant
        self.tracer = tracer

    def __getattr__(self, key):
        # __getattr__ only runs when normal lookup failed. If one of the wrapper's own
        # attributes is missing (instance created without __init__, as copy and pickle do),
        # forwarding would read self.parameter_node_at_instant and recurse endlessly.
        if key in ('parameter_node_at_instant', 'tracer'):
            raise AttributeError(key)
        child = getattr(self.parameter_node_at_instant, key)
        return self.get_traced_child(child, key)

    def __getitem__(self, key):
        child = self.parameter_node_at_instant[key]
        return self.get_traced_child(child, key)

    def get_traced_child(self, child, key):
        """
        Wrap ``child`` if it is a node, otherwise record the access if it is a traceable value.

        :param child: Object obtained from the wrapped node through ``key``
        :param key: Attribute name or index used to reach ``child``

        :returns: A new :class:`TracingParameterNodeAtInstant` if ``child`` is a parameter node, ``child`` itself otherwise
        """
        period = self.parameter_node_at_instant._instant_str
        if isinstance(child, (ParameterNodeAtInstant, VectorialParameterNodeAtInstant)):
            return TracingParameterNodeAtInstant(child, self.tracer)
        if not isinstance(key, str) or isinstance(self.parameter_node_at_instant, VectorialParameterNodeAtInstant):
            # In case of vectorization, we keep the parent node name as, for instance, rate[status].zone1 is best described as the value of "rate"
            name = self.parameter_node_at_instant._name
        else:
            name = '.'.join([self.parameter_node_at_instant._name, key])
        if isinstance(child, (np.ndarray,) + ALLOWED_PARAM_TYPES):
            self.tracer.record_calculation_parameter_access(name, period, child)
        return child