diff --git a/README.rst b/README.rst index 21e8155..6bd17b5 100644 --- a/README.rst +++ b/README.rst @@ -7,9 +7,7 @@ Trafaret ----- - -Read The Docs hosted documentation -or look to the docs/api/intro.rst for start. +Ultimate transformation library that supports validation, contexts and ``aiohttp``. Trafaret is rigid and powerful lib to work with foreign data, configs etc. It provides simple way to check anything, and convert it accordingly to your needs. @@ -34,6 +32,25 @@ It have shortcut syntax and ability to express anything that you can code: raise DataError(error=errors, trafaret=self) trafaret.DataError: {'b': DataError({1: DataError(value is not a string)})} + +Read The Docs hosted documentation +or look to the docs/api/intro.rst for start. + + +New +--- + +* converters and ``convert=False`` are deleted in favor of ``And`` and ``&`` +* ``String`` parameter ``regex`` deleted in favor of ``Regexp`` and ``RegexpRaw`` usage +* new ``OnError`` to customize error message +* ``context=something`` argument for ``__call__`` and ``check`` Trafaret methods. + Supported by ``Or``, ``And``, ``Forward`` etc. +* new customizable method ``transform`` like ``change_and_return`` but takes ``context=`` arg +* new ``trafaret_instance.async_check`` method that works with ``await`` + +Doc +--- + For simple example what can be done: .. code-block:: python diff --git a/circle.yml b/circle.yml index 318f67a..abd9ce2 100644 --- a/circle.yml +++ b/circle.yml @@ -2,9 +2,13 @@ machine: python: version: - 3.6.0 + 3.6.2 post: - - pyenv local 3.6.0 2.7.10 + - pyenv local 3.6.2 3.5.2 2.7.10 + +test: + override: + - tox dependencies: pre: diff --git a/docs/changelog.rst b/docs/changelog.rst index 7f70a16..a7c8ed8 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,18 @@ Changelog ========= +2017-08-04 +---------- + +- converters and ``convert=False`` are deleted in favor of ``And`` and ``&`` +- ``String`` parameter ``regex`` deleted in favor of ``Regexp`` and ``RegexpRaw`` usage +- new ``OnError`` to customize error message +- ``context=something`` argument for ``__call__`` and ``check`` Trafaret methods. + Supported by ``Or``, ``And``, ``Forward`` etc. +- new customizable method ``transform`` like ``change_and_return`` but takes ``context=`` arg +- new ``trafaret_instance.async_check`` method that works with ``await`` + + 2017-05-12 ---------- diff --git a/tests/test_base.py b/tests/test_base.py index 1dac6cc..64ede25 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -2,15 +2,16 @@ import unittest import trafaret as t from collections import Mapping as AbcMapping -from trafaret import catch_error, extract_error, ignore, DataError +from trafaret import catch_error, extract_error, DataError from trafaret.extras import KeysSubset class TestAnyTrafaret(unittest.TestCase): def test_any(self): + obj = object() self.assertEqual( - (t.Any() >> ignore).check(object()), - None + t.Any().check(obj), + obj ) @@ -50,14 +51,14 @@ def validator(value): class TestCallableTrafaret(unittest.TestCase): def test_callable(self): - (t.Callable() >> t.ignore).check(lambda: 1) + t.Callable().check(lambda: 1) res = extract_error(t.Callable(), 1) self.assertEqual(res, 'value is not callable') class TestDictTrafaret(unittest.TestCase): def test_base(self): - trafaret = t.Dict(foo=t.Int, bar=t.String) >> t.ignore + trafaret = t.Dict(foo=t.Int, bar=t.String) trafaret.check({"foo": 1, "bar": "spam"}) res = t.extract_error(trafaret, {"foo": 1, "bar": 2}) self.assertEqual(res, {'bar': 'value is not a string'}) @@ -102,8 +103,8 @@ def set_trafaret(self, trafaret): trafaret = t.Dict({ OldKey(): t.Any }) - res = trafaret.check({'testkey': 123}) - self.assertEqual(res, {'testkey': 123}) + with self.assertRaises(ValueError): + trafaret.check({'testkey': 123}) def test_callable_key(self): def simple_key(value): @@ -237,27 +238,27 @@ def test_dict_keys(self): class TestEmailTrafaret(unittest.TestCase): def test_email(self): - res = t.Email().check('someone@example.net') + res = t.Email.check('someone@example.net') self.assertEqual(res, 'someone@example.net') - res = extract_error(t.Email(),'someone@example') # try without domain-part + res = extract_error(t.Email,'someone@example') # try without domain-part self.assertEqual(res, 'value is not a valid email address') - res = str(t.Email().check('someone@пример.рф')) # try with `idna` encoding + res = str(t.Email.check('someone@пример.рф')) # try with `idna` encoding self.assertEqual(res, 'someone@xn--e1afmkfd.xn--p1ai') - res = (t.Email() >> (lambda m: m.groupdict()['domain'])).check('someone@example.net') - self.assertEqual(res, 'example.net') - res = extract_error(t.Email(), 'foo') + # res = (t.Email() >> (lambda m: m.groupdict()['domain'])).check('someone@example.net') + # self.assertEqual(res, 'example.net') + res = extract_error(t.Email, 'foo') self.assertEqual(res, 'value is not a valid email address') - res = extract_error(t.Email(), 'f' * 10000 + '@correct.domain.edu') + res = extract_error(t.Email, 'f' * 10000 + '@correct.domain.edu') self.assertEqual(res, 'value is not a valid email address') - res = extract_error(t.Email(), 'f' * 248 + '@x.edu') == 'f' * 248 + '@x.edu' + res = extract_error(t.Email, 'f' * 248 + '@x.edu') == 'f' * 248 + '@x.edu' self.assertEqual(res, True) - res = extract_error(t.Email(), 123) + res = extract_error(t.Email, 123) self.assertEqual(res, 'value is not a string') class TestEnumTrafaret(unittest.TestCase): def test_enum(self): - trafaret = t.Enum("foo", "bar", 1) >> ignore + trafaret = t.Enum("foo", "bar", 1) self.assertEqual(repr(trafaret), "") res = trafaret.check("foo") res = trafaret.check(1) @@ -476,7 +477,6 @@ def test_raise_error(self): self.assertEqual(res, 'other error') - class TestStrBoolTrafaret(unittest.TestCase): def test_str_bool(self): @@ -522,10 +522,6 @@ def test_string(self): self.assertEqual(res, '') res = extract_error(t.String(), 1) self.assertEqual(res, 'value is not a string') - res = t.String(regex='\w+').check('wqerwqer') - self.assertEqual(res, 'wqerwqer') - res = extract_error(t.String(regex='^\w+$'), 'wqe rwqer') - self.assertEqual(res, "value does not match pattern: '^\\\\w+$'") res = t.String(min_length=2, max_length=3).check('123') self.assertEqual(res, '123') res = extract_error(t.String(min_length=2, max_length=6), '1') @@ -611,22 +607,22 @@ class Type(type): class TestURLTrafaret(unittest.TestCase): def test_url(self): - res = t.URL().check('http://example.net/resource/?param=value#anchor') + res = t.URL.check('http://example.net/resource/?param=value#anchor') self.assertEqual(res, 'http://example.net/resource/?param=value#anchor') - res = str(t.URL().check('http://пример.рф/resource/?param=value#anchor')) + res = str(t.URL.check('http://пример.рф/resource/?param=value#anchor')) self.assertEqual(res, 'http://xn--e1afmkfd.xn--p1ai/resource/?param=value#anchor') - res = t.URL().check('http://example_underscore.net/resource/?param=value#anchor') + res = t.URL.check('http://example_underscore.net/resource/?param=value#anchor') self.assertEqual(res, 'http://example_underscore.net/resource/?param=value#anchor') - res = str(t.URL().check('http://user@example.net/resource/?param=value#anchor')) + res = str(t.URL.check('http://user@example.net/resource/?param=value#anchor')) self.assertEqual(res, 'http://user@example.net/resource/?param=value#anchor') - res = str(t.URL().check('http://user:@example.net/resource/?param=value#anchor')) + res = str(t.URL.check('http://user:@example.net/resource/?param=value#anchor')) self.assertEqual(res, 'http://user:@example.net/resource/?param=value#anchor') - res = str(t.URL().check('http://user:password@example.net/resource/?param=value#anchor')) + res = str(t.URL.check('http://user:password@example.net/resource/?param=value#anchor')) self.assertEqual(res, 'http://user:password@example.net/resource/?param=value#anchor') diff --git a/tests/test_context.py b/tests/test_context.py new file mode 100644 index 0000000..34b274d --- /dev/null +++ b/tests/test_context.py @@ -0,0 +1,39 @@ +import unittest +import trafaret as t + + +def check_context(value, context=None): + if value != context: + return t.DataError('have not context there') + return value + + +CONTEXT_TRAFARET = (t.String() | t.IntRaw()) & t.Any & check_context + + +class TestContext(unittest.TestCase): + def test_context(self): + self.assertEqual(CONTEXT_TRAFARET(123, context=123), 123) + + def test_dict_context(self): + trafaret = t.Dict({ + t.Key('b'): CONTEXT_TRAFARET, + }) + self.assertEqual(trafaret({'b': 123}, context=123), {'b': 123}) + + def test_list(self): + trafaret = t.List(CONTEXT_TRAFARET) + self.assertEqual(trafaret([123], context=123), [123]) + + def test_tuple(self): + trafaret = t.Tuple(CONTEXT_TRAFARET) + self.assertEqual(trafaret([123], context=123), (123,)) + + def test_mapping(self): + trafaret = t.Mapping(CONTEXT_TRAFARET, CONTEXT_TRAFARET) + self.assertEqual(trafaret({123: 123}, context=123), {123: 123}) + + def test_forward(self): + trafaret = t.Forward() + trafaret << t.List(CONTEXT_TRAFARET) + self.assertEqual(trafaret([123], context=123), [123]) diff --git a/tests/test_contrib.py b/tests/test_contrib.py new file mode 100644 index 0000000..2f8c8bf --- /dev/null +++ b/tests/test_contrib.py @@ -0,0 +1,11 @@ +import unittest +import datetime + +import trafaret as t +from trafaret.contrib.rfc_3339 import DateTime + + +class TestDateTime(unittest.TestCase): + def test_datetime(self): + check = DateTime() + assert check('2017-09-01 23:59') == datetime.datetime(2017, 9, 1, 23, 59) diff --git a/tests3k/test_async.py b/tests3k/test_async.py new file mode 100644 index 0000000..3b1ed1f --- /dev/null +++ b/tests3k/test_async.py @@ -0,0 +1,55 @@ +import asyncio +import unittest +import trafaret as t +from trafaret.lib import py3 + + +async def check_int(value): + return value + + +class TestContext(unittest.TestCase): + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + + def tearDown(self): + self.loop.close() + + def test_async_check(self): + trafaret = t.Int & int + res = self.loop.run_until_complete(trafaret.async_check('5')) + self.assertEqual(res, 5) + + def test_async_call(self): + trafaret = t.Int & int & check_int + res = self.loop.run_until_complete(trafaret.async_check('5')) + self.assertEqual(res, 5) + + def test_dict(self): + trafaret = t.Dict({ + t.Key('b'): t.Int & check_int, + }) + res = self.loop.run_until_complete(trafaret.async_check({'b': '5'})) + self.assertEqual(res, {'b': 5}) + + def test_list(self): + trafaret = t.List(t.Int & check_int) + res = self.loop.run_until_complete(trafaret.async_check(['5'])) + self.assertEqual(res, [5]) + + def test_tuple(self): + trafaret = t.Tuple(t.Null, t.Int & check_int) + res = self.loop.run_until_complete(trafaret.async_check([None, '5'])) + self.assertEqual(res, (None, 5)) + + def test_mapping(self): + trafaret = t.Mapping(t.String, t.Int & check_int) + res = self.loop.run_until_complete(trafaret.async_check({'a': '5'})) + self.assertEqual(res, {'a': 5}) + + def test_forward(self): + trafaret = t.Forward() + trafaret << t.List(t.Int & check_int) + res = self.loop.run_until_complete(trafaret.async_check(['5'])) + self.assertEqual(res, [5]) diff --git a/tox.ini b/tox.ini index c2b8b36..6bdc152 100644 --- a/tox.ini +++ b/tox.ini @@ -1,11 +1,24 @@ [tox] -envlist = py27,py36 +envlist = py27,py35,py36 whitelist_externals = {toxinidir}/utests.py [testenv] deps= unittest2 + flake8 + pylint pymongo -commands=python -m unittest discover {toxinidir}/tests + python-dateutil +commands= + python -m unittest discover {toxinidir}/tests +[testenv:py36] +commands= + python -m unittest discover {toxinidir}/tests + python -m unittest discover {toxinidir}/tests3k + flake8 trafaret + +[flake8] +exclude = .tox,*.egg,build +max-line-length = 120 diff --git a/trafaret/__init__.py b/trafaret/__init__.py index d531962..6db0b15 100644 --- a/trafaret/__init__.py +++ b/trafaret/__init__.py @@ -1,1620 +1,103 @@ -# -*- coding: utf-8 -*- - -import sys -import functools -import inspect -import re -import itertools -import numbers -import warnings -from collections import Mapping as AbcMapping -import types -from .lib import py3, py3metafix - - -__VERSION__ = (0, 10, 4) - - -# Python3 support -if py3: - import urllib.parse as urlparse - str_types = (str, bytes) - unicode = str -else: - try: - from future_builtins import map - except ImportError: - # Support for GAE runner - from itertools import imap as map - import urlparse - str_types = (basestring,) - -def _dd(value): - if not hasattr(value, 'items'): - return repr(value) - return r"{%s}" % ', '.join("%r: %s" % (x[0], _dd(x[1])) for x in sorted(value.items(), key=lambda x: x[0])) - - -def deprecated(message): - warnings.warn(message, DeprecationWarning) - - -""" -Trafaret is tiny library for data validation -It provides several primitives to validate complex data structures -Look at doctests for usage examples -""" +from .base import ( + DataError, + Trafaret, + Call, + Or, + And, + Forward, + + Any, + Null, + List, + Key, + Dict, + Enum, + Tuple, + + Atom, + String, + Float, + FloatRaw, + Int, + IntRaw, + Callable, + Bool, + Type, + Subclass, + Mapping, + StrBool, + DictKeys, + + guard, + + # utils + OnError, + ensure_trafaret, + extract_error, + ignore, + _dd, + catch, + catch_error, + str_types, +) +from .regexp import Regexp, RegexpRaw +from .internet import ( + URL, + Email, + IPv4, + IPv6, + IP, +) __all__ = ( - "DataError", "Trafaret", "Any", "Int", "String", - "List", "Dict", "Or", "And", "Null", "Float", "Enum", "Callable", - "Call", "Forward", "Bool", "Type", "Subclass", "Mapping", "guard", "Key", - "Tuple", "Atom", "Email", "URL", + "DataError", + "Trafaret", + "Call", + "Or", + "And", + "Forward", + + "Any", + "Null", + "List", + "Key", + "Dict", + "Enum", + "Tuple", + + "Atom", + "String", + "Float", + "FloatRaw", + "Int", + "IntRaw", + "Callable", + "Bool", + "Type", + "Subclass", + "Mapping", + "StrBool", + "DictKeys", + + "guard", + + "Regexp", + "RegexpRaw", + "URL", + "Email", + "IPv4", + "IPv6", + "IP", + + "OnError", + "ensure_trafaret", + "extract_error", + "ignore", + "_dd", + "catch", + "catch_error", + "str_types", ) -_empty = object() -MAX_EMAIL_LEN = 254 - - -class DataError(ValueError): - """ - Error with data preserve - error can be a message or None if error raised in childs - data can be anything - """ - __slots__ = ['error', 'name', 'value', 'trafaret'] - - def __init__(self, error=None, name=None, value=_empty, trafaret=None): - self.error = error - self.name = name - self.value = value - self.trafaret = trafaret - - def __str__(self): - return str(self.error) - - def __repr__(self): - return 'DataError(%s)' % str(self) - - def as_dict(self, value=False): - def as_dict(dataerror): - if not isinstance(dataerror.error, dict): - if value and dataerror.value != _empty: - return '%s, got %r' % (str(dataerror.error), dataerror.value) - else: - return str(dataerror.error) - return dict((k, v.as_dict(value=value) if isinstance(v, DataError) else v) - for k, v in dataerror.error.items()) - return as_dict(self) - - -class TrafaretMeta(type): - """ - Metaclass for trafarets to make using "|" operator possible not only - on instances but on classes - - >>> Int | String - , )> - >>> Int | String | Null - , , )> - >>> (Int >> (lambda v: v if v ** 2 > 15 else 0)).check(5) - 5 - """ - - def __or__(cls, other): - return cls() | other - - def __and__(cls, other): - return cls() & other - - def __rshift__(cls, other): - return cls() >> other - - -@py3metafix -class Trafaret(object): - """ - Base class for trafarets, provides only one method for - trafaret validation failure reporting - """ - - __metaclass__ = TrafaretMeta - - def check(self, value, convert=True): - """ - Common logic. In subclasses you need to implement check_value or - check_and_return. - """ - if hasattr(self, 'check_value'): - self.check_value(value) - return self.converter(value) if convert else value - if hasattr(self, 'check_and_return'): - res_value = self.check_and_return(value) - return self.converter(res_value) if convert else res_value - cls = "%s.%s" % (type(self).__module__, type(self).__name__) - raise NotImplementedError("You must implement check_value or" - " check_and_return methods '%s'" % cls) - - def converter(self, value): - """ - You can change converter with `>>` operator or append method - """ - return value - - def _failure(self, error=None, value=_empty): - """ - Shortcut method for raising validation error - """ - raise DataError(error=error, value=value, trafaret=self) - - @staticmethod - def _trafaret(trafaret): - """ - Helper for complex trafarets, takes trafaret instance or class - and returns trafaret instance - """ - return ensure_trafaret(trafaret) - - def append(self, converter): - """ - Appends new converter to list. - """ - return And(self, converter, disable_old_check_convert=True) - - def __or__(self, other): - return Or(self, other) - - def __and__(self, other): - return And(self, other) - - def __rshift__(self, other): - return And(self, other, disable_old_check_convert=True) - - def __call__(self, val): - return self.check(val) - - -def ensure_trafaret(trafaret): - """ - Helper for complex trafarets, takes trafaret instance or class - and returns trafaret instance - """ - if isinstance(trafaret, Trafaret) or inspect.isroutine(trafaret): - return trafaret - elif isinstance(trafaret, type): - if issubclass(trafaret, Trafaret): - return trafaret() - else: - return Type(trafaret) - else: - raise RuntimeError("%r should be instance or subclass" - " of Trafaret" % trafaret) - -class TypeMeta(TrafaretMeta): - - def __getitem__(self, type_): - return self(type_) - - -@py3metafix -class TypingTrafaret(Trafaret): - """A trafaret used for instance type and class inheritance checks.""" - - __metaclass__ = TypeMeta - - def __init__(self, type_): - self.type_ = type_ - - def check_value(self, value): - if not self.typing_checker(value, self.type_): - self._failure(self.failure_message % self.type_.__name__, value=value) - - def __repr__(self): - return "<%s(%s)>" % (self.__class__.__name__, self.type_.__name__) - - -class Subclass(TypingTrafaret): - """ - >>> Subclass(type) - - >>> Subclass[type] - - >>> s = Subclass[type] - >>> s.check(type) - - >>> extract_error(s, object) - 'value is not subclass of type' - """ - - typing_checker = issubclass - failure_message = "value is not subclass of %s" - - -class Type(TypingTrafaret): - """ - >>> Type(int) - - >>> Type[int] - - >>> c = Type[int] - >>> c.check(1) - 1 - >>> extract_error(c, "foo") - 'value is not int' - """ - - typing_checker = isinstance - failure_message = "value is not %s" - - -class Any(Trafaret): - """ - >>> Any() - - >>> (Any() >> ignore).check(object()) - """ - - def check_value(self, value): - pass - - def __repr__(self): - return "" - - -class OrMeta(TrafaretMeta): - """ - Allows to use "<<" operator on Or class - - >>> Or << Int << String - , )> - """ - - def __lshift__(cls, other): - return cls() << other - - -@py3metafix -class Or(Trafaret): - """ - >>> nullString = Or(String, Null) - >>> nullString - , )> - >>> nullString.check(None) - >>> nullString.check("test") - 'test' - >>> extract_error(nullString, 1) - {0: 'value is not a string', 1: 'value should be None'} - """ - - __metaclass__ = OrMeta - __slots__ = ['trafarets'] - - def __init__(self, *trafarets): - self.trafarets = list(map(ensure_trafaret, trafarets)) - - def check_and_return(self, value): - errors = [] - for trafaret in self.trafarets: - try: - return trafaret.check(value) - except DataError as e: - errors.append(e) - raise DataError(dict(enumerate(errors)), trafaret=self) - - def __lshift__(self, trafaret): - self.trafarets.append(ensure_trafaret(trafaret)) - return self - - def __or__(self, trafaret): - self << trafaret - return self - - def __repr__(self): - return "" % (", ".join(map(repr, self.trafarets))) - - -class And(Trafaret): - """ - Will work over trafarets sequentially - """ - __slots__ = ('trafaret', 'other', 'disable_old_check_convert') - - def __init__(self, trafaret, other, disable_old_check_convert=False): - self.trafaret = trafaret - self.other = other - self.disable_old_check_convert = disable_old_check_convert - - def check_and_return(self, value): - if isinstance(self.trafaret, Trafaret) and self.disable_old_check_convert: - res = self.trafaret.check(value, convert=False) - else: - res = self.trafaret(value) - if isinstance(res, DataError): - raise DataError - res = self.other(res) - if isinstance(res, DataError): - raise res - return res - - # support old code for some deprecation period - def allow_extra(self, *names, **kw): - deprecated('Call allow_extra after >> or & operations is deprecated') - self.trafaret = self.trafaret.allow_extra(*names, **kw) - return self - - def ignore_extra(self, *names): - deprecated('Call ignore_extra after >> or & operations is deprecated') - self.trafaret = self.trafaret.ignore_extra(*names) - return self - - def __repr__(self): - return repr(self.trafaret) - - - -class Null(Trafaret): - """ - >>> Null() - - >>> Null().check(None) - >>> extract_error(Null(), 1) - 'value should be None' - """ - - def check_value(self, value): - if value is not None: - self._failure("value should be None", value=value) - - def __repr__(self): - return "" - - -class Bool(Trafaret): - """ - >>> Bool() - - >>> Bool().check(True) - True - >>> Bool().check(False) - False - >>> extract_error(Bool(), 1) - 'value should be True or False' - """ - - def check_value(self, value): - if not isinstance(value, bool): - self._failure("value should be True or False", value=value) - - def __repr__(self): - return "" - - -class StrBool(Trafaret): - """ - >>> extract_error(StrBool(), 'aloha') - "value can't be converted to Bool" - >>> StrBool().check(1) - True - >>> StrBool().check(0) - False - >>> StrBool().check('y') - True - >>> StrBool().check('n') - False - >>> StrBool().check(None) - False - >>> StrBool().check('1') - True - >>> StrBool().check('0') - False - >>> StrBool().check('YeS') - True - >>> StrBool().check('No') - False - >>> StrBool().check(True) - True - >>> StrBool().check(False) - False - """ - - convertable = ('t', 'true', 'false', 'y', 'n', 'yes', 'no', 'on', - '1', '0', 'none') - - def check_value(self, value): - _value = str(value).strip().lower() - if _value not in self.convertable: - self._failure("value can't be converted to Bool", value=value) - - def converter(self, value): - if value is None: - return False - _str = str(value).strip().lower() - - return _str in ('t', 'true', 'y', 'yes', 'on', '1') - - def __repr__(self): - return "" - - -class NumberMeta(TrafaretMeta): - """ - Allows slicing syntax for min and max arguments for - number trafarets - - >>> Int[1:] - - >>> Int[1:10] - - >>> Int[:10] - - >>> Float[1:] - - >>> Int > 3 - - >>> 1 < (Float < 10) - - >>> (Int > 5).check(10) - 10 - >>> extract_error(Int > 5, 1) - 'value should be greater than 5' - >>> (Int < 3).check(1) - 1 - >>> extract_error(Int < 3, 3) - 'value should be less than 3' - """ - - def __getitem__(cls, slice_): - return cls(gte=slice_.start, lte=slice_.stop) - - def __lt__(cls, lt): - return cls(lt=lt) - - def __gt__(cls, gt): - return cls(gt=gt) - - -@py3metafix -class FloatRaw(Trafaret): - """ - Tests that value is a float or a string that is convertable to float. - - >>> Float() - - >>> Float(gte=1) - - >>> Float(lte=10) - - >>> Float(gte=1, lte=10) - - >>> Float().check(1.0) - 1.0 - >>> extract_error(Float(), 1 + 3j) - 'value is not float' - >>> extract_error(Float(), 1) - 1.0 - >>> Float(gte=2).check(3.0) - 3.0 - >>> extract_error(Float(gte=2), 1.0) - 'value is less than 2' - >>> Float(lte=10).check(5.0) - 5.0 - >>> extract_error(Float(lte=3), 5.0) - 'value is greater than 3' - >>> Float().check("5.0") - 5.0 - """ - - __metaclass__ = NumberMeta - - convertable = str_types + (numbers.Real,) - value_type = float - - def __init__(self, gte=None, lte=None, gt=None, lt=None): - self.gte = gte - self.lte = lte - self.gt = gt - self.lt = lt - - def _converter(self, value): - if not isinstance(value, self.convertable): - self._failure('value is not %s' % self.value_type.__name__, value=value) - try: - return self.value_type(value) - except ValueError: - self._failure( - "value can't be converted to %s" % self.value_type.__name__, - value=value - ) - - def _check(self, data): - if not isinstance(data, self.value_type): - value = self._converter(data) - else: - value = data - if self.gte is not None and value < self.gte: - self._failure("value is less than %s" % self.gte, value=data) - if self.lte is not None and value > self.lte: - self._failure("value is greater than %s" % self.lte, value=data) - if self.lt is not None and value >= self.lt: - self._failure("value should be less than %s" % self.lt, value=data) - if self.gt is not None and value <= self.gt: - self._failure("value should be greater than %s" % self.gt, value=data) - return value - - def check_and_return(self, data): - self._check(data) - return data - - def __lt__(self, lt): - return type(self)(gte=self.gte, lte=self.lte, gt=self.gt, lt=lt) - - def __gt__(self, gt): - return type(self)(gte=self.gte, lte=self.lte, gt=gt, lt=self.lt) - - def __repr__(self): - r = "<%s" % type(self).__name__ - options = [] - for param in ("gte", "lte", "gt", "lt"): - if getattr(self, param) is not None: - options.append("%s=%s" % (param, getattr(self, param))) - if options: - r += "(%s)" % (", ".join(options)) - r += ">" - return r - - -class Float(FloatRaw): - """Checks that value is a float. - Or if value is a string converts this string to float - """ - def check_and_return(self, data): - return self._check(data) - - -class IntRaw(FloatRaw): - """ - >>> Int() - - >>> Int().check(5) - 5 - >>> extract_error(Int(), 1.1) - 'value is not int' - >>> extract_error(Int(), 1 + 1j) - 'value is not int' - """ - - value_type = int - - def _converter(self, value): - if isinstance(value, float): - if not value.is_integer(): - self._failure('value is not int', value=value) - return super(IntRaw, self)._converter(value) - - -class Int(IntRaw): - def check_and_return(self, data): - return self._check(data) - - -class Atom(Trafaret): - """ - >>> Atom('atom').check('atom') - 'atom' - >>> extract_error(Atom('atom'), 'molecule') - "value is not exactly 'atom'" - """ - __slots__ = ['value'] - - def __init__(self, value): - self.value = value - - def check_value(self, value): - if self.value != value: - self._failure("value is not exactly '%s'" % self.value, value=value) - - -class RegexpRaw(Trafaret): - """ - Check if given string match given regexp - """ - __slots__ = ('regexp', 'raw_regexp') - - def __init__(self, regexp): - self.regexp = re.compile(regexp) if isinstance(regexp, str_types) else regexp - self.raw_regexp = self.regexp.pattern if self.regexp else None - - def check_and_return(self, value): - if not isinstance(value, str_types): - self._failure("value is not a string", value=value) - match = self.regexp.match(value) - if not match: - self._failure('does not match pattern %s' % self.raw_regexp, value=value) - return match - - def __repr__(self): - return '' - - -class Regexp(RegexpRaw): - def check_and_return(self, value): - return super(Regexp, self).check_and_return(value).group() - - -class String(Trafaret): - """ - >>> String() - - >>> String(allow_blank=True) - - >>> String().check("foo") - 'foo' - >>> extract_error(String(), "") - 'blank value is not allowed' - >>> String(allow_blank=True).check("") - '' - >>> extract_error(String(), 1) - 'value is not a string' - >>> String(regex='\w+').check('wqerwqer') - 'wqerwqer' - >>> String(allow_blank=True, regex='\w+').check('') - '' - >>> extract_error(String(regex='^\w+$'), 'wqe rwqer') - "value does not match pattern: '^\\\\\\\\w+$'" - >>> String(min_length=2, max_length=3).check('123') - '123' - >>> extract_error(String(min_length=2, max_length=6), '1') - 'String is shorter than 2 characters' - >>> extract_error(String(min_length=2, max_length=6), '1234567') - 'String is longer than 6 characters' - >>> String(min_length=2, max_length=6, allow_blank=True) - Traceback (most recent call last): - ... - AssertionError: Either allow_blank or min_length should be specified, not both - >>> String(min_length=0, max_length=6, allow_blank=True).check('123') - '123' - """ - - def __init__(self, allow_blank=False, regex=None, min_length=None, max_length=None): - assert not (allow_blank and min_length), \ - "Either allow_blank or min_length should be specified, not both" - self.allow_blank = allow_blank - self.regex = re.compile(regex) if isinstance(regex, str_types) else regex - if self.regex is not None: - deprecated('Deprecated, use Regexp or RegexpRaw instead') - self.min_length = min_length - self.max_length = max_length - self._raw_regex = self.regex.pattern if self.regex else None - - def check_and_return(self, value): - if not isinstance(value, str_types): - self._failure("value is not a string", value=value) - if not self.allow_blank and len(value) == 0: - self._failure("blank value is not allowed", value=value) - elif self.allow_blank and len(value) == 0: - return value - if self.min_length is not None and len(value) < self.min_length: - self._failure('String is shorter than %s characters' % self.min_length, value=value) - if self.max_length is not None and len(value) > self.max_length: - self._failure('String is longer than %s characters' % self.max_length, value=value) - if self.regex is not None: - match = self.regex.match(value) - if not match: - self._failure("value does not match pattern: %s" % repr(self._raw_regex), value=value) - return match - return value - - def converter(self, value): - if isinstance(value, str_types): - return value - return value.group() - - def __repr__(self): - return "" if self.allow_blank else "" - - -class Email(String): - """ - >>> Email().check('someone@example.net') - 'someone@example.net' - >>> extract_error(Email(),'someone@example') # try without domain-part - 'value is not a valid email address' - >>> str(Email().check('someone@пример.рф')) # try with `idna` encoding - 'someone@xn--e1afmkfd.xn--p1ai' - >>> (Email() >> (lambda m: m.groupdict()['domain'])).check('someone@example.net') - 'example.net' - >>> extract_error(Email(), 'foo') - 'value is not a valid email address' - >>> extract_error(Email(), 'f' * 10000 + '@correct.domain.edu') - 'value is not a valid email address' - >>> extract_error(Email(), 'f' * 248 + '@x.edu') == 'f' * 248 + '@x.edu' - True - """ - - regex = re.compile( - r"(?P^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*" # dot-atom - r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-011\013\014\016-\177])*"' # quoted-string - r')@(?P(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)$)' # domain - r'|\[(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}\]$', re.IGNORECASE) # literal form, ipv4 address (SMTP 4.1.3) - - def __init__(self, allow_blank=False): - super(Email, self).__init__(allow_blank=allow_blank, - regex=self.regex, - max_length=MAX_EMAIL_LEN) - - def check_and_return(self, value): - try: - return super(Email, self).check_and_return(value) - except DataError: - if value and isinstance(value, str_types): - if isinstance(value, bytes): - decoded = value.decode('utf-8') - else: - decoded = value - else: - raise - # Trivial case failed. Try for possible IDN domain-part - if decoded and '@' in decoded: - parts = decoded.split('@') - try: - parts[-1] = parts[-1].encode('idna').decode('ascii') - except UnicodeError: - pass - else: - try: - return super(Email, self).check_and_return('@'.join(parts)) - except DataError: - # Will fail with main error - pass - self._failure('value is not a valid email address', value=value) - - def __repr__(self): - return '' - - -class URL(String): - """ - >>> URL().check('http://example.net/resource/?param=value#anchor') - 'http://example.net/resource/?param=value#anchor' - >>> str(URL().check('http://пример.рф/resource/?param=value#anchor')) - 'http://xn--e1afmkfd.xn--p1ai/resource/?param=value#anchor' - """ - - regex = re.compile( - r'^(?:http|ftp)s?://' # http:// or https:// - r'(?:\S+(?::\S*)?@)?' # user and password - r'(?:(?:[A-Z0-9](?:[A-Z0-9-_]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... - r'localhost|' # localhost... - r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip - r'(?::\d+)?' # optional port - r'(?:/?|[/?]\S+)$', re.IGNORECASE) - min_length = None - max_length = None - - def __init__(self, allow_blank=False): - super(URL, self).__init__(allow_blank=allow_blank, regex=self.regex) - - def check_and_return(self, value): - try: - return super(URL, self).check_and_return(value) - except DataError: - # Trivial case failed. Try for possible IDN domain-part - if value: - if isinstance(value, bytes): - decoded = value.decode('utf-8') - else: - decoded = value - scheme, netloc, path, query, fragment = urlparse.urlsplit(decoded) - try: - netloc = netloc.encode('idna').decode('ascii') # IDN -> ACE - except UnicodeError: # invalid domain part - pass - else: - url = urlparse.urlunsplit((scheme, netloc, path, query, fragment)) - try: - return super(URL, self).check_and_return(url) - except DataError: - # Will fail with main error - pass - self._failure('value is not URL', value=value) - - def __repr__(self): - return '' - - -class IPv4(Regexp): - """ - >>> IPv4().check('127.0.0.1') - '127.0.0.1' - """ - - regex = re.compile( - r'^((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])$', # noqa - ) - - def __init__(self): - super(IPv4, self).__init__(self.regex) - - def check_and_return(self, value): - try: - return super(IPv4, self).check_and_return(value) - except DataError: - self._failure('value is not IPv4 address') - - def __repr__(self): - return '' - - -class IPv6(Regexp): - """ - >>> IPv6().check('2001:0db8:0000:0042:0000:8a2e:0370:7334') - '2001:0db8:0000:0042:0000:8a2e:0370:7334' - """ - - regex = re.compile( - r'^(' - r'(::)|' - r'(::[0-9a-f]{1,4})|' - r'([0-9a-f]{1,4}:){7,7}[0-9a-f]{1,4}|' - r'([0-9a-f]{1,4}:){1,7}:|' - r'([0-9a-f]{1,4}:){1,6}:[0-9a-f]{1,4}|' - r'([0-9a-f]{1,4}:){1,5}(:[0-9a-f]{1,4}){1,2}|' - r'([0-9a-f]{1,4}:){1,4}(:[0-9a-f]{1,4}){1,3}|' - r'([0-9a-f]{1,4}:){1,3}(:[0-9a-f]{1,4}){1,4}|' - r'([0-9a-f]{1,4}:){1,2}(:[0-9a-f]{1,4}){1,5}|' - r'[0-9a-f]{1,4}:((:[0-9a-f]{1,4}){1,6})|' - r':((:[0-9a-f]{1,4}){1,7}:)|' - r'fe80:(:[0-9a-f]{0,4}){0,4}%[0-9a-z]{1,}|' - r'::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|' # noqa - r'([0-9a-f]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])' # noqa - r')$', - re.IGNORECASE, - ) - - def __init__(self): - super(IPv6, self).__init__(self.regex) - - def check_and_return(self, value): - try: - return super(IPv6, self).check_and_return(value) - except DataError: - self._failure('value is not IPv6 address') - - def __repr__(self): - return '' - - -class SquareBracketsMeta(TrafaretMeta): - """ - Allows usage of square brackets for List initialization - - >>> List[Int] - )> - >>> List[Int, 1:] - )> - >>> List[:10, Int] - )> - >>> List[1:10] - Traceback (most recent call last): - ... - RuntimeError: Trafaret is required for List initialization - """ - - def __getitem__(self, args): - slice_ = None - trafaret = None - if not isinstance(args, tuple): - args = (args, ) - for arg in args: - if isinstance(arg, slice): - slice_ = arg - elif isinstance(arg, Trafaret) or issubclass(arg, Trafaret) \ - or isinstance(arg, type): - trafaret = arg - if not trafaret: - raise RuntimeError("Trafaret is required for List initialization") - if slice_: - return self(trafaret, min_length=slice_.start or 0, - max_length=slice_.stop) - return self(trafaret) - - -@py3metafix -class List(Trafaret): - """ - >>> List(Int) - )> - >>> List(Int, min_length=1) - )> - >>> List(Int, min_length=1, max_length=10) - )> - >>> extract_error(List(Int), 1) - 'value is not a list' - >>> List(Int).check([1, 2, 3]) - [1, 2, 3] - >>> List(String).check(["foo", "bar", "spam"]) - ['foo', 'bar', 'spam'] - >>> extract_error(List(Int), [1, 2, 1 + 3j]) - {2: 'value is not int'} - >>> List(Int, min_length=1).check([1, 2, 3]) - [1, 2, 3] - >>> extract_error(List(Int, min_length=1), []) - 'list length is less than 1' - >>> List(Int, max_length=2).check([1, 2]) - [1, 2] - >>> extract_error(List(Int, max_length=2), [1, 2, 3]) - 'list length is greater than 2' - >>> extract_error(List(Int), ["a"]) - {0: "value can't be converted to int"} - """ - - __metaclass__ = SquareBracketsMeta - __slots__ = ['trafaret', 'min_length', 'max_length'] - - def __init__(self, trafaret, min_length=0, max_length=None): - self.trafaret = ensure_trafaret(trafaret) - self.min_length = min_length - self.max_length = max_length - - def check_and_return(self, value): - if not isinstance(value, list): - self._failure("value is not a list", value=value) - if len(value) < self.min_length: - self._failure("list length is less than %s" % self.min_length, value=value) - if self.max_length is not None and len(value) > self.max_length: - self._failure("list length is greater than %s" % self.max_length, value=value) - lst = [] - errors = {} - for index, item in enumerate(value): - try: - lst.append(self.trafaret.check(item)) - except DataError as err: - errors[index] = err - if errors: - raise DataError(error=errors, trafaret=self) - return lst - - def __repr__(self): - r = ">> t = Tuple(Int, Int, String) - >>> t.check([3, 4, '5']) - (3, 4, '5') - >>> extract_error(t, [3, 4, 5]) - {2: 'value is not a string'} - >>> t - , , )> - """ - __slots__ = ['trafarets', 'length'] - - def __init__(self, *args): - self.trafarets = list(map(ensure_trafaret, args)) - self.length = len(self.trafarets) - - def check_and_return(self, value): - try: - value = tuple(value) - except TypeError: - self._failure('value must be convertable to tuple', value=value) - if len(value) != self.length: - self._failure('value must contain %s items' % self.length, value=value) - result = [] - errors = {} - for idx, (item, trafaret) in enumerate(zip(value, self.trafarets)): - try: - result.append(trafaret.check(item)) - except DataError as err: - errors[idx] = err - if errors: - self._failure(errors, value=value) - return tuple(result) - - def __repr__(self): - return '' - - -class Key(object): - """ - Helper class for Dict. - - It gets ``name``, and provides method ``extract(data)`` that extract key value - from data through mapping ``get`` method. - Key `__call__` method yields ``(key name, Maybe(DataError), [touched keys])`` triples. - - You can redefine ``get_data(data, default)`` method in subclassed ``Key`` if you want to use something other - then ``.get(...)`` method. - - Like this for the aiohttp MultiDict:: - - class MDKey(t.Key): - def get_data(data, default): - return data.get_all(self.name, default) - """ - __slots__ = ['name', 'to_name', 'default', 'optional', 'trafaret'] - - def __init__(self, name, default=_empty, optional=False, to_name=None, trafaret=None): - self.name = name - self.to_name = to_name - self.default = default - self.optional = optional - self.trafaret = trafaret or Any() - - def __call__(self, data): - if self.name in data or self.default is not _empty: - if callable(self.default): - default = self.default() - else: - default = self.default - yield ( - self.get_name(), - catch_error(self.trafaret, self.get_data(data, default)), - (self.name,) - ) - return - - if not self.optional: - yield self.name, DataError(error='is required'), (self.name,) - - def get_data(self, data, default): - return data.get(self.name, default) - - def keys_names(self): - yield self.name - - def set_trafaret(self, trafaret): - self.trafaret = ensure_trafaret(trafaret) - return self - - def __rshift__(self, name): - self.to_name = name - return self - - def get_name(self): - return self.to_name or self.name - - def make_optional(self): - self.optional = True - - def __repr__(self): - return '<%s "%s"%s>' % (self.__class__.__name__, self.name, - ' to "%s"' % self.to_name if getattr(self, 'to_name', False) else '') - - -class Dict(Trafaret): - """ - >>> trafaret = Dict(foo=Int, bar=String) >> ignore - >>> trafaret.check({"foo": 1, "bar": "spam"}) - >>> extract_error(trafaret, {"foo": 1, "bar": 2}) - {'bar': 'value is not a string'} - >>> extract_error(trafaret, {"foo": 1}) - {'bar': 'is required'} - >>> extract_error(trafaret, {"foo": 1, "bar": "spam", "eggs": None}) - {'eggs': 'eggs is not allowed key'} - >>> trafaret.allow_extra("eggs") - , foo=)> - >>> trafaret.check({"foo": 1, "bar": "spam", "eggs": None}) - >>> trafaret.check({"foo": 1, "bar": "spam"}) - >>> extract_error(trafaret, {"foo": 1, "bar": "spam", "ham": 100}) - {'ham': 'ham is not allowed key'} - >>> trafaret.allow_extra("*") - , foo=)> - >>> trafaret.check({"foo": 1, "bar": "spam", "ham": 100}) - >>> trafaret.check({"foo": 1, "bar": "spam", "ham": 100, "baz": None}) - >>> extract_error(trafaret, {"foo": 1, "ham": 100, "baz": None}) - {'bar': 'is required'} - >>> trafaret = Dict({Key('bar', optional=True): String}, foo=Int) - >>> trafaret.allow_extra("*") - , foo=)> - >>> _dd(trafaret.check({"foo": 1, "ham": 100, "baz": None})) - "{'baz': None, 'foo': 1, 'ham': 100}" - >>> _dd(extract_error(trafaret, {"bar": 1, "ham": 100, "baz": None})) - "{'bar': 'value is not a string', 'foo': 'is required'}" - >>> extract_error(trafaret, {"foo": 1, "bar": 1, "ham": 100, "baz": None}) - {'bar': 'value is not a string'} - >>> trafaret = Dict({Key('bar', default='nyanya') >> 'baz': String}, foo=Int) - >>> _dd(trafaret.check({'foo': 4})) - "{'baz': 'nyanya', 'foo': 4}" - >>> _ = trafaret.ignore_extra('fooz') - >>> _dd(trafaret.check({'foo': 4, 'fooz': 5})) - "{'baz': 'nyanya', 'foo': 4}" - >>> _ = trafaret.ignore_extra('*') - >>> _dd(trafaret.check({'foo': 4, 'foor': 5})) - "{'baz': 'nyanya', 'foo': 4}" - """ - __slots__ = ['extras', 'extras_trafaret', 'allow_any', 'ignore', 'ignore_any', 'keys'] - - def __init__(self, *args, **trafarets): - if args and isinstance(args[0], AbcMapping): - keys = args[0] - args = args[1:] - else: - keys = {} - if any(not callable(key) for key in args): - raise RuntimeError('Keys in single attributes must be callables') - - # extra - allow_extra = trafarets.pop('allow_extra', []) - allow_extra_trafaret = trafarets.pop('allow_extra_trafaret', Any) - self.extras_trafaret = ensure_trafaret(allow_extra_trafaret) - self.allow_any = '*' in allow_extra - self.extras = [name for name in allow_extra if name != '*'] - # ignore - ignore_extra = trafarets.pop('ignore_extra', []) - self.ignore_any = '*' in ignore_extra - self.ignore = [name for name in ignore_extra if name != '*'] - - self.keys = list(args) - for key, trafaret in itertools.chain(trafarets.items(), keys.items()): - key_ = Key(key) if isinstance(key, str_types) else key - key_.set_trafaret(ensure_trafaret(trafaret)) - self.keys.append(key_) - - def allow_extra(self, *names, **kw): - trafaret = kw.get('trafaret', Any) - for name in names: - if name == "*": - self.allow_any = True - else: - self.extras.append(name) - self.extras_trafaret = ensure_trafaret(trafaret) - return self - - def ignore_extra(self, *names): - for name in names: - if name == "*": - self.ignore_any = True - else: - self.ignore.append(name) - return self - - def make_optional(self, *args): - for key in self.keys: - if key.name in args or '*' in args: - key.make_optional() - return self - - def check_and_return(self, value): - if not isinstance(value, AbcMapping): - self._failure("value is not a dict", value=value) - collect = {} - errors = {} - touched_names = [] - for key in self.keys: - if callable(key): - for k, v, names in key(value): - if isinstance(v, DataError): - errors[k] = v - else: - collect[k] = v - touched_names.extend(names) - else: - deprecated('Old pop based Keys subclasses deprecated. See README') - value_keys = set(value.keys()) - for k, v in key.pop(value): - if isinstance(v, DataError): - errors[k] = v - else: - collect[k] = v - touched_names.extend(value_keys - set(value.keys())) - - if not self.ignore_any: - for key in value: - if key in touched_names: - continue - if key in self.ignore: - continue - if not self.allow_any and key not in self.extras: - errors[key] = DataError("%s is not allowed key" % key) - elif key in collect: - errors[key] = DataError("%s key was shadowed" % key) - else: - try: - collect[key] = self.extras_trafaret(value[key]) - except DataError as de: - errors[key] = de - if errors: - raise DataError(error=errors, trafaret=self) - return collect - - def keys_names(self): - for key in self.keys: - for k in key.keys_names(): - yield k - - def __repr__(self): - r = ">> _dd(DictKeys(['a','b']).check({'a':1,'b':2,})) - "{'a': 1, 'b': 2}" - >>> extract_error(DictKeys(['a','b']), {'a':1,'b':2,'c':3,}) - {'c': 'c is not allowed key'} - >>> extract_error(DictKeys(['key','key2']), {'key':'val'}) - {'key2': 'is required'} - """ - def MissingKey(val): - raise DataError('%s is not in Dict' % val) - - req = [(Key(key), Any) for key in keys] - return Dict(dict(req)) - - -class Mapping(Trafaret): - """ - Mapping gets two trafarets as arguments, one for key and one for value, - like `Mapping(t.Int, t.List(t.Str))`. - """ - __slots__ = ['key', 'value'] - - def __init__(self, key, value): - self.key = ensure_trafaret(key) - self.value = ensure_trafaret(value) - - def check_and_return(self, mapping): - if not isinstance(mapping, dict): - self._failure("value is not a dict", value=mapping) - checked_mapping = {} - errors = {} - for key, value in mapping.items(): - pair_errors = {} - try: - checked_key = self.key.check(key) - except DataError as err: - pair_errors['key'] = err - try: - checked_value = self.value.check(value) - except DataError as err: - pair_errors['value'] = err - if pair_errors: - errors[key] = DataError(error=pair_errors) - else: - checked_mapping[checked_key] = checked_value - if errors: - raise DataError(error=errors, trafaret=self) - return checked_mapping - - def __repr__(self): - return " %r)>" % (self.key, self.value) - - -class Enum(Trafaret): - """ - >>> trafaret = Enum("foo", "bar", 1) >> ignore - >>> trafaret - - >>> trafaret.check("foo") - >>> trafaret.check(1) - >>> extract_error(trafaret, 2) - "value doesn't match any variant" - """ - __slots__ = ['variants'] - - def __init__(self, *variants): - self.variants = variants[:] - - def check_value(self, value): - if value not in self.variants: - self._failure("value doesn't match any variant", value=value) - - def __repr__(self): - return "" % (", ".join(map(repr, self.variants))) - - -class Callable(Trafaret): - """ - >>> (Callable() >> ignore).check(lambda: 1) - >>> extract_error(Callable(), 1) - 'value is not callable' - """ - - def check_value(self, value): - if not callable(value): - self._failure("value is not callable", value=value) - - def __repr__(self): - return "" - - -class Call(Trafaret): - """ - >>> def validator(value): - ... if value != "foo": - ... return DataError("I want only foo!") - ... return 'foo' - ... - >>> trafaret = Call(validator) - >>> trafaret - - >>> trafaret.check("foo") - 'foo' - >>> extract_error(trafaret, "bar") - 'I want only foo!' - """ - __slots__ = ['fn'] - - def __init__(self, fn): - if not callable(fn): - raise RuntimeError("Call argument should be callable") - if py3: - argspec = inspect.getfullargspec(fn) - else: - argspec = inspect.getargspec(fn) - if len(argspec.args) - len(argspec.defaults or []) > 1: - raise RuntimeError("Call argument should be" - " one argument function") - self.fn = fn - - def check_and_return(self, value): - res = self.fn(value) - if isinstance(res, DataError): - raise res - else: - return res - - def __repr__(self): - return "" % self.fn.__name__ - - -class Forward(Trafaret): - """ - >>> node = Forward() - >>> node << Dict(name=String, children=List[node]) - >>> node - )>, name=)>)> - >>> node.check({"name": "foo", "children": []}) == {'children': [], 'name': 'foo'} - True - >>> extract_error(node, {"name": "foo", "children": [1]}) - {'children': {0: 'value is not a dict'}} - >>> node.check({"name": "foo", "children": [ \ - {"name": "bar", "children": []} \ - ]}) == {'children': [{'children': [], 'name': 'bar'}], 'name': 'foo'} - True - >>> empty_node = Forward() - >>> empty_node - - >>> extract_error(empty_node, 'something') - 'trafaret not set yet' - """ - - def __init__(self): - self.trafaret = None - self._recur_repr = False - - def __lshift__(self, trafaret): - self.provide(trafaret) - - def provide(self, trafaret): - if self.trafaret: - raise RuntimeError("trafaret for Forward is already specified") - self.trafaret = ensure_trafaret(trafaret) - - def check_and_return(self, value): - if self.trafaret is None: - self._failure('trafaret not set yet', value=value) - return self.trafaret.check(value) - - def __repr__(self): - # XXX not threadsafe - if self._recur_repr: - return "" - self._recur_repr = True - r = "" % self.trafaret - self._recur_repr = False - return r - - -class GuardError(DataError): - """ - Raised when guarded function gets invalid arguments, - inherits error message from corresponding DataError - """ - - pass - - -def guard(trafaret=None, **kwargs): - """ - Decorator for protecting function with trafarets - - >>> @guard(a=String, b=Int, c=String) - ... def fn(a, b, c="default"): - ... '''docstring''' - ... return (a, b, c) - ... - >>> fn.__module__ = None - >>> help(fn) - Help on function fn: - - fn(*args, **kwargs) - guarded with , b=, c=)> - - docstring - - >>> fn("foo", 1) - ('foo', 1, 'default') - >>> extract_error(fn, "foo", 1, 2) - {'c': 'value is not a string'} - >>> extract_error(fn, "foo") - {'b': 'is required'} - >>> g = guard(Dict()) - >>> c = Forward() - >>> c << Dict(name=str, children=List[c]) - >>> g = guard(c) - >>> g = guard(Int()) - Traceback (most recent call last): - ... - RuntimeError: trafaret should be instance of Dict or Forward - """ - if trafaret and not isinstance(trafaret, Dict) and \ - not isinstance(trafaret, Forward): - raise RuntimeError("trafaret should be instance of Dict or Forward") - elif trafaret and kwargs: - raise RuntimeError("choose one way of initialization," - " trafaret or kwargs") - if not trafaret: - trafaret = Dict(**kwargs) - - def wrapper(fn): - if py3: - argspec = inspect.getfullargspec(fn) - else: - argspec = inspect.getargspec(fn) - - @functools.wraps(fn) - def decor(*args, **kwargs): - fnargs = argspec.args - if fnargs[0] in ['self', 'cls']: - obj = args[0] - fnargs = fnargs[1:] - checkargs = args[1:] - else: - obj = None - checkargs = args - - try: - call_args = dict( - itertools.chain(zip(fnargs, checkargs), kwargs.items()) - ) - for name, default in zip(reversed(fnargs), - reversed(argspec.defaults or ())): - if name not in call_args: - call_args[name] = default - converted = trafaret.check(call_args) - except DataError as err: - raise GuardError(error=err.error) - return fn(obj, **converted) if obj else fn(**converted) - decor.__doc__ = "guarded with %r\n\n" % trafaret + (decor.__doc__ or "") - return decor - return wrapper - - -def ignore(val): - """ - Stub to ignore value from trafaret - Use it like: - - >>> a = Int >> ignore - >>> a.check(7) - """ - pass - - -def catch(checker, *a, **kw): - """ - Helper for tests - catch error and return it as dict - """ - - try: - if hasattr(checker, 'check'): - return checker.check(*a, **kw) - elif callable(checker): - return checker(*a, **kw) - except DataError as error: - return error - -catch_error = catch - - -def extract_error(checker, *a, **kw): - """ - Helper for tests - catch error and return it as dict - """ - - res = catch_error(checker, *a, **kw) - if isinstance(res, DataError): - return res.as_dict() - return res - - -class MissingContribModuleStub(types.ModuleType): - """ - Preserves initial exception to be raised on module access - """ - - def __init__(self, entrypoint, orig): - self.orig = orig - self.entrypoint = entrypoint - - def __getattr__(self, item): - raise self.orig - - def __call__(self, *args, **kwargs): - raise self.orig - @property - def __name__(self): - return self.entrypoint.name.lstrip('.') +__VERSION__ = (0, 11, 1) diff --git a/trafaret/async.py b/trafaret/async.py new file mode 100644 index 0000000..71cddd5 --- /dev/null +++ b/trafaret/async.py @@ -0,0 +1,184 @@ +import inspect +from collections import Mapping as AbcMapping +from .dataerror import DataError +from .lib import ( + call_with_context_if_support, + _empty, +) + + +class TrafaretAsyncMixin: + async def async_check(self, value, context=None): + if hasattr(self, 'async_transform'): + return (await self.async_transform(value, context=context)) + return self.check(value, context=context) + + +class OrAsyncMixin: + async def async_transform(self, value, context=None): + errors = [] + for trafaret in self.trafarets: + try: + return (await trafaret.async_check(value, context=context)) + except DataError as e: + errors.append(e) + raise DataError(dict(enumerate(errors)), trafaret=self) + + +class AndAsyncMixin: + async def async_transform(self, value, context=None): + res = await self.trafaret.async_check(value, context=context) + if isinstance(res, DataError): + raise DataError + res = await self.other.async_check(res, context=context) + if isinstance(res, DataError): + raise res + return res + + +class ListAsyncMixin: + async def async_transform(self, value, context=None): + self.check_common(value) + lst = [] + errors = {} + for index, item in enumerate(value): + try: + lst.append(await self.trafaret.async_check(item, context=context)) + except DataError as err: + errors[index] = err + if errors: + raise DataError(error=errors, trafaret=self) + return lst + + +class TupleAsyncMixin: + async def async_transform(self, value, context=None): + self.check_common(value) + result = [] + errors = {} + for idx, (item, trafaret) in enumerate(zip(value, self.trafarets)): + try: + result.append(await trafaret.async_check(item, context=context)) + except DataError as err: + errors[idx] = err + if errors: + self._failure(errors, value=value) + return tuple(result) + + +class MappingAsyncMixin: + async def async_transform(self, mapping, context=None): + if not isinstance(mapping, AbcMapping): + self._failure("value is not a dict", value=mapping) + checked_mapping = {} + errors = {} + for key, value in mapping.items(): + pair_errors = {} + try: + checked_key = await self.key.async_check(key, context=context) + except DataError as err: + pair_errors['key'] = err + try: + checked_value = await self.value.async_check(value, context=context) + except DataError as err: + pair_errors['value'] = err + if pair_errors: + errors[key] = DataError(error=pair_errors) + else: + checked_mapping[checked_key] = checked_value + if errors: + raise DataError(error=errors, trafaret=self) + return checked_mapping + + +class CallAsyncMixin: + async def async_transform(self, value, context=None): + if not inspect.iscoroutinefunction(self.fn): + return self.transform(value, context=context) + if self.supports_context: + res = await self.fn(value, context=context) + else: + res = await self.fn(value) + if isinstance(res, DataError): + raise res + else: + return res + + +class ForwardAsyncMixin: + async def async_transform(self, value, context=None): + if self.trafaret is None: + self._failure('trafaret not set yet', value=value) + return (await self.trafaret.async_check(value, context=context)) + + +class DictAsyncMixin: + async def async_transform(self, value, context=None): + if not isinstance(value, AbcMapping): + self._failure("value is not a dict", value=value) + collect = {} + errors = {} + touched_names = [] + for key in self.keys: + if not callable(key) and not hasattr(key, 'async_call'): + raise ValueError('Non callable Keys are not supported') + key_run = call_with_context_if_support( + getattr(key, 'async_call', key), + value, + context=context, + ) + if inspect.isasyncgen(key_run): + async for k, v, names in key_run: + if isinstance(v, DataError): + errors[k] = v + else: + collect[k] = v + touched_names.extend(names) + else: + for k, v, names in key_run: + if isinstance(v, DataError): + errors[k] = v + else: + collect[k] = v + touched_names.extend(names) + + if not self.ignore_any: + for key in value: + if key in touched_names: + continue + if key in self.ignore: + continue + if not self.allow_any and key not in self.extras: + errors[key] = DataError("%s is not allowed key" % key) + elif key in collect: + errors[key] = DataError("%s key was shadowed" % key) + else: + try: + collect[key] = await self.extras_trafaret.async_check(value[key]) + except DataError as de: + errors[key] = de + if errors: + raise DataError(error=errors, trafaret=self) + return collect + + +class KeyAsyncMixin: + async def async_call(self, data, context=None): + if self.name in data or self.default is not _empty: + if callable(self.default): + default = self.default() + else: + default = self.default + try: + value = await self.trafaret.async_check(self.get_data(data, default), context=context) + except DataError as data_error: + value = data_error + yield ( + self.get_name(), + value, + (self.name,) + ) + return + + if not self.optional: + yield self.name, DataError(error='is required'), (self.name,) diff --git a/trafaret/base.py b/trafaret/base.py new file mode 100644 index 0000000..8d3dd81 --- /dev/null +++ b/trafaret/base.py @@ -0,0 +1,1387 @@ +# -*- coding: utf-8 -*- + +import functools +import inspect +import itertools +import numbers +import warnings +from collections import Mapping as AbcMapping +from .lib import ( + py3, + py36, + py3metafix, + getargspec, + call_with_context_if_support, + _empty, +) +from .dataerror import DataError + + +if py36: + from .async import ( + TrafaretAsyncMixin, + OrAsyncMixin, + AndAsyncMixin, + ListAsyncMixin, + TupleAsyncMixin, + MappingAsyncMixin, + CallAsyncMixin, + ForwardAsyncMixin, + DictAsyncMixin, + KeyAsyncMixin, + ) +else: + class EmptyMixin(object): + pass + TrafaretAsyncMixin = EmptyMixin + OrAsyncMixin = EmptyMixin + AndAsyncMixin = EmptyMixin + ListAsyncMixin = EmptyMixin + TupleAsyncMixin = EmptyMixin + MappingAsyncMixin = EmptyMixin + CallAsyncMixin = EmptyMixin + ForwardAsyncMixin = EmptyMixin + DictAsyncMixin = EmptyMixin + KeyAsyncMixin = EmptyMixin + + +# Python3 support +if py3: + str_types = (str, bytes) + unicode = str + BYTES_TYPE = bytes +else: + try: + from future_builtins import map + except ImportError: + # Support for GAE runner + from itertools import imap as map + str_types = (basestring,) # noqa + BYTES_TYPE = str + + +def _dd(value): + if not hasattr(value, 'items'): + return repr(value) + return r"{%s}" % ', '.join("%r: %s" % (x[0], _dd(x[1])) for x in sorted(value.items(), key=lambda x: x[0])) + + +def deprecated(message): + warnings.warn(message, DeprecationWarning) + + +""" +Trafaret is tiny library for data validation +It provides several primitives to validate complex data structures +Look at doctests for usage examples +""" + + +class TrafaretMeta(type): + """ + Metaclass for trafarets to make using "|" operator possible not only + on instances but on classes + + >>> Int | String + , )> + >>> Int | String | Null + , , )> + >>> (Int >> (lambda v: v if v ** 2 > 15 else 0)).check(5) + 5 + """ + + def __or__(cls, other): + return cls() | other + + def __and__(cls, other): + return cls() & other + + def __rshift__(cls, other): + return cls() >> other + + +@py3metafix +class Trafaret(TrafaretAsyncMixin): + """ + Base class for trafarets, provides only one method for + trafaret validation failure reporting + """ + + __metaclass__ = TrafaretMeta + + def check(self, value, context=None): + """ + Common logic. In subclasses you need to implement check_value or + check_and_return. + """ + if hasattr(self, 'transform'): + return self.transform(value, context=context) + elif hasattr(self, 'check_value'): + self.check_value(value) + return value + elif hasattr(self, 'check_and_return'): + return self.check_and_return(value) + else: + cls = "{}.{}".format( + type(self).__module__, + type(self).__name__, + ) + raise NotImplementedError( + "You must implement check_value or" + " check_and_return methods '%s'" % cls + ) + + def _failure(self, error=None, value=_empty): + """ + Shortcut method for raising validation error + """ + raise DataError(error=error, value=value, trafaret=self) + + def append(self, other): + """ + Appends new converter to list. + """ + return And(self, other) + + def __or__(self, other): + return Or(self, other) + + def __and__(self, other): + return And(self, other) + + def __rshift__(self, other): + return And(self, other) + + def __call__(self, val, context=None): + return self.check(val, context=context) + + +class OnError(Trafaret): + def __init__(self, trafaret, message): + self.trafaret = trafaret + self.message = message + + def transform(self, value, context=None): + try: + return self.trafaret(value, context=context) + except DataError: + raise DataError(self.message, value=value) + + +def ensure_trafaret(trafaret): + """ + Helper for complex trafarets, takes trafaret instance or class + and returns trafaret instance + """ + if isinstance(trafaret, Trafaret): + return trafaret + elif inspect.isroutine(trafaret): + if 'context' in getargspec(trafaret).args: + return trafaret + else: + return Call(trafaret) + elif isinstance(trafaret, type): + if issubclass(trafaret, Trafaret): + return trafaret() + # str, int, float are classes, but its appropriate to use them + # as trafaret functions + return Call(lambda val: trafaret(val)) + else: + raise RuntimeError("%r should be instance or subclass" + " of Trafaret" % trafaret) + + +class TypeMeta(TrafaretMeta): + def __getitem__(self, type_): + return self(type_) + + +@py3metafix +class TypingTrafaret(Trafaret): + """A trafaret used for instance type and class inheritance checks.""" + + __metaclass__ = TypeMeta + + def __init__(self, type_): + self.type_ = type_ + + def check_value(self, value): + if not self.typing_checker(value, self.type_): + self._failure(self.failure_message % self.type_.__name__, value=value) + + def __repr__(self): + return "<%s(%s)>" % (self.__class__.__name__, self.type_.__name__) + + +class Subclass(TypingTrafaret): + """ + >>> Subclass(type) + + >>> Subclass[type] + + >>> s = Subclass[type] + >>> s.check(type) + + >>> extract_error(s, object) + 'value is not subclass of type' + """ + + typing_checker = issubclass + failure_message = "value is not subclass of %s" + + +class Type(TypingTrafaret): + """ + >>> Type(int) + + >>> Type[int] + + >>> c = Type[int] + >>> c.check(1) + 1 + >>> extract_error(c, "foo") + 'value is not int' + """ + + typing_checker = isinstance + failure_message = "value is not %s" + + +class Any(Trafaret): + """ + >>> Any() + + >>> (Any() >> ignore).check(object()) + """ + + def check_value(self, value): + pass + + def __repr__(self): + return "" + + +class OrMeta(TrafaretMeta): + """ + Allows to use "<<" operator on Or class + + >>> Or << Int << String + , )> + """ + + def __lshift__(cls, other): + return cls() << other + + +@py3metafix +class Or(Trafaret, OrAsyncMixin): + """ + >>> nullString = Or(String, Null) + >>> nullString + , )> + >>> nullString.check(None) + >>> nullString.check("test") + 'test' + >>> extract_error(nullString, 1) + {0: 'value is not a string', 1: 'value should be None'} + """ + + __metaclass__ = OrMeta + __slots__ = ['trafarets'] + + def __init__(self, *trafarets): + self.trafarets = list(map(ensure_trafaret, trafarets)) + + def transform(self, value, context=None): + errors = [] + for trafaret in self.trafarets: + try: + return trafaret(value, context=context) + except DataError as e: + errors.append(e) + raise DataError(dict(enumerate(errors)), trafaret=self) + + def __lshift__(self, trafaret): + self.trafarets.append(ensure_trafaret(trafaret)) + return self + + def __or__(self, trafaret): + self << trafaret + return self + + def __repr__(self): + return "" % (", ".join(map(repr, self.trafarets))) + + +class And(Trafaret, AndAsyncMixin): + """ + Will work over trafarets sequentially + """ + __slots__ = ('trafaret', 'other', 'disable_old_check_convert') + + def __init__(self, trafaret, other): + self.trafaret = ensure_trafaret(trafaret) + self.other = ensure_trafaret(other) + + def transform(self, value, context=None): + res = self.trafaret(value, context=context) + if isinstance(res, DataError): + raise DataError + res = self.other(res, context=context) + if isinstance(res, DataError): + raise res + return res + + def __repr__(self): + return repr(self.trafaret) + + +class Null(Trafaret): + """ + >>> Null() + + >>> Null().check(None) + >>> extract_error(Null(), 1) + 'value should be None' + """ + + def check_value(self, value): + if value is not None: + self._failure("value should be None", value=value) + + def __repr__(self): + return "" + + +class Bool(Trafaret): + """ + >>> Bool() + + >>> Bool().check(True) + True + >>> Bool().check(False) + False + >>> extract_error(Bool(), 1) + 'value should be True or False' + """ + + def check_value(self, value): + if not isinstance(value, bool): + self._failure("value should be True or False", value=value) + + def __repr__(self): + return "" + + +class StrBool(Trafaret): + """ + >>> extract_error(StrBool(), 'aloha') + "value can't be converted to Bool" + >>> StrBool().check(1) + True + >>> StrBool().check(0) + False + >>> StrBool().check('y') + True + >>> StrBool().check('n') + False + >>> StrBool().check(None) + False + >>> StrBool().check('1') + True + >>> StrBool().check('0') + False + >>> StrBool().check('YeS') + True + >>> StrBool().check('No') + False + >>> StrBool().check(True) + True + >>> StrBool().check(False) + False + """ + + convertable = ('t', 'true', 'false', 'y', 'n', 'yes', 'no', 'on', + '1', '0', 'none') + + def check_and_return(self, value): + _value = str(value).strip().lower() + if _value not in self.convertable: + self._failure("value can't be converted to Bool", value=value) + return _value in ('t', 'true', 'y', 'yes', 'on', '1') + + def __repr__(self): + return "" + + +class NumberMeta(TrafaretMeta): + """ + Allows slicing syntax for min and max arguments for + number trafarets + + >>> Int[1:] + + >>> Int[1:10] + + >>> Int[:10] + + >>> Float[1:] + + >>> Int > 3 + + >>> 1 < (Float < 10) + + >>> (Int > 5).check(10) + 10 + >>> extract_error(Int > 5, 1) + 'value should be greater than 5' + >>> (Int < 3).check(1) + 1 + >>> extract_error(Int < 3, 3) + 'value should be less than 3' + """ + + def __getitem__(cls, slice_): + return cls(gte=slice_.start, lte=slice_.stop) + + def __lt__(cls, lt): + return cls(lt=lt) + + def __gt__(cls, gt): + return cls(gt=gt) + + +@py3metafix +class FloatRaw(Trafaret): + """ + Tests that value is a float or a string that is convertable to float. + + >>> Float() + + >>> Float(gte=1) + + >>> Float(lte=10) + + >>> Float(gte=1, lte=10) + + >>> Float().check(1.0) + 1.0 + >>> extract_error(Float(), 1 + 3j) + 'value is not float' + >>> extract_error(Float(), 1) + 1.0 + >>> Float(gte=2).check(3.0) + 3.0 + >>> extract_error(Float(gte=2), 1.0) + 'value is less than 2' + >>> Float(lte=10).check(5.0) + 5.0 + >>> extract_error(Float(lte=3), 5.0) + 'value is greater than 3' + >>> Float().check("5.0") + 5.0 + """ + + __metaclass__ = NumberMeta + + convertable = str_types + (numbers.Real,) + value_type = float + + def __init__(self, gte=None, lte=None, gt=None, lt=None): + self.gte = gte + self.lte = lte + self.gt = gt + self.lt = lt + + def _converter(self, value): + if not isinstance(value, self.convertable): + self._failure('value is not %s' % self.value_type.__name__, value=value) + try: + return self.value_type(value) + except ValueError: + self._failure( + "value can't be converted to %s" % self.value_type.__name__, + value=value + ) + + def _check(self, data): + if not isinstance(data, self.value_type): + value = self._converter(data) + else: + value = data + if self.gte is not None and value < self.gte: + self._failure("value is less than %s" % self.gte, value=data) + if self.lte is not None and value > self.lte: + self._failure("value is greater than %s" % self.lte, value=data) + if self.lt is not None and value >= self.lt: + self._failure("value should be less than %s" % self.lt, value=data) + if self.gt is not None and value <= self.gt: + self._failure("value should be greater than %s" % self.gt, value=data) + return value + + def check_and_return(self, data): + self._check(data) + return data + + def __lt__(self, lt): + return type(self)(gte=self.gte, lte=self.lte, gt=self.gt, lt=lt) + + def __gt__(self, gt): + return type(self)(gte=self.gte, lte=self.lte, gt=gt, lt=self.lt) + + def __repr__(self): + r = "<%s" % type(self).__name__ + options = [] + for param in ("gte", "lte", "gt", "lt"): + if getattr(self, param) is not None: + options.append("%s=%s" % (param, getattr(self, param))) + if options: + r += "(%s)" % (", ".join(options)) + r += ">" + return r + + +class Float(FloatRaw): + """Checks that value is a float. + Or if value is a string converts this string to float + """ + def check_and_return(self, data): + return self._check(data) + + +class IntRaw(FloatRaw): + """ + >>> Int() + + >>> Int().check(5) + 5 + >>> extract_error(Int(), 1.1) + 'value is not int' + >>> extract_error(Int(), 1 + 1j) + 'value is not int' + """ + + value_type = int + + def _converter(self, value): + if isinstance(value, float): + if not value.is_integer(): + self._failure('value is not int', value=value) + return super(IntRaw, self)._converter(value) + + +class Int(IntRaw): + def check_and_return(self, data): + return self._check(data) + + +class Atom(Trafaret): + """ + >>> Atom('atom').check('atom') + 'atom' + >>> extract_error(Atom('atom'), 'molecule') + "value is not exactly 'atom'" + """ + __slots__ = ['value'] + + def __init__(self, value): + self.value = value + + def check_value(self, value): + if self.value != value: + self._failure("value is not exactly '%s'" % self.value, value=value) + + +class String(Trafaret): + """ + >>> String() + + >>> String(allow_blank=True) + + >>> String().check("foo") + 'foo' + >>> extract_error(String(), "") + 'blank value is not allowed' + >>> String(allow_blank=True).check("") + '' + >>> extract_error(String(), 1) + 'value is not a string' + >>> String(min_length=2, max_length=3).check('123') + '123' + >>> extract_error(String(min_length=2, max_length=6), '1') + 'String is shorter than 2 characters' + >>> extract_error(String(min_length=2, max_length=6), '1234567') + 'String is longer than 6 characters' + >>> String(min_length=2, max_length=6, allow_blank=True) + Traceback (most recent call last): + ... + AssertionError: Either allow_blank or min_length should be specified, not both + >>> String(min_length=0, max_length=6, allow_blank=True).check('123') + '123' + """ + + def __init__(self, allow_blank=False, min_length=None, max_length=None): + assert not (allow_blank and min_length), \ + "Either allow_blank or min_length should be specified, not both" + self.allow_blank = allow_blank + self.min_length = min_length + self.max_length = max_length + + def check_and_return(self, value): + if not isinstance(value, str_types): + self._failure("value is not a string", value=value) + if not self.allow_blank and len(value) == 0: + self._failure("blank value is not allowed", value=value) + elif self.allow_blank and len(value) == 0: + return value + if self.min_length is not None and len(value) < self.min_length: + self._failure('String is shorter than %s characters' % self.min_length, value=value) + if self.max_length is not None and len(value) > self.max_length: + self._failure('String is longer than %s characters' % self.max_length, value=value) + return value + + def __repr__(self): + return "" if self.allow_blank else "" + + +class Bytes(Trafaret): + def __init__(self, encoding='utf-8'): + self.encoding = encoding + + def check_and_return(self, value): + if not isinstance(value, BYTES_TYPE): + self._failure('Value is not bytes', value=value) + try: + return value.decode(self.encoding) + except UnicodeError: + raise self._failure('value cannot be decoded with %s encoding' % self.encoding) + + +class SquareBracketsMeta(TrafaretMeta): + """ + Allows usage of square brackets for List initialization + + >>> List[Int] + )> + >>> List[Int, 1:] + )> + >>> List[:10, Int] + )> + >>> List[1:10] + Traceback (most recent call last): + ... + RuntimeError: Trafaret is required for List initialization + """ + + def __getitem__(self, args): + slice_ = None + trafaret = None + if not isinstance(args, tuple): + args = (args, ) + for arg in args: + if isinstance(arg, slice): + slice_ = arg + elif ( + isinstance(arg, Trafaret) + or issubclass(arg, Trafaret) + or isinstance(arg, type) + ): + trafaret = arg + if not trafaret: + raise RuntimeError("Trafaret is required for List initialization") + if slice_: + return self( + trafaret, + min_length=slice_.start or 0, + max_length=slice_.stop, + ) + return self(trafaret) + + +@py3metafix +class List(Trafaret, ListAsyncMixin): + """ + >>> List(Int) + )> + >>> List(Int, min_length=1) + )> + >>> List(Int, min_length=1, max_length=10) + )> + >>> extract_error(List(Int), 1) + 'value is not a list' + >>> List(Int).check([1, 2, 3]) + [1, 2, 3] + >>> List(String).check(["foo", "bar", "spam"]) + ['foo', 'bar', 'spam'] + >>> extract_error(List(Int), [1, 2, 1 + 3j]) + {2: 'value is not int'} + >>> List(Int, min_length=1).check([1, 2, 3]) + [1, 2, 3] + >>> extract_error(List(Int, min_length=1), []) + 'list length is less than 1' + >>> List(Int, max_length=2).check([1, 2]) + [1, 2] + >>> extract_error(List(Int, max_length=2), [1, 2, 3]) + 'list length is greater than 2' + >>> extract_error(List(Int), ["a"]) + {0: "value can't be converted to int"} + """ + + __metaclass__ = SquareBracketsMeta + __slots__ = ['trafaret', 'min_length', 'max_length'] + + def __init__(self, trafaret, min_length=0, max_length=None): + self.trafaret = ensure_trafaret(trafaret) + self.min_length = min_length + self.max_length = max_length + + def check_common(self, value): + if not isinstance(value, list): + self._failure("value is not a list", value=value) + if len(value) < self.min_length: + self._failure("list length is less than %s" % self.min_length, value=value) + if self.max_length is not None and len(value) > self.max_length: + self._failure("list length is greater than %s" % self.max_length, value=value) + + def transform(self, value, context=None): + self.check_common(value) + lst = [] + errors = {} + for index, item in enumerate(value): + try: + lst.append(self.trafaret(item, context=context)) + except DataError as err: + errors[index] = err + if errors: + raise DataError(error=errors, trafaret=self) + return lst + + def __repr__(self): + r = ">> t = Tuple(Int, Int, String) + >>> t.check([3, 4, '5']) + (3, 4, '5') + >>> extract_error(t, [3, 4, 5]) + {2: 'value is not a string'} + >>> t + , , )> + """ + __slots__ = ['trafarets', 'length'] + + def __init__(self, *args): + self.trafarets = list(map(ensure_trafaret, args)) + self.length = len(self.trafarets) + + def check_common(self, value): + try: + value = tuple(value) + except TypeError: + self._failure('value must be convertable to tuple', value=value) + if len(value) != self.length: + self._failure('value must contain %s items' % self.length, value=value) + + def transform(self, value, context=None): + self.check_common(value) + result = [] + errors = {} + for idx, (item, trafaret) in enumerate(zip(value, self.trafarets)): + try: + result.append(trafaret(item, context=context)) + except DataError as err: + errors[idx] = err + if errors: + self._failure(errors, value=value) + return tuple(result) + + def __repr__(self): + return '' + + +class Key(KeyAsyncMixin): + """ + Helper class for Dict. + + It gets ``name``, and provides method ``extract(data)`` that extract key value + from data through mapping ``get`` method. + Key `__call__` method yields ``(key name, Maybe(DataError), [touched keys])`` triples. + + You can redefine ``get_data(data, default)`` method in subclassed ``Key`` if you want to use something other + then ``.get(...)`` method. + + Like this for the aiohttp MultiDict:: + + class MDKey(t.Key): + def get_data(data, default): + return data.get_all(self.name, default) + """ + __slots__ = ['name', 'to_name', 'default', 'optional', 'trafaret'] + + def __init__(self, name, default=_empty, optional=False, to_name=None, trafaret=None): + self.name = name + self.to_name = to_name + self.default = default + self.optional = optional + self.trafaret = ensure_trafaret(trafaret) if trafaret else Any() + + def __call__(self, data, context=None): + if self.name in data or self.default is not _empty: + if callable(self.default): + default = self.default() + else: + default = self.default + yield ( + self.get_name(), + catch_error(self.trafaret, self.get_data(data, default), context=context), + (self.name,) + ) + return + + if not self.optional: + yield self.name, DataError(error='is required'), (self.name,) + + def get_data(self, data, default): + return data.get(self.name, default) + + def keys_names(self): + yield self.name + + def set_trafaret(self, trafaret): + self.trafaret = ensure_trafaret(trafaret) + return self + + def __rshift__(self, name): + self.to_name = name + return self + + def get_name(self): + return self.to_name or self.name + + def make_optional(self): + self.optional = True + + def __repr__(self): + return '<%s "%s"%s>' % ( + self.__class__.__name__, + self.name, + ' to "%s"' % self.to_name if getattr(self, 'to_name', False) else '', + ) + + +class Dict(Trafaret, DictAsyncMixin): + """ + >>> trafaret = Dict(foo=Int, bar=String) >> ignore + >>> trafaret.check({"foo": 1, "bar": "spam"}) + >>> extract_error(trafaret, {"foo": 1, "bar": 2}) + {'bar': 'value is not a string'} + >>> extract_error(trafaret, {"foo": 1}) + {'bar': 'is required'} + >>> extract_error(trafaret, {"foo": 1, "bar": "spam", "eggs": None}) + {'eggs': 'eggs is not allowed key'} + >>> trafaret.allow_extra("eggs") + , foo=)> + >>> trafaret.check({"foo": 1, "bar": "spam", "eggs": None}) + >>> trafaret.check({"foo": 1, "bar": "spam"}) + >>> extract_error(trafaret, {"foo": 1, "bar": "spam", "ham": 100}) + {'ham': 'ham is not allowed key'} + >>> trafaret.allow_extra("*") + , foo=)> + >>> trafaret.check({"foo": 1, "bar": "spam", "ham": 100}) + >>> trafaret.check({"foo": 1, "bar": "spam", "ham": 100, "baz": None}) + >>> extract_error(trafaret, {"foo": 1, "ham": 100, "baz": None}) + {'bar': 'is required'} + >>> trafaret = Dict({Key('bar', optional=True): String}, foo=Int) + >>> trafaret.allow_extra("*") + , foo=)> + >>> _dd(trafaret.check({"foo": 1, "ham": 100, "baz": None})) + "{'baz': None, 'foo': 1, 'ham': 100}" + >>> _dd(extract_error(trafaret, {"bar": 1, "ham": 100, "baz": None})) + "{'bar': 'value is not a string', 'foo': 'is required'}" + >>> extract_error(trafaret, {"foo": 1, "bar": 1, "ham": 100, "baz": None}) + {'bar': 'value is not a string'} + >>> trafaret = Dict({Key('bar', default='nyanya') >> 'baz': String}, foo=Int) + >>> _dd(trafaret.check({'foo': 4})) + "{'baz': 'nyanya', 'foo': 4}" + >>> _ = trafaret.ignore_extra('fooz') + >>> _dd(trafaret.check({'foo': 4, 'fooz': 5})) + "{'baz': 'nyanya', 'foo': 4}" + >>> _ = trafaret.ignore_extra('*') + >>> _dd(trafaret.check({'foo': 4, 'foor': 5})) + "{'baz': 'nyanya', 'foo': 4}" + """ + __slots__ = ['extras', 'extras_trafaret', 'allow_any', 'ignore', 'ignore_any', 'keys'] + + def __init__(self, *args, **trafarets): + if args and isinstance(args[0], AbcMapping): + keys = args[0] + args = args[1:] + else: + keys = {} + if any(not callable(key) for key in args): + raise RuntimeError('Keys in single attributes must be callables') + + # extra + allow_extra = trafarets.pop('allow_extra', []) + allow_extra_trafaret = trafarets.pop('allow_extra_trafaret', Any) + self.extras_trafaret = ensure_trafaret(allow_extra_trafaret) + self.allow_any = '*' in allow_extra + self.extras = [name for name in allow_extra if name != '*'] + # ignore + ignore_extra = trafarets.pop('ignore_extra', []) + self.ignore_any = '*' in ignore_extra + self.ignore = [name for name in ignore_extra if name != '*'] + + self.keys = list(args) + for key, trafaret in itertools.chain(trafarets.items(), keys.items()): + key_ = Key(key) if isinstance(key, str_types) else key + key_.set_trafaret(trafaret) + self.keys.append(key_) + + def allow_extra(self, *names, **kw): + trafaret = kw.get('trafaret', Any) + for name in names: + if name == "*": + self.allow_any = True + else: + self.extras.append(name) + self.extras_trafaret = ensure_trafaret(trafaret) + return self + + def ignore_extra(self, *names): + for name in names: + if name == "*": + self.ignore_any = True + else: + self.ignore.append(name) + return self + + def make_optional(self, *args): + for key in self.keys: + if key.name in args or '*' in args: + key.make_optional() + return self + + def transform(self, value, context=None): + if not isinstance(value, AbcMapping): + self._failure("value is not a dict", value=value) + collect = {} + errors = {} + touched_names = [] + for key in self.keys: + if not callable(key): + raise ValueError('Non callable Keys are not supported') + for k, v, names in call_with_context_if_support(key, value, context=context): + if isinstance(v, DataError): + errors[k] = v + else: + collect[k] = v + touched_names.extend(names) + + if not self.ignore_any: + for key in value: + if key in touched_names: + continue + if key in self.ignore: + continue + if not self.allow_any and key not in self.extras: + errors[key] = DataError("%s is not allowed key" % key) + elif key in collect: + errors[key] = DataError("%s key was shadowed" % key) + else: + try: + collect[key] = self.extras_trafaret(value[key]) + except DataError as de: + errors[key] = de + if errors: + raise DataError(error=errors, trafaret=self) + return collect + + def keys_names(self): + for key in self.keys: + for k in key.keys_names(): + yield k + + def __repr__(self): + r = ">> _dd(DictKeys(['a','b']).check({'a':1,'b':2,})) + "{'a': 1, 'b': 2}" + >>> extract_error(DictKeys(['a','b']), {'a':1,'b':2,'c':3,}) + {'c': 'c is not allowed key'} + >>> extract_error(DictKeys(['key','key2']), {'key':'val'}) + {'key2': 'is required'} + """ + req = [(Key(key), Any) for key in keys] + return Dict(dict(req)) + + +class Mapping(Trafaret, MappingAsyncMixin): + """ + Mapping gets two trafarets as arguments, one for key and one for value, + like `Mapping(t.Int, t.List(t.Str))`. + """ + __slots__ = ['key', 'value'] + + def __init__(self, key, value): + self.key = ensure_trafaret(key) + self.value = ensure_trafaret(value) + + def transform(self, mapping, context=None): + if not isinstance(mapping, AbcMapping): + self._failure("value is not a dict", value=mapping) + checked_mapping = {} + errors = {} + for key, value in mapping.items(): + pair_errors = {} + try: + checked_key = self.key(key, context=context) + except DataError as err: + pair_errors['key'] = err + try: + checked_value = self.value(value, context=context) + except DataError as err: + pair_errors['value'] = err + if pair_errors: + errors[key] = DataError(error=pair_errors) + else: + checked_mapping[checked_key] = checked_value + if errors: + raise DataError(error=errors, trafaret=self) + return checked_mapping + + def __repr__(self): + return " %r)>" % (self.key, self.value) + + +class Enum(Trafaret): + """ + >>> trafaret = Enum("foo", "bar", 1) >> ignore + >>> trafaret + + >>> trafaret.check("foo") + >>> trafaret.check(1) + >>> extract_error(trafaret, 2) + "value doesn't match any variant" + """ + __slots__ = ['variants'] + + def __init__(self, *variants): + self.variants = variants[:] + + def check_value(self, value): + if value not in self.variants: + self._failure("value doesn't match any variant", value=value) + + def __repr__(self): + return "" % (", ".join(map(repr, self.variants))) + + +class Callable(Trafaret): + """ + >>> (Callable() >> ignore).check(lambda: 1) + >>> extract_error(Callable(), 1) + 'value is not callable' + """ + + def check_value(self, value): + if not callable(value): + self._failure("value is not callable", value=value) + + def __repr__(self): + return "" + + +class Call(Trafaret, CallAsyncMixin): + """ + >>> def validator(value): + ... if value != "foo": + ... return DataError("I want only foo!") + ... return 'foo' + ... + >>> trafaret = Call(validator) + >>> trafaret + + >>> trafaret.check("foo") + 'foo' + >>> extract_error(trafaret, "bar") + 'I want only foo!' + """ + __slots__ = ['fn'] + + def __init__(self, fn): + if not callable(fn): + raise RuntimeError("Call argument should be callable") + argspec = getargspec(fn) + args = set(argspec.args) + self.supports_context = 'context' in args + if 'context' in args: + args.remove('context') + if len(argspec.args) - len(argspec.defaults or []) > 1: + raise RuntimeError("Call argument should be" + " one argument function") + self.fn = fn + + def transform(self, value, context=None): + if self.supports_context: + res = self.fn(value, context=context) + else: + res = self.fn(value) + if isinstance(res, DataError): + raise res + else: + return res + + def __repr__(self): + return "" % self.fn.__name__ + + +class Forward(Trafaret, ForwardAsyncMixin): + """ + >>> node = Forward() + >>> node << Dict(name=String, children=List[node]) + >>> node + )>, name=)>)> + >>> node.check({"name": "foo", "children": []}) == {'children': [], 'name': 'foo'} + True + >>> extract_error(node, {"name": "foo", "children": [1]}) + {'children': {0: 'value is not a dict'}} + >>> node.check({"name": "foo", "children": [ \ + {"name": "bar", "children": []} \ + ]}) == {'children': [{'children': [], 'name': 'bar'}], 'name': 'foo'} + True + >>> empty_node = Forward() + >>> empty_node + + >>> extract_error(empty_node, 'something') + 'trafaret not set yet' + """ + + def __init__(self): + self.trafaret = None + self._recur_repr = False + + def __lshift__(self, trafaret): + self.provide(trafaret) + + def provide(self, trafaret): + if self.trafaret: + raise RuntimeError("trafaret for Forward is already specified") + self.trafaret = ensure_trafaret(trafaret) + + def transform(self, value, context=None): + if self.trafaret is None: + self._failure('trafaret not set yet', value=value) + return self.trafaret(value, context=context) + + def __repr__(self): + # XXX not threadsafe + if self._recur_repr: + return "" + self._recur_repr = True + r = "" % self.trafaret + self._recur_repr = False + return r + + +class GuardError(DataError): + """ + Raised when guarded function gets invalid arguments, + inherits error message from corresponding DataError + """ + + pass + + +def guard(trafaret=None, **kwargs): + """ + Decorator for protecting function with trafarets + + >>> @guard(a=String, b=Int, c=String) + ... def fn(a, b, c="default"): + ... '''docstring''' + ... return (a, b, c) + ... + >>> fn.__module__ = None + >>> help(fn) + Help on function fn: + + fn(*args, **kwargs) + guarded with , b=, c=)> + + docstring + + >>> fn("foo", 1) + ('foo', 1, 'default') + >>> extract_error(fn, "foo", 1, 2) + {'c': 'value is not a string'} + >>> extract_error(fn, "foo") + {'b': 'is required'} + >>> g = guard(Dict()) + >>> c = Forward() + >>> c << Dict(name=str, children=List[c]) + >>> g = guard(c) + >>> g = guard(Int()) + Traceback (most recent call last): + ... + RuntimeError: trafaret should be instance of Dict or Forward + """ + if ( + trafaret + and not isinstance(trafaret, Dict) + and not isinstance(trafaret, Forward) + ): + raise RuntimeError("trafaret should be instance of Dict or Forward") + elif trafaret and kwargs: + raise RuntimeError("choose one way of initialization," + " trafaret or kwargs") + if not trafaret: + trafaret = Dict(**kwargs) + + def wrapper(fn): + if py3: + argspec = inspect.getfullargspec(fn) + else: + argspec = inspect.getargspec(fn) + + @functools.wraps(fn) + def decor(*args, **kwargs): + fnargs = argspec.args + if fnargs[0] in ['self', 'cls']: + obj = args[0] + fnargs = fnargs[1:] + checkargs = args[1:] + else: + obj = None + checkargs = args + + try: + call_args = dict( + itertools.chain(zip(fnargs, checkargs), kwargs.items()) + ) + for name, default in zip(reversed(fnargs), + reversed(argspec.defaults or ())): + if name not in call_args: + call_args[name] = default + converted = trafaret(call_args) + except DataError as err: + raise GuardError(error=err.error) + return fn(obj, **converted) if obj else fn(**converted) + decor.__doc__ = "guarded with %r\n\n" % trafaret + (decor.__doc__ or "") + return decor + return wrapper + + +def ignore(val): + """ + Stub to ignore value from trafaret + Use it like: + + >>> a = Int >> ignore + >>> a.check(7) + """ + pass + + +def catch(checker, *a, **kw): + """ + Helper for tests - catch error and return it as dict + """ + + try: + if hasattr(checker, 'check'): + return checker.check(*a, **kw) + elif callable(checker): + return checker(*a, **kw) + except DataError as error: + return error + + +catch_error = catch + + +def extract_error(checker, *a, **kw): + """ + Helper for tests - catch error and return it as dict + """ + + res = catch_error(checker, *a, **kw) + if isinstance(res, DataError): + return res.as_dict() + return res diff --git a/trafaret/constructor.py b/trafaret/constructor.py index 6bbd9f6..c567d5f 100644 --- a/trafaret/constructor.py +++ b/trafaret/constructor.py @@ -1,5 +1,5 @@ import trafaret as t -from trafaret.lib import py3, py3metafix +from trafaret.lib import py3metafix class ConstructMeta(type): diff --git a/trafaret/contrib/object_id.py b/trafaret/contrib/object_id.py index 938d411..c82bc70 100644 --- a/trafaret/contrib/object_id.py +++ b/trafaret/contrib/object_id.py @@ -30,16 +30,13 @@ def __init__(self, allow_blank=False): def __repr__(self): return "" if self.allow_blank else "" - def converter(self, value): - try: - return self.value_type(value) - except InvalidId as e: - self._failure(str(e)) - def check_and_return(self, value): if not self.allow_blank and value is None: self._failure("blank value is not allowed") if isinstance(value, self.convertable) or value is None: - return value + try: + return ObjectId(value) + except InvalidId as e: + self._failure(str(e)) self._failure('value is not %s' % self.value_type.__name__) diff --git a/trafaret/contrib/rfc_3339.py b/trafaret/contrib/rfc_3339.py index 7b6e824..a32770b 100644 --- a/trafaret/contrib/rfc_3339.py +++ b/trafaret/contrib/rfc_3339.py @@ -16,19 +16,10 @@ def __init__(self, allow_blank=False): def __repr__(self): return "" if self.allow_blank else "" - def converter(self, value): + def check_and_return(self, value): if isinstance(value, datetime): return value try: return parse(value) except ValueError as e: self._failure(e.message) - - def check_and_return(self, value): - if isinstance(value, str_types): - if len(value) > 0 or self.allow_blank: - return value - if isinstance(value, datetime): - return value - - self._failure('value is not valid') diff --git a/trafaret/dataerror.py b/trafaret/dataerror.py new file mode 100644 index 0000000..7961056 --- /dev/null +++ b/trafaret/dataerror.py @@ -0,0 +1,33 @@ +from .lib import _empty + + +class DataError(ValueError): + """ + Error with data preserve + error can be a message or None if error raised in childs + data can be anything + """ + __slots__ = ['error', 'name', 'value', 'trafaret'] + + def __init__(self, error=None, name=None, value=_empty, trafaret=None): + self.error = error + self.name = name + self.value = value + self.trafaret = trafaret + + def __str__(self): + return str(self.error) + + def __repr__(self): + return 'DataError(%s)' % str(self) + + def as_dict(self, value=False): + def as_dict(dataerror): + if not isinstance(dataerror.error, dict): + if value and dataerror.value != _empty: + return '%s, got %r' % (str(dataerror.error), dataerror.value) + else: + return str(dataerror.error) + return dict((k, v.as_dict(value=value) if isinstance(v, DataError) else v) + for k, v in dataerror.error.items()) + return as_dict(self) diff --git a/trafaret/internet.py b/trafaret/internet.py new file mode 100644 index 0000000..260fd93 --- /dev/null +++ b/trafaret/internet.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- + +import re +from .regexp import Regexp +from .base import DataError, String, Bytes, OnError +from .lib import py3 + +if py3: + import urllib.parse as urlparse +else: + import urlparse + + +MAX_EMAIL_LEN = 254 + + +EMAIL_REGEXP = re.compile( + # dot-atom + r"(?P^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*" + # quoted-string + r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-011\013\014\016-\177])*"' + # domain + r')@(?P(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)$)' + # literal form, ipv4 address (SMTP 4.1.3) + r'|\[(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}\]$', + re.IGNORECASE, +) + + +def email_idna_encode(value): + if '@' in value: + parts = value.split('@') + try: + parts[-1] = parts[-1].encode('idna').decode('ascii') + return '@'.join(parts) + except UnicodeError: + pass + return value + + +email_regexp_trafaret = OnError(Regexp(EMAIL_REGEXP), 'value is not a valid email address') +email_trafaret = email_regexp_trafaret | ((Bytes('utf-8') | String()) & email_idna_encode & email_regexp_trafaret) +Email = String(allow_blank=True) & OnError( + String(max_length=MAX_EMAIL_LEN) & email_trafaret, + 'value is not a valid email address', +) + + +URL_REGEXP = re.compile( + r'^(?:http|ftp)s?://' # http:// or https:// + r'(?:\S+(?::\S*)?@)?' # user and password + r'(?:(?:[A-Z0-9](?:[A-Z0-9-_]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... + r'localhost|' # localhost... + r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip + r'(?::\d+)?' # optional port + r'(?:/?|[/?]\S+)$', + re.IGNORECASE, +) +URLRegexp = OnError(Regexp(URL_REGEXP), 'value is not URL') +URLRegexp = Regexp(URL_REGEXP) + + +def decode_url_idna(value): + try: + scheme, netloc, path, query, fragment = urlparse.urlsplit(value) + netloc = netloc.encode('idna').decode('ascii') # IDN -> ACE + except UnicodeError: # invalid domain part + pass + else: + return urlparse.urlunsplit((scheme, netloc, path, query, fragment)) + return value + + +URL = OnError( + URLRegexp | ((Bytes('utf-8') | String()) & decode_url_idna & URLRegexp), + 'value is not URL', +) + + +class IPv4(Regexp): + """ + >>> IPv4().check('127.0.0.1') + '127.0.0.1' + """ + + regex = re.compile( + r'^((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])$', # noqa + ) + + def __init__(self): + super(IPv4, self).__init__(self.regex) + + def check_and_return(self, value): + try: + return super(IPv4, self).check_and_return(value) + except DataError: + self._failure('value is not IPv4 address') + + def __repr__(self): + return '' + + +class IPv6(Regexp): + """ + >>> IPv6().check('2001:0db8:0000:0042:0000:8a2e:0370:7334') + '2001:0db8:0000:0042:0000:8a2e:0370:7334' + """ + + regex = re.compile( + r'^(' + r'(::)|' + r'(::[0-9a-f]{1,4})|' + r'([0-9a-f]{1,4}:){7,7}[0-9a-f]{1,4}|' + r'([0-9a-f]{1,4}:){1,7}:|' + r'([0-9a-f]{1,4}:){1,6}:[0-9a-f]{1,4}|' + r'([0-9a-f]{1,4}:){1,5}(:[0-9a-f]{1,4}){1,2}|' + r'([0-9a-f]{1,4}:){1,4}(:[0-9a-f]{1,4}){1,3}|' + r'([0-9a-f]{1,4}:){1,3}(:[0-9a-f]{1,4}){1,4}|' + r'([0-9a-f]{1,4}:){1,2}(:[0-9a-f]{1,4}){1,5}|' + r'[0-9a-f]{1,4}:((:[0-9a-f]{1,4}){1,6})|' + r':((:[0-9a-f]{1,4}){1,7}:)|' + r'fe80:(:[0-9a-f]{0,4}){0,4}%[0-9a-z]{1,}|' + r'::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|' # noqa + r'([0-9a-f]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])' # noqa + r')$', + re.IGNORECASE, + ) + + def __init__(self): + super(IPv6, self).__init__(self.regex) + + def check_and_return(self, value): + try: + return super(IPv6, self).check_and_return(value) + except DataError: + self._failure('value is not IPv6 address') + + def __repr__(self): + return '' + + +IP = OnError(IPv4 | IPv6, 'value is not IP address') diff --git a/trafaret/keys.py b/trafaret/keys.py new file mode 100644 index 0000000..177868d --- /dev/null +++ b/trafaret/keys.py @@ -0,0 +1,38 @@ +import trafaret as t + + +def xor_key(first, second, trafaret): + trafaret = t.Trafaret._trafaret(trafaret) + + def check_(value): + if (first in value) ^ (second in value): + key = first if first in value else second + yield first, t.catch_error(trafaret, value[key]), (key,) + elif first in value and second in value: + yield first, t.DataError(error=f'correct only if {second} is not defined'), (first,) + yield second, t.DataError(error=f'correct only if {first} is not defined'), (second,) + else: + yield first, t.DataError(error=f'is required if {second} is not defined'), (first,) + yield second, t.DataError(error=f'is required if {first} is not defined'), (second,) + + return check_ + + +def confirm_key(name, confirm_name, trafaret): + def check_(value): + first, second = None, None + if name in value: + first = value[name] + else: + yield name, t.DataError('is required'), (name,) + if confirm_name in value: + second = value[confirm_name] + else: + yield confirm_name, t.DataError('is required'), (confirm_name,) + if not (first and second): + return + yield name, t.catch_error(trafaret, first), (name,) + yield confirm_name, t.catch_error(trafaret, second), (confirm_name,) + if first != second: + yield confirm_name, t.DataError(f'must be equal to {name}'), (confirm_name,) + return check_ diff --git a/trafaret/lib.py b/trafaret/lib.py index 11226d1..83922b5 100644 --- a/trafaret/lib.py +++ b/trafaret/lib.py @@ -1,7 +1,18 @@ import sys +import inspect py3 = sys.version_info[0] == 3 +py36 = sys.version_info >= (3, 6, 0) + + +if py3: + getargspec = inspect.getfullargspec +else: + getargspec = inspect.getargspec + + +_empty = object() def py3metafix(cls): @@ -11,3 +22,12 @@ def py3metafix(cls): newcls = cls.__metaclass__(cls.__name__, (cls,), {}) newcls.__doc__ = cls.__doc__ return newcls + + +def call_with_context_if_support(callble, value, context): + if not inspect.isfunction(callble) and hasattr(callble, '__call__'): + callble = callble.__call__ + if 'context' in getargspec(callble).args: + return callble(value, context=context) + else: + return callble(value) diff --git a/trafaret/regexp.py b/trafaret/regexp.py new file mode 100644 index 0000000..2130f50 --- /dev/null +++ b/trafaret/regexp.py @@ -0,0 +1,29 @@ +import re +from .base import Trafaret, str_types + + +class RegexpRaw(Trafaret): + """ + Check if given string match given regexp + """ + __slots__ = ('regexp', 'raw_regexp') + + def __init__(self, regexp): + self.regexp = re.compile(regexp) if isinstance(regexp, str_types) else regexp + self.raw_regexp = self.regexp.pattern if self.regexp else None + + def check_and_return(self, value): + if not isinstance(value, str_types): + self._failure("value is not a string", value=value) + match = self.regexp.match(value) + if not match: + self._failure('does not match pattern %s' % self.raw_regexp, value=value) + return match + + def __repr__(self): + return '' + + +class Regexp(RegexpRaw): + def check_and_return(self, value): + return super(Regexp, self).check_and_return(value).group() diff --git a/trafaret/utils.py b/trafaret/utils.py index aa77230..87a2866 100644 --- a/trafaret/utils.py +++ b/trafaret/utils.py @@ -3,7 +3,7 @@ """ import collections from itertools import groupby -from . import _dd +from . import _dd # noqa def recursive_unfold(data, prefix='', delimeter='__'): @@ -97,7 +97,9 @@ def deep(data): return [i[1] for i in sorted(collect.items())] return collect - data_ = [(split(key, delimeter), value) - for key, value in sorted(data.items())] + data_ = [ + (split(key, delimeter), value) + for key, value in sorted(data.items()) + ] result = deep(data_) return result[prefix] if prefix else result diff --git a/trafaret/visitor.py b/trafaret/visitor.py index 4b3de76..6447cb8 100644 --- a/trafaret/visitor.py +++ b/trafaret/visitor.py @@ -1,5 +1,6 @@ -""" This module is expirement. API and implementation are unstable. - Supposed to use with ``Request`` object or something like that. +""" +This module is expirement. API and implementation are unstable. +Supposed to use with ``Request`` object or something like that. """ from collections import Mapping from . import Trafaret, DataError, Key, catch_error, _empty @@ -42,8 +43,13 @@ class DeepKey(Key): def pop(self, data): try: - yield self.get_name(), catch_error(self.trafaret, - get_deep_attr(data, self.name.split('.'))) + yield ( + self.get_name(), + catch_error( + self.trafaret, + get_deep_attr(data, self.name.split('.')), + ) + ) except DataError as e: if self.default != _empty: yield self.get_name(), self.default