Source code for

# -*- coding: utf-8 -*-

A module to run openfisca yaml tests

from __future__ import unicode_literals, print_function, division, absolute_import
from builtins import str

import collections
import copy
import glob
import os
import sys
import unittest
import logging

import nose
import numpy as np
import yaml

from openfisca_core import conv, periods, scenarios
from import assert_near
from openfisca_core.commons import to_unicode

log = logging.getLogger(__name__)

    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
        ' '
        'libyaml is not installed in your environment, this can make your '
        'test suite slower to run. Once you have installed libyaml, run `pip '
        'uninstall pyyaml && pip install pyyaml --no-cache-dir` so that it is used in your '
        'Python environment.')
    from yaml import Loader, Dumper

# Yaml module configuration

class unicode_folder(str):

class unicode_literal(str):

Loader.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, lambda loader, node: collections.OrderedDict(loader.construct_pairs(node)))
Dumper.add_representer(collections.OrderedDict, lambda dumper, data: dumper.represent_dict((copy.deepcopy(key), value) for key, value in data.items()))
Dumper.add_representer(dict, lambda dumper, data: dumper.represent_dict((copy.deepcopy(key), value) for key, value in data.items()))
Dumper.add_representer(np.ndarray, lambda dumper, data: dumper.represent_list(data.tolist()))
Dumper.add_representer(tuple, lambda dumper, data: dumper.represent_list(data))
Dumper.add_representer(unicode_folder, lambda dumper, data: dumper.represent_scalar(',2002:str', data, style='>'))
Dumper.add_representer(unicode_literal, lambda dumper, data: dumper.represent_scalar(',2002:str', data, style='|'))
Dumper.add_representer(periods.Instant, lambda dumper, data: dumper.represent_scalar(',2002:str', str(data)))
Dumper.add_representer(periods.Period, lambda dumper, data: dumper.represent_scalar(',2002:str', str(data)))
Dumper.add_representer(str, lambda dumper, data: dumper.represent_scalar(',2002:str', data))

# Exposed methods

[docs]def generate_tests(tax_benefit_system, paths, options = {}): """ Generates a lazy iterator of all the YAML tests contained in a file or a directory. :parameters: Same as :meth:`run_tests` :return: a generator of YAML tests """ if isinstance(paths, str): paths = [paths] for path in paths: if os.path.isdir(path): for test in _generate_tests_from_directory(tax_benefit_system, path, options): yield test else: for test in _generate_tests_from_file(tax_benefit_system, path, options): yield test
[docs]def run_tests(tax_benefit_system, paths, options = {}): """ Runs all the YAML tests contained in a file or a directory. If `path` is a directory, subdirectories will be recursively explored. :param TaxBenefitSystem tax_benefit_system: the tax-benefit system to use to run the tests :param (str/list) paths: A path, or a list of paths, towards the files or directories containing the tests to run. If a path is a directory, subdirectories will be recursively explored. :param dict options: See more details below. :raises AssertionError: if a test does not pass :return: the number of sucessful tests excecuted **Testing options**: +-------------------------------+-----------+-------------------------------------------+ | Key | Type | Role | +===============================+===========+===========================================+ | verbose | ``bool`` | | +-------------------------------+-----------+ See :any:`openfisca-run-test` options doc + | name_filter | ``str`` | | +-------------------------------+-----------+-------------------------------------------+ """ argv = sys.argv[:1] # Nose crashes if it gets any unexpected argument. if options.get('verbose'): argv.append('--nocapture') # Do not capture output when verbose mode is activated return # The suite argument must be a lambda for nose to run the tests lazily suite = lambda: generate_tests(tax_benefit_system, paths, options), argv = argv, )
# Internal methods def _generate_tests_from_file(tax_benefit_system, path_to_file, options): filename = os.path.splitext(os.path.basename(path_to_file))[0] name_filter = options.get('name_filter') if isinstance(name_filter, str): name_filter = to_unicode(name_filter) verbose = options.get('verbose') only_variables = options.get('only_variables') ignore_variables = options.get('ignore_variables') tests = _parse_test_file(tax_benefit_system, path_to_file) for test_index, (path_to_file, name, period_str, test) in enumerate(tests, 1): if name_filter is not None and name_filter not in filename \ and name_filter not in (test.get('name', '')) \ and name_filter not in (test.get('keywords', [])): continue keywords = test.get('keywords', []) title = "{}: {}{} - {}".format( os.path.basename(path_to_file), '[{}] '.format(', '.join(keywords)) if keywords else '', name, period_str, ) def check(): try: _run_test(period_str, test, verbose, only_variables, ignore_variables, options) except Exception: log.error(title) raise yield unittest.FunctionTestCase(check) def _generate_tests_from_directory(tax_benefit_system, path_to_dir, options): yaml_paths = glob.glob(os.path.join(path_to_dir, "*.yaml")) subdirectories = glob.glob(os.path.join(path_to_dir, "*/")) for yaml_path in yaml_paths: for test in _generate_tests_from_file(tax_benefit_system, yaml_path, options): yield test for subdirectory in subdirectories: for test in _generate_tests_from_directory(tax_benefit_system, subdirectory, options): yield test def _parse_test_file(tax_benefit_system, yaml_path): filename = os.path.splitext(os.path.basename(yaml_path))[0] with open(yaml_path) as yaml_file: try: tests = yaml.load(yaml_file, Loader=Loader) except yaml.scanner.ScannerError: log.error("{} is not a valid YAML file".format(yaml_path).encode('utf-8')) raise tests, error = conv.pipe( conv.make_item_to_singleton(), conv.uniform_sequence( conv.noop, drop_none_items = True, ), )(tests) if error is not None: embedding_error = conv.embed_error(tests, 'errors', error) assert embedding_error is None, embedding_error raise ValueError("Error in test {}:\n{}".format(yaml_path, yaml.dump(tests, Dumper=Dumper, allow_unicode = True, default_flow_style = False, indent = 2, width = 120))) for test in tests: current_tax_benefit_system = tax_benefit_system if test.get('reforms'): reforms = test.pop('reforms') if not isinstance(reforms, list): reforms = [reforms] for reform_path in reforms: current_tax_benefit_system = current_tax_benefit_system.apply_reform(reform_path) try: test, error = scenarios.make_json_or_python_to_test( tax_benefit_system = current_tax_benefit_system )(test) except Exception: log.error("{} is not a valid OpenFisca test file".format(yaml_path).encode('utf-8')) raise if error is not None: embedding_error = conv.embed_error(test, 'errors', error) assert embedding_error is None, embedding_error raise ValueError("Error in test {}:\n{}\nYaml test content: \n{}\n".format( yaml_path, error, yaml.dump(test, Dumper=Dumper, allow_unicode = True, default_flow_style = False, indent = 2, width = 120))) yield yaml_path, test.get('name') or filename, to_unicode(test['scenario'].period), test def _run_test(period_str, test, verbose = False, only_variables = None, ignore_variables = None, options = {}): absolute_error_margin = None relative_error_margin = None if test.get('absolute_error_margin') is not None: absolute_error_margin = test.get('absolute_error_margin') if test.get('relative_error_margin') is not None: relative_error_margin = test.get('relative_error_margin') scenario = test['scenario'] scenario.suggest() simulation = scenario.new_simulation(trace = verbose) output_variables = test.get('output_variables') if output_variables is not None: try: for variable_name, expected_value in output_variables.items(): variable_ignored = ignore_variables is not None and variable_name in ignore_variables variable_not_tested = only_variables is not None and variable_name not in only_variables if variable_ignored or variable_not_tested: continue # Skip this variable if isinstance(expected_value, dict): for requested_period, expected_value_at_period in expected_value.items(): assert_near( simulation.calculate(variable_name, requested_period), expected_value_at_period, absolute_error_margin = absolute_error_margin, message = '{}@{}: '.format(variable_name, requested_period), relative_error_margin = relative_error_margin, ) else: assert_near( simulation.calculate(variable_name), expected_value, absolute_error_margin = absolute_error_margin, message = '{}@{}: '.format(variable_name, period_str), relative_error_margin = relative_error_margin, ) finally: if verbose: print("Computation log:") simulation.tracer.print_computation_log()