diff --git a/README.rst b/README.rst
index 21e8155..6bd17b5 100644
--- a/README.rst
+++ b/README.rst
@@ -7,9 +7,7 @@ Trafaret
-----
-
-Read The Docs hosted documentation
-or look to the docs/api/intro.rst for start.
+Ultimate transformation library that supports validation, contexts and ``aiohttp``.
Trafaret is rigid and powerful lib to work with foreign data, configs etc.
It provides simple way to check anything, and convert it accordingly to your needs.
@@ -34,6 +32,25 @@ It have shortcut syntax and ability to express anything that you can code:
raise DataError(error=errors, trafaret=self)
trafaret.DataError: {'b': DataError({1: DataError(value is not a string)})}
+
+Read The Docs hosted documentation
+or look to the docs/api/intro.rst for start.
+
+
+New
+---
+
+* converters and ``convert=False`` are deleted in favor of ``And`` and ``&``
+* ``String`` parameter ``regex`` deleted in favor of ``Regexp`` and ``RegexpRaw`` usage
+* new ``OnError`` to customize error message
+* ``context=something`` argument for ``__call__`` and ``check`` Trafaret methods.
+ Supported by ``Or``, ``And``, ``Forward`` etc.
+* new customizable method ``transform`` like ``change_and_return`` but takes ``context=`` arg
+* new ``trafaret_instance.async_check`` method that works with ``await``
+
+Doc
+---
+
For simple example what can be done:
.. code-block:: python
diff --git a/circle.yml b/circle.yml
index 318f67a..abd9ce2 100644
--- a/circle.yml
+++ b/circle.yml
@@ -2,9 +2,13 @@
machine:
python:
version:
- 3.6.0
+ 3.6.2
post:
- - pyenv local 3.6.0 2.7.10
+ - pyenv local 3.6.2 3.5.2 2.7.10
+
+test:
+ override:
+ - tox
dependencies:
pre:
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 7f70a16..a7c8ed8 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -1,6 +1,18 @@
Changelog
=========
+2017-08-04
+----------
+
+- converters and ``convert=False`` are deleted in favor of ``And`` and ``&``
+- ``String`` parameter ``regex`` deleted in favor of ``Regexp`` and ``RegexpRaw`` usage
+- new ``OnError`` to customize error message
+- ``context=something`` argument for ``__call__`` and ``check`` Trafaret methods.
+ Supported by ``Or``, ``And``, ``Forward`` etc.
+- new customizable method ``transform`` like ``change_and_return`` but takes ``context=`` arg
+- new ``trafaret_instance.async_check`` method that works with ``await``
+
+
2017-05-12
----------
diff --git a/tests/test_base.py b/tests/test_base.py
index 1dac6cc..64ede25 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -2,15 +2,16 @@
import unittest
import trafaret as t
from collections import Mapping as AbcMapping
-from trafaret import catch_error, extract_error, ignore, DataError
+from trafaret import catch_error, extract_error, DataError
from trafaret.extras import KeysSubset
class TestAnyTrafaret(unittest.TestCase):
def test_any(self):
+ obj = object()
self.assertEqual(
- (t.Any() >> ignore).check(object()),
- None
+ t.Any().check(obj),
+ obj
)
@@ -50,14 +51,14 @@ def validator(value):
class TestCallableTrafaret(unittest.TestCase):
def test_callable(self):
- (t.Callable() >> t.ignore).check(lambda: 1)
+ t.Callable().check(lambda: 1)
res = extract_error(t.Callable(), 1)
self.assertEqual(res, 'value is not callable')
class TestDictTrafaret(unittest.TestCase):
def test_base(self):
- trafaret = t.Dict(foo=t.Int, bar=t.String) >> t.ignore
+ trafaret = t.Dict(foo=t.Int, bar=t.String)
trafaret.check({"foo": 1, "bar": "spam"})
res = t.extract_error(trafaret, {"foo": 1, "bar": 2})
self.assertEqual(res, {'bar': 'value is not a string'})
@@ -102,8 +103,8 @@ def set_trafaret(self, trafaret):
trafaret = t.Dict({
OldKey(): t.Any
})
- res = trafaret.check({'testkey': 123})
- self.assertEqual(res, {'testkey': 123})
+ with self.assertRaises(ValueError):
+ trafaret.check({'testkey': 123})
def test_callable_key(self):
def simple_key(value):
@@ -237,27 +238,27 @@ def test_dict_keys(self):
class TestEmailTrafaret(unittest.TestCase):
def test_email(self):
- res = t.Email().check('someone@example.net')
+ res = t.Email.check('someone@example.net')
self.assertEqual(res, 'someone@example.net')
- res = extract_error(t.Email(),'someone@example') # try without domain-part
+ res = extract_error(t.Email,'someone@example') # try without domain-part
self.assertEqual(res, 'value is not a valid email address')
- res = str(t.Email().check('someone@пример.рф')) # try with `idna` encoding
+ res = str(t.Email.check('someone@пример.рф')) # try with `idna` encoding
self.assertEqual(res, 'someone@xn--e1afmkfd.xn--p1ai')
- res = (t.Email() >> (lambda m: m.groupdict()['domain'])).check('someone@example.net')
- self.assertEqual(res, 'example.net')
- res = extract_error(t.Email(), 'foo')
+ # res = (t.Email() >> (lambda m: m.groupdict()['domain'])).check('someone@example.net')
+ # self.assertEqual(res, 'example.net')
+ res = extract_error(t.Email, 'foo')
self.assertEqual(res, 'value is not a valid email address')
- res = extract_error(t.Email(), 'f' * 10000 + '@correct.domain.edu')
+ res = extract_error(t.Email, 'f' * 10000 + '@correct.domain.edu')
self.assertEqual(res, 'value is not a valid email address')
- res = extract_error(t.Email(), 'f' * 248 + '@x.edu') == 'f' * 248 + '@x.edu'
+ res = extract_error(t.Email, 'f' * 248 + '@x.edu') == 'f' * 248 + '@x.edu'
self.assertEqual(res, True)
- res = extract_error(t.Email(), 123)
+ res = extract_error(t.Email, 123)
self.assertEqual(res, 'value is not a string')
class TestEnumTrafaret(unittest.TestCase):
def test_enum(self):
- trafaret = t.Enum("foo", "bar", 1) >> ignore
+ trafaret = t.Enum("foo", "bar", 1)
self.assertEqual(repr(trafaret), "")
res = trafaret.check("foo")
res = trafaret.check(1)
@@ -476,7 +477,6 @@ def test_raise_error(self):
self.assertEqual(res, 'other error')
-
class TestStrBoolTrafaret(unittest.TestCase):
def test_str_bool(self):
@@ -522,10 +522,6 @@ def test_string(self):
self.assertEqual(res, '')
res = extract_error(t.String(), 1)
self.assertEqual(res, 'value is not a string')
- res = t.String(regex='\w+').check('wqerwqer')
- self.assertEqual(res, 'wqerwqer')
- res = extract_error(t.String(regex='^\w+$'), 'wqe rwqer')
- self.assertEqual(res, "value does not match pattern: '^\\\\w+$'")
res = t.String(min_length=2, max_length=3).check('123')
self.assertEqual(res, '123')
res = extract_error(t.String(min_length=2, max_length=6), '1')
@@ -611,22 +607,22 @@ class Type(type):
class TestURLTrafaret(unittest.TestCase):
def test_url(self):
- res = t.URL().check('http://example.net/resource/?param=value#anchor')
+ res = t.URL.check('http://example.net/resource/?param=value#anchor')
self.assertEqual(res, 'http://example.net/resource/?param=value#anchor')
- res = str(t.URL().check('http://пример.рф/resource/?param=value#anchor'))
+ res = str(t.URL.check('http://пример.рф/resource/?param=value#anchor'))
self.assertEqual(res, 'http://xn--e1afmkfd.xn--p1ai/resource/?param=value#anchor')
- res = t.URL().check('http://example_underscore.net/resource/?param=value#anchor')
+ res = t.URL.check('http://example_underscore.net/resource/?param=value#anchor')
self.assertEqual(res, 'http://example_underscore.net/resource/?param=value#anchor')
- res = str(t.URL().check('http://user@example.net/resource/?param=value#anchor'))
+ res = str(t.URL.check('http://user@example.net/resource/?param=value#anchor'))
self.assertEqual(res, 'http://user@example.net/resource/?param=value#anchor')
- res = str(t.URL().check('http://user:@example.net/resource/?param=value#anchor'))
+ res = str(t.URL.check('http://user:@example.net/resource/?param=value#anchor'))
self.assertEqual(res, 'http://user:@example.net/resource/?param=value#anchor')
- res = str(t.URL().check('http://user:password@example.net/resource/?param=value#anchor'))
+ res = str(t.URL.check('http://user:password@example.net/resource/?param=value#anchor'))
self.assertEqual(res, 'http://user:password@example.net/resource/?param=value#anchor')
diff --git a/tests/test_context.py b/tests/test_context.py
new file mode 100644
index 0000000..34b274d
--- /dev/null
+++ b/tests/test_context.py
@@ -0,0 +1,39 @@
+import unittest
+import trafaret as t
+
+
+def check_context(value, context=None):
+ if value != context:
+ return t.DataError('have not context there')
+ return value
+
+
+CONTEXT_TRAFARET = (t.String() | t.IntRaw()) & t.Any & check_context
+
+
+class TestContext(unittest.TestCase):
+ def test_context(self):
+ self.assertEqual(CONTEXT_TRAFARET(123, context=123), 123)
+
+ def test_dict_context(self):
+ trafaret = t.Dict({
+ t.Key('b'): CONTEXT_TRAFARET,
+ })
+ self.assertEqual(trafaret({'b': 123}, context=123), {'b': 123})
+
+ def test_list(self):
+ trafaret = t.List(CONTEXT_TRAFARET)
+ self.assertEqual(trafaret([123], context=123), [123])
+
+ def test_tuple(self):
+ trafaret = t.Tuple(CONTEXT_TRAFARET)
+ self.assertEqual(trafaret([123], context=123), (123,))
+
+ def test_mapping(self):
+ trafaret = t.Mapping(CONTEXT_TRAFARET, CONTEXT_TRAFARET)
+ self.assertEqual(trafaret({123: 123}, context=123), {123: 123})
+
+ def test_forward(self):
+ trafaret = t.Forward()
+ trafaret << t.List(CONTEXT_TRAFARET)
+ self.assertEqual(trafaret([123], context=123), [123])
diff --git a/tests/test_contrib.py b/tests/test_contrib.py
new file mode 100644
index 0000000..2f8c8bf
--- /dev/null
+++ b/tests/test_contrib.py
@@ -0,0 +1,11 @@
+import unittest
+import datetime
+
+import trafaret as t
+from trafaret.contrib.rfc_3339 import DateTime
+
+
+class TestDateTime(unittest.TestCase):
+ def test_datetime(self):
+ check = DateTime()
+ assert check('2017-09-01 23:59') == datetime.datetime(2017, 9, 1, 23, 59)
diff --git a/tests3k/test_async.py b/tests3k/test_async.py
new file mode 100644
index 0000000..3b1ed1f
--- /dev/null
+++ b/tests3k/test_async.py
@@ -0,0 +1,55 @@
+import asyncio
+import unittest
+import trafaret as t
+from trafaret.lib import py3
+
+
+async def check_int(value):
+ return value
+
+
+class TestContext(unittest.TestCase):
+ def setUp(self):
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(None)
+
+ def tearDown(self):
+ self.loop.close()
+
+ def test_async_check(self):
+ trafaret = t.Int & int
+ res = self.loop.run_until_complete(trafaret.async_check('5'))
+ self.assertEqual(res, 5)
+
+ def test_async_call(self):
+ trafaret = t.Int & int & check_int
+ res = self.loop.run_until_complete(trafaret.async_check('5'))
+ self.assertEqual(res, 5)
+
+ def test_dict(self):
+ trafaret = t.Dict({
+ t.Key('b'): t.Int & check_int,
+ })
+ res = self.loop.run_until_complete(trafaret.async_check({'b': '5'}))
+ self.assertEqual(res, {'b': 5})
+
+ def test_list(self):
+ trafaret = t.List(t.Int & check_int)
+ res = self.loop.run_until_complete(trafaret.async_check(['5']))
+ self.assertEqual(res, [5])
+
+ def test_tuple(self):
+ trafaret = t.Tuple(t.Null, t.Int & check_int)
+ res = self.loop.run_until_complete(trafaret.async_check([None, '5']))
+ self.assertEqual(res, (None, 5))
+
+ def test_mapping(self):
+ trafaret = t.Mapping(t.String, t.Int & check_int)
+ res = self.loop.run_until_complete(trafaret.async_check({'a': '5'}))
+ self.assertEqual(res, {'a': 5})
+
+ def test_forward(self):
+ trafaret = t.Forward()
+ trafaret << t.List(t.Int & check_int)
+ res = self.loop.run_until_complete(trafaret.async_check(['5']))
+ self.assertEqual(res, [5])
diff --git a/tox.ini b/tox.ini
index c2b8b36..6bdc152 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,11 +1,24 @@
[tox]
-envlist = py27,py36
+envlist = py27,py35,py36
whitelist_externals = {toxinidir}/utests.py
[testenv]
deps=
unittest2
+ flake8
+ pylint
pymongo
-commands=python -m unittest discover {toxinidir}/tests
+ python-dateutil
+commands=
+ python -m unittest discover {toxinidir}/tests
+[testenv:py36]
+commands=
+ python -m unittest discover {toxinidir}/tests
+ python -m unittest discover {toxinidir}/tests3k
+ flake8 trafaret
+
+[flake8]
+exclude = .tox,*.egg,build
+max-line-length = 120
diff --git a/trafaret/__init__.py b/trafaret/__init__.py
index d531962..6db0b15 100644
--- a/trafaret/__init__.py
+++ b/trafaret/__init__.py
@@ -1,1620 +1,103 @@
-# -*- coding: utf-8 -*-
-
-import sys
-import functools
-import inspect
-import re
-import itertools
-import numbers
-import warnings
-from collections import Mapping as AbcMapping
-import types
-from .lib import py3, py3metafix
-
-
-__VERSION__ = (0, 10, 4)
-
-
-# Python3 support
-if py3:
- import urllib.parse as urlparse
- str_types = (str, bytes)
- unicode = str
-else:
- try:
- from future_builtins import map
- except ImportError:
- # Support for GAE runner
- from itertools import imap as map
- import urlparse
- str_types = (basestring,)
-
-def _dd(value):
- if not hasattr(value, 'items'):
- return repr(value)
- return r"{%s}" % ', '.join("%r: %s" % (x[0], _dd(x[1])) for x in sorted(value.items(), key=lambda x: x[0]))
-
-
-def deprecated(message):
- warnings.warn(message, DeprecationWarning)
-
-
-"""
-Trafaret is tiny library for data validation
-It provides several primitives to validate complex data structures
-Look at doctests for usage examples
-"""
+from .base import (
+ DataError,
+ Trafaret,
+ Call,
+ Or,
+ And,
+ Forward,
+
+ Any,
+ Null,
+ List,
+ Key,
+ Dict,
+ Enum,
+ Tuple,
+
+ Atom,
+ String,
+ Float,
+ FloatRaw,
+ Int,
+ IntRaw,
+ Callable,
+ Bool,
+ Type,
+ Subclass,
+ Mapping,
+ StrBool,
+ DictKeys,
+
+ guard,
+
+ # utils
+ OnError,
+ ensure_trafaret,
+ extract_error,
+ ignore,
+ _dd,
+ catch,
+ catch_error,
+ str_types,
+)
+from .regexp import Regexp, RegexpRaw
+from .internet import (
+ URL,
+ Email,
+ IPv4,
+ IPv6,
+ IP,
+)
__all__ = (
- "DataError", "Trafaret", "Any", "Int", "String",
- "List", "Dict", "Or", "And", "Null", "Float", "Enum", "Callable",
- "Call", "Forward", "Bool", "Type", "Subclass", "Mapping", "guard", "Key",
- "Tuple", "Atom", "Email", "URL",
+ "DataError",
+ "Trafaret",
+ "Call",
+ "Or",
+ "And",
+ "Forward",
+
+ "Any",
+ "Null",
+ "List",
+ "Key",
+ "Dict",
+ "Enum",
+ "Tuple",
+
+ "Atom",
+ "String",
+ "Float",
+ "FloatRaw",
+ "Int",
+ "IntRaw",
+ "Callable",
+ "Bool",
+ "Type",
+ "Subclass",
+ "Mapping",
+ "StrBool",
+ "DictKeys",
+
+ "guard",
+
+ "Regexp",
+ "RegexpRaw",
+ "URL",
+ "Email",
+ "IPv4",
+ "IPv6",
+ "IP",
+
+ "OnError",
+ "ensure_trafaret",
+ "extract_error",
+ "ignore",
+ "_dd",
+ "catch",
+ "catch_error",
+ "str_types",
)
-_empty = object()
-MAX_EMAIL_LEN = 254
-
-
-class DataError(ValueError):
- """
- Error with data preserve
- error can be a message or None if error raised in childs
- data can be anything
- """
- __slots__ = ['error', 'name', 'value', 'trafaret']
-
- def __init__(self, error=None, name=None, value=_empty, trafaret=None):
- self.error = error
- self.name = name
- self.value = value
- self.trafaret = trafaret
-
- def __str__(self):
- return str(self.error)
-
- def __repr__(self):
- return 'DataError(%s)' % str(self)
-
- def as_dict(self, value=False):
- def as_dict(dataerror):
- if not isinstance(dataerror.error, dict):
- if value and dataerror.value != _empty:
- return '%s, got %r' % (str(dataerror.error), dataerror.value)
- else:
- return str(dataerror.error)
- return dict((k, v.as_dict(value=value) if isinstance(v, DataError) else v)
- for k, v in dataerror.error.items())
- return as_dict(self)
-
-
-class TrafaretMeta(type):
- """
- Metaclass for trafarets to make using "|" operator possible not only
- on instances but on classes
-
- >>> Int | String
- , )>
- >>> Int | String | Null
- , , )>
- >>> (Int >> (lambda v: v if v ** 2 > 15 else 0)).check(5)
- 5
- """
-
- def __or__(cls, other):
- return cls() | other
-
- def __and__(cls, other):
- return cls() & other
-
- def __rshift__(cls, other):
- return cls() >> other
-
-
-@py3metafix
-class Trafaret(object):
- """
- Base class for trafarets, provides only one method for
- trafaret validation failure reporting
- """
-
- __metaclass__ = TrafaretMeta
-
- def check(self, value, convert=True):
- """
- Common logic. In subclasses you need to implement check_value or
- check_and_return.
- """
- if hasattr(self, 'check_value'):
- self.check_value(value)
- return self.converter(value) if convert else value
- if hasattr(self, 'check_and_return'):
- res_value = self.check_and_return(value)
- return self.converter(res_value) if convert else res_value
- cls = "%s.%s" % (type(self).__module__, type(self).__name__)
- raise NotImplementedError("You must implement check_value or"
- " check_and_return methods '%s'" % cls)
-
- def converter(self, value):
- """
- You can change converter with `>>` operator or append method
- """
- return value
-
- def _failure(self, error=None, value=_empty):
- """
- Shortcut method for raising validation error
- """
- raise DataError(error=error, value=value, trafaret=self)
-
- @staticmethod
- def _trafaret(trafaret):
- """
- Helper for complex trafarets, takes trafaret instance or class
- and returns trafaret instance
- """
- return ensure_trafaret(trafaret)
-
- def append(self, converter):
- """
- Appends new converter to list.
- """
- return And(self, converter, disable_old_check_convert=True)
-
- def __or__(self, other):
- return Or(self, other)
-
- def __and__(self, other):
- return And(self, other)
-
- def __rshift__(self, other):
- return And(self, other, disable_old_check_convert=True)
-
- def __call__(self, val):
- return self.check(val)
-
-
-def ensure_trafaret(trafaret):
- """
- Helper for complex trafarets, takes trafaret instance or class
- and returns trafaret instance
- """
- if isinstance(trafaret, Trafaret) or inspect.isroutine(trafaret):
- return trafaret
- elif isinstance(trafaret, type):
- if issubclass(trafaret, Trafaret):
- return trafaret()
- else:
- return Type(trafaret)
- else:
- raise RuntimeError("%r should be instance or subclass"
- " of Trafaret" % trafaret)
-
-class TypeMeta(TrafaretMeta):
-
- def __getitem__(self, type_):
- return self(type_)
-
-
-@py3metafix
-class TypingTrafaret(Trafaret):
- """A trafaret used for instance type and class inheritance checks."""
-
- __metaclass__ = TypeMeta
-
- def __init__(self, type_):
- self.type_ = type_
-
- def check_value(self, value):
- if not self.typing_checker(value, self.type_):
- self._failure(self.failure_message % self.type_.__name__, value=value)
-
- def __repr__(self):
- return "<%s(%s)>" % (self.__class__.__name__, self.type_.__name__)
-
-
-class Subclass(TypingTrafaret):
- """
- >>> Subclass(type)
-
- >>> Subclass[type]
-
- >>> s = Subclass[type]
- >>> s.check(type)
-
- >>> extract_error(s, object)
- 'value is not subclass of type'
- """
-
- typing_checker = issubclass
- failure_message = "value is not subclass of %s"
-
-
-class Type(TypingTrafaret):
- """
- >>> Type(int)
-
- >>> Type[int]
-
- >>> c = Type[int]
- >>> c.check(1)
- 1
- >>> extract_error(c, "foo")
- 'value is not int'
- """
-
- typing_checker = isinstance
- failure_message = "value is not %s"
-
-
-class Any(Trafaret):
- """
- >>> Any()
-
- >>> (Any() >> ignore).check(object())
- """
-
- def check_value(self, value):
- pass
-
- def __repr__(self):
- return ""
-
-
-class OrMeta(TrafaretMeta):
- """
- Allows to use "<<" operator on Or class
-
- >>> Or << Int << String
- , )>
- """
-
- def __lshift__(cls, other):
- return cls() << other
-
-
-@py3metafix
-class Or(Trafaret):
- """
- >>> nullString = Or(String, Null)
- >>> nullString
- , )>
- >>> nullString.check(None)
- >>> nullString.check("test")
- 'test'
- >>> extract_error(nullString, 1)
- {0: 'value is not a string', 1: 'value should be None'}
- """
-
- __metaclass__ = OrMeta
- __slots__ = ['trafarets']
-
- def __init__(self, *trafarets):
- self.trafarets = list(map(ensure_trafaret, trafarets))
-
- def check_and_return(self, value):
- errors = []
- for trafaret in self.trafarets:
- try:
- return trafaret.check(value)
- except DataError as e:
- errors.append(e)
- raise DataError(dict(enumerate(errors)), trafaret=self)
-
- def __lshift__(self, trafaret):
- self.trafarets.append(ensure_trafaret(trafaret))
- return self
-
- def __or__(self, trafaret):
- self << trafaret
- return self
-
- def __repr__(self):
- return "" % (", ".join(map(repr, self.trafarets)))
-
-
-class And(Trafaret):
- """
- Will work over trafarets sequentially
- """
- __slots__ = ('trafaret', 'other', 'disable_old_check_convert')
-
- def __init__(self, trafaret, other, disable_old_check_convert=False):
- self.trafaret = trafaret
- self.other = other
- self.disable_old_check_convert = disable_old_check_convert
-
- def check_and_return(self, value):
- if isinstance(self.trafaret, Trafaret) and self.disable_old_check_convert:
- res = self.trafaret.check(value, convert=False)
- else:
- res = self.trafaret(value)
- if isinstance(res, DataError):
- raise DataError
- res = self.other(res)
- if isinstance(res, DataError):
- raise res
- return res
-
- # support old code for some deprecation period
- def allow_extra(self, *names, **kw):
- deprecated('Call allow_extra after >> or & operations is deprecated')
- self.trafaret = self.trafaret.allow_extra(*names, **kw)
- return self
-
- def ignore_extra(self, *names):
- deprecated('Call ignore_extra after >> or & operations is deprecated')
- self.trafaret = self.trafaret.ignore_extra(*names)
- return self
-
- def __repr__(self):
- return repr(self.trafaret)
-
-
-
-class Null(Trafaret):
- """
- >>> Null()
-
- >>> Null().check(None)
- >>> extract_error(Null(), 1)
- 'value should be None'
- """
-
- def check_value(self, value):
- if value is not None:
- self._failure("value should be None", value=value)
-
- def __repr__(self):
- return ""
-
-
-class Bool(Trafaret):
- """
- >>> Bool()
-
- >>> Bool().check(True)
- True
- >>> Bool().check(False)
- False
- >>> extract_error(Bool(), 1)
- 'value should be True or False'
- """
-
- def check_value(self, value):
- if not isinstance(value, bool):
- self._failure("value should be True or False", value=value)
-
- def __repr__(self):
- return ""
-
-
-class StrBool(Trafaret):
- """
- >>> extract_error(StrBool(), 'aloha')
- "value can't be converted to Bool"
- >>> StrBool().check(1)
- True
- >>> StrBool().check(0)
- False
- >>> StrBool().check('y')
- True
- >>> StrBool().check('n')
- False
- >>> StrBool().check(None)
- False
- >>> StrBool().check('1')
- True
- >>> StrBool().check('0')
- False
- >>> StrBool().check('YeS')
- True
- >>> StrBool().check('No')
- False
- >>> StrBool().check(True)
- True
- >>> StrBool().check(False)
- False
- """
-
- convertable = ('t', 'true', 'false', 'y', 'n', 'yes', 'no', 'on',
- '1', '0', 'none')
-
- def check_value(self, value):
- _value = str(value).strip().lower()
- if _value not in self.convertable:
- self._failure("value can't be converted to Bool", value=value)
-
- def converter(self, value):
- if value is None:
- return False
- _str = str(value).strip().lower()
-
- return _str in ('t', 'true', 'y', 'yes', 'on', '1')
-
- def __repr__(self):
- return ""
-
-
-class NumberMeta(TrafaretMeta):
- """
- Allows slicing syntax for min and max arguments for
- number trafarets
-
- >>> Int[1:]
-
- >>> Int[1:10]
-
- >>> Int[:10]
-
- >>> Float[1:]
-
- >>> Int > 3
-
- >>> 1 < (Float < 10)
-
- >>> (Int > 5).check(10)
- 10
- >>> extract_error(Int > 5, 1)
- 'value should be greater than 5'
- >>> (Int < 3).check(1)
- 1
- >>> extract_error(Int < 3, 3)
- 'value should be less than 3'
- """
-
- def __getitem__(cls, slice_):
- return cls(gte=slice_.start, lte=slice_.stop)
-
- def __lt__(cls, lt):
- return cls(lt=lt)
-
- def __gt__(cls, gt):
- return cls(gt=gt)
-
-
-@py3metafix
-class FloatRaw(Trafaret):
- """
- Tests that value is a float or a string that is convertable to float.
-
- >>> Float()
-
- >>> Float(gte=1)
-
- >>> Float(lte=10)
-
- >>> Float(gte=1, lte=10)
-
- >>> Float().check(1.0)
- 1.0
- >>> extract_error(Float(), 1 + 3j)
- 'value is not float'
- >>> extract_error(Float(), 1)
- 1.0
- >>> Float(gte=2).check(3.0)
- 3.0
- >>> extract_error(Float(gte=2), 1.0)
- 'value is less than 2'
- >>> Float(lte=10).check(5.0)
- 5.0
- >>> extract_error(Float(lte=3), 5.0)
- 'value is greater than 3'
- >>> Float().check("5.0")
- 5.0
- """
-
- __metaclass__ = NumberMeta
-
- convertable = str_types + (numbers.Real,)
- value_type = float
-
- def __init__(self, gte=None, lte=None, gt=None, lt=None):
- self.gte = gte
- self.lte = lte
- self.gt = gt
- self.lt = lt
-
- def _converter(self, value):
- if not isinstance(value, self.convertable):
- self._failure('value is not %s' % self.value_type.__name__, value=value)
- try:
- return self.value_type(value)
- except ValueError:
- self._failure(
- "value can't be converted to %s" % self.value_type.__name__,
- value=value
- )
-
- def _check(self, data):
- if not isinstance(data, self.value_type):
- value = self._converter(data)
- else:
- value = data
- if self.gte is not None and value < self.gte:
- self._failure("value is less than %s" % self.gte, value=data)
- if self.lte is not None and value > self.lte:
- self._failure("value is greater than %s" % self.lte, value=data)
- if self.lt is not None and value >= self.lt:
- self._failure("value should be less than %s" % self.lt, value=data)
- if self.gt is not None and value <= self.gt:
- self._failure("value should be greater than %s" % self.gt, value=data)
- return value
-
- def check_and_return(self, data):
- self._check(data)
- return data
-
- def __lt__(self, lt):
- return type(self)(gte=self.gte, lte=self.lte, gt=self.gt, lt=lt)
-
- def __gt__(self, gt):
- return type(self)(gte=self.gte, lte=self.lte, gt=gt, lt=self.lt)
-
- def __repr__(self):
- r = "<%s" % type(self).__name__
- options = []
- for param in ("gte", "lte", "gt", "lt"):
- if getattr(self, param) is not None:
- options.append("%s=%s" % (param, getattr(self, param)))
- if options:
- r += "(%s)" % (", ".join(options))
- r += ">"
- return r
-
-
-class Float(FloatRaw):
- """Checks that value is a float.
- Or if value is a string converts this string to float
- """
- def check_and_return(self, data):
- return self._check(data)
-
-
-class IntRaw(FloatRaw):
- """
- >>> Int()
-
- >>> Int().check(5)
- 5
- >>> extract_error(Int(), 1.1)
- 'value is not int'
- >>> extract_error(Int(), 1 + 1j)
- 'value is not int'
- """
-
- value_type = int
-
- def _converter(self, value):
- if isinstance(value, float):
- if not value.is_integer():
- self._failure('value is not int', value=value)
- return super(IntRaw, self)._converter(value)
-
-
-class Int(IntRaw):
- def check_and_return(self, data):
- return self._check(data)
-
-
-class Atom(Trafaret):
- """
- >>> Atom('atom').check('atom')
- 'atom'
- >>> extract_error(Atom('atom'), 'molecule')
- "value is not exactly 'atom'"
- """
- __slots__ = ['value']
-
- def __init__(self, value):
- self.value = value
-
- def check_value(self, value):
- if self.value != value:
- self._failure("value is not exactly '%s'" % self.value, value=value)
-
-
-class RegexpRaw(Trafaret):
- """
- Check if given string match given regexp
- """
- __slots__ = ('regexp', 'raw_regexp')
-
- def __init__(self, regexp):
- self.regexp = re.compile(regexp) if isinstance(regexp, str_types) else regexp
- self.raw_regexp = self.regexp.pattern if self.regexp else None
-
- def check_and_return(self, value):
- if not isinstance(value, str_types):
- self._failure("value is not a string", value=value)
- match = self.regexp.match(value)
- if not match:
- self._failure('does not match pattern %s' % self.raw_regexp, value=value)
- return match
-
- def __repr__(self):
- return ''
-
-
-class Regexp(RegexpRaw):
- def check_and_return(self, value):
- return super(Regexp, self).check_and_return(value).group()
-
-
-class String(Trafaret):
- """
- >>> String()
-
- >>> String(allow_blank=True)
-
- >>> String().check("foo")
- 'foo'
- >>> extract_error(String(), "")
- 'blank value is not allowed'
- >>> String(allow_blank=True).check("")
- ''
- >>> extract_error(String(), 1)
- 'value is not a string'
- >>> String(regex='\w+').check('wqerwqer')
- 'wqerwqer'
- >>> String(allow_blank=True, regex='\w+').check('')
- ''
- >>> extract_error(String(regex='^\w+$'), 'wqe rwqer')
- "value does not match pattern: '^\\\\\\\\w+$'"
- >>> String(min_length=2, max_length=3).check('123')
- '123'
- >>> extract_error(String(min_length=2, max_length=6), '1')
- 'String is shorter than 2 characters'
- >>> extract_error(String(min_length=2, max_length=6), '1234567')
- 'String is longer than 6 characters'
- >>> String(min_length=2, max_length=6, allow_blank=True)
- Traceback (most recent call last):
- ...
- AssertionError: Either allow_blank or min_length should be specified, not both
- >>> String(min_length=0, max_length=6, allow_blank=True).check('123')
- '123'
- """
-
- def __init__(self, allow_blank=False, regex=None, min_length=None, max_length=None):
- assert not (allow_blank and min_length), \
- "Either allow_blank or min_length should be specified, not both"
- self.allow_blank = allow_blank
- self.regex = re.compile(regex) if isinstance(regex, str_types) else regex
- if self.regex is not None:
- deprecated('Deprecated, use Regexp or RegexpRaw instead')
- self.min_length = min_length
- self.max_length = max_length
- self._raw_regex = self.regex.pattern if self.regex else None
-
- def check_and_return(self, value):
- if not isinstance(value, str_types):
- self._failure("value is not a string", value=value)
- if not self.allow_blank and len(value) == 0:
- self._failure("blank value is not allowed", value=value)
- elif self.allow_blank and len(value) == 0:
- return value
- if self.min_length is not None and len(value) < self.min_length:
- self._failure('String is shorter than %s characters' % self.min_length, value=value)
- if self.max_length is not None and len(value) > self.max_length:
- self._failure('String is longer than %s characters' % self.max_length, value=value)
- if self.regex is not None:
- match = self.regex.match(value)
- if not match:
- self._failure("value does not match pattern: %s" % repr(self._raw_regex), value=value)
- return match
- return value
-
- def converter(self, value):
- if isinstance(value, str_types):
- return value
- return value.group()
-
- def __repr__(self):
- return "" if self.allow_blank else ""
-
-
-class Email(String):
- """
- >>> Email().check('someone@example.net')
- 'someone@example.net'
- >>> extract_error(Email(),'someone@example') # try without domain-part
- 'value is not a valid email address'
- >>> str(Email().check('someone@пример.рф')) # try with `idna` encoding
- 'someone@xn--e1afmkfd.xn--p1ai'
- >>> (Email() >> (lambda m: m.groupdict()['domain'])).check('someone@example.net')
- 'example.net'
- >>> extract_error(Email(), 'foo')
- 'value is not a valid email address'
- >>> extract_error(Email(), 'f' * 10000 + '@correct.domain.edu')
- 'value is not a valid email address'
- >>> extract_error(Email(), 'f' * 248 + '@x.edu') == 'f' * 248 + '@x.edu'
- True
- """
-
- regex = re.compile(
- r"(?P^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*" # dot-atom
- r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-011\013\014\016-\177])*"' # quoted-string
- r')@(?P(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)$)' # domain
- r'|\[(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}\]$', re.IGNORECASE) # literal form, ipv4 address (SMTP 4.1.3)
-
- def __init__(self, allow_blank=False):
- super(Email, self).__init__(allow_blank=allow_blank,
- regex=self.regex,
- max_length=MAX_EMAIL_LEN)
-
- def check_and_return(self, value):
- try:
- return super(Email, self).check_and_return(value)
- except DataError:
- if value and isinstance(value, str_types):
- if isinstance(value, bytes):
- decoded = value.decode('utf-8')
- else:
- decoded = value
- else:
- raise
- # Trivial case failed. Try for possible IDN domain-part
- if decoded and '@' in decoded:
- parts = decoded.split('@')
- try:
- parts[-1] = parts[-1].encode('idna').decode('ascii')
- except UnicodeError:
- pass
- else:
- try:
- return super(Email, self).check_and_return('@'.join(parts))
- except DataError:
- # Will fail with main error
- pass
- self._failure('value is not a valid email address', value=value)
-
- def __repr__(self):
- return ''
-
-
-class URL(String):
- """
- >>> URL().check('http://example.net/resource/?param=value#anchor')
- 'http://example.net/resource/?param=value#anchor'
- >>> str(URL().check('http://пример.рф/resource/?param=value#anchor'))
- 'http://xn--e1afmkfd.xn--p1ai/resource/?param=value#anchor'
- """
-
- regex = re.compile(
- r'^(?:http|ftp)s?://' # http:// or https://
- r'(?:\S+(?::\S*)?@)?' # user and password
- r'(?:(?:[A-Z0-9](?:[A-Z0-9-_]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
- r'localhost|' # localhost...
- r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
- r'(?::\d+)?' # optional port
- r'(?:/?|[/?]\S+)$', re.IGNORECASE)
- min_length = None
- max_length = None
-
- def __init__(self, allow_blank=False):
- super(URL, self).__init__(allow_blank=allow_blank, regex=self.regex)
-
- def check_and_return(self, value):
- try:
- return super(URL, self).check_and_return(value)
- except DataError:
- # Trivial case failed. Try for possible IDN domain-part
- if value:
- if isinstance(value, bytes):
- decoded = value.decode('utf-8')
- else:
- decoded = value
- scheme, netloc, path, query, fragment = urlparse.urlsplit(decoded)
- try:
- netloc = netloc.encode('idna').decode('ascii') # IDN -> ACE
- except UnicodeError: # invalid domain part
- pass
- else:
- url = urlparse.urlunsplit((scheme, netloc, path, query, fragment))
- try:
- return super(URL, self).check_and_return(url)
- except DataError:
- # Will fail with main error
- pass
- self._failure('value is not URL', value=value)
-
- def __repr__(self):
- return ''
-
-
-class IPv4(Regexp):
- """
- >>> IPv4().check('127.0.0.1')
- '127.0.0.1'
- """
-
- regex = re.compile(
- r'^((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])$', # noqa
- )
-
- def __init__(self):
- super(IPv4, self).__init__(self.regex)
-
- def check_and_return(self, value):
- try:
- return super(IPv4, self).check_and_return(value)
- except DataError:
- self._failure('value is not IPv4 address')
-
- def __repr__(self):
- return ''
-
-
-class IPv6(Regexp):
- """
- >>> IPv6().check('2001:0db8:0000:0042:0000:8a2e:0370:7334')
- '2001:0db8:0000:0042:0000:8a2e:0370:7334'
- """
-
- regex = re.compile(
- r'^('
- r'(::)|'
- r'(::[0-9a-f]{1,4})|'
- r'([0-9a-f]{1,4}:){7,7}[0-9a-f]{1,4}|'
- r'([0-9a-f]{1,4}:){1,7}:|'
- r'([0-9a-f]{1,4}:){1,6}:[0-9a-f]{1,4}|'
- r'([0-9a-f]{1,4}:){1,5}(:[0-9a-f]{1,4}){1,2}|'
- r'([0-9a-f]{1,4}:){1,4}(:[0-9a-f]{1,4}){1,3}|'
- r'([0-9a-f]{1,4}:){1,3}(:[0-9a-f]{1,4}){1,4}|'
- r'([0-9a-f]{1,4}:){1,2}(:[0-9a-f]{1,4}){1,5}|'
- r'[0-9a-f]{1,4}:((:[0-9a-f]{1,4}){1,6})|'
- r':((:[0-9a-f]{1,4}){1,7}:)|'
- r'fe80:(:[0-9a-f]{0,4}){0,4}%[0-9a-z]{1,}|'
- r'::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|' # noqa
- r'([0-9a-f]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])' # noqa
- r')$',
- re.IGNORECASE,
- )
-
- def __init__(self):
- super(IPv6, self).__init__(self.regex)
-
- def check_and_return(self, value):
- try:
- return super(IPv6, self).check_and_return(value)
- except DataError:
- self._failure('value is not IPv6 address')
-
- def __repr__(self):
- return ''
-
-
-class SquareBracketsMeta(TrafaretMeta):
- """
- Allows usage of square brackets for List initialization
-
- >>> List[Int]
- )>
- >>> List[Int, 1:]
- )>
- >>> List[:10, Int]
- )>
- >>> List[1:10]
- Traceback (most recent call last):
- ...
- RuntimeError: Trafaret is required for List initialization
- """
-
- def __getitem__(self, args):
- slice_ = None
- trafaret = None
- if not isinstance(args, tuple):
- args = (args, )
- for arg in args:
- if isinstance(arg, slice):
- slice_ = arg
- elif isinstance(arg, Trafaret) or issubclass(arg, Trafaret) \
- or isinstance(arg, type):
- trafaret = arg
- if not trafaret:
- raise RuntimeError("Trafaret is required for List initialization")
- if slice_:
- return self(trafaret, min_length=slice_.start or 0,
- max_length=slice_.stop)
- return self(trafaret)
-
-
-@py3metafix
-class List(Trafaret):
- """
- >>> List(Int)
- )>
- >>> List(Int, min_length=1)
- )>
- >>> List(Int, min_length=1, max_length=10)
- )>
- >>> extract_error(List(Int), 1)
- 'value is not a list'
- >>> List(Int).check([1, 2, 3])
- [1, 2, 3]
- >>> List(String).check(["foo", "bar", "spam"])
- ['foo', 'bar', 'spam']
- >>> extract_error(List(Int), [1, 2, 1 + 3j])
- {2: 'value is not int'}
- >>> List(Int, min_length=1).check([1, 2, 3])
- [1, 2, 3]
- >>> extract_error(List(Int, min_length=1), [])
- 'list length is less than 1'
- >>> List(Int, max_length=2).check([1, 2])
- [1, 2]
- >>> extract_error(List(Int, max_length=2), [1, 2, 3])
- 'list length is greater than 2'
- >>> extract_error(List(Int), ["a"])
- {0: "value can't be converted to int"}
- """
-
- __metaclass__ = SquareBracketsMeta
- __slots__ = ['trafaret', 'min_length', 'max_length']
-
- def __init__(self, trafaret, min_length=0, max_length=None):
- self.trafaret = ensure_trafaret(trafaret)
- self.min_length = min_length
- self.max_length = max_length
-
- def check_and_return(self, value):
- if not isinstance(value, list):
- self._failure("value is not a list", value=value)
- if len(value) < self.min_length:
- self._failure("list length is less than %s" % self.min_length, value=value)
- if self.max_length is not None and len(value) > self.max_length:
- self._failure("list length is greater than %s" % self.max_length, value=value)
- lst = []
- errors = {}
- for index, item in enumerate(value):
- try:
- lst.append(self.trafaret.check(item))
- except DataError as err:
- errors[index] = err
- if errors:
- raise DataError(error=errors, trafaret=self)
- return lst
-
- def __repr__(self):
- r = ""
- return r
-
-
-class Tuple(Trafaret):
- """
- Tuple checker can be used to check fixed tuples, like (Int, Int, String).
-
- >>> t = Tuple(Int, Int, String)
- >>> t.check([3, 4, '5'])
- (3, 4, '5')
- >>> extract_error(t, [3, 4, 5])
- {2: 'value is not a string'}
- >>> t
- , , )>
- """
- __slots__ = ['trafarets', 'length']
-
- def __init__(self, *args):
- self.trafarets = list(map(ensure_trafaret, args))
- self.length = len(self.trafarets)
-
- def check_and_return(self, value):
- try:
- value = tuple(value)
- except TypeError:
- self._failure('value must be convertable to tuple', value=value)
- if len(value) != self.length:
- self._failure('value must contain %s items' % self.length, value=value)
- result = []
- errors = {}
- for idx, (item, trafaret) in enumerate(zip(value, self.trafarets)):
- try:
- result.append(trafaret.check(item))
- except DataError as err:
- errors[idx] = err
- if errors:
- self._failure(errors, value=value)
- return tuple(result)
-
- def __repr__(self):
- return ''
-
-
-class Key(object):
- """
- Helper class for Dict.
-
- It gets ``name``, and provides method ``extract(data)`` that extract key value
- from data through mapping ``get`` method.
- Key `__call__` method yields ``(key name, Maybe(DataError), [touched keys])`` triples.
-
- You can redefine ``get_data(data, default)`` method in subclassed ``Key`` if you want to use something other
- then ``.get(...)`` method.
-
- Like this for the aiohttp MultiDict::
-
- class MDKey(t.Key):
- def get_data(data, default):
- return data.get_all(self.name, default)
- """
- __slots__ = ['name', 'to_name', 'default', 'optional', 'trafaret']
-
- def __init__(self, name, default=_empty, optional=False, to_name=None, trafaret=None):
- self.name = name
- self.to_name = to_name
- self.default = default
- self.optional = optional
- self.trafaret = trafaret or Any()
-
- def __call__(self, data):
- if self.name in data or self.default is not _empty:
- if callable(self.default):
- default = self.default()
- else:
- default = self.default
- yield (
- self.get_name(),
- catch_error(self.trafaret, self.get_data(data, default)),
- (self.name,)
- )
- return
-
- if not self.optional:
- yield self.name, DataError(error='is required'), (self.name,)
-
- def get_data(self, data, default):
- return data.get(self.name, default)
-
- def keys_names(self):
- yield self.name
-
- def set_trafaret(self, trafaret):
- self.trafaret = ensure_trafaret(trafaret)
- return self
-
- def __rshift__(self, name):
- self.to_name = name
- return self
-
- def get_name(self):
- return self.to_name or self.name
-
- def make_optional(self):
- self.optional = True
-
- def __repr__(self):
- return '<%s "%s"%s>' % (self.__class__.__name__, self.name,
- ' to "%s"' % self.to_name if getattr(self, 'to_name', False) else '')
-
-
-class Dict(Trafaret):
- """
- >>> trafaret = Dict(foo=Int, bar=String) >> ignore
- >>> trafaret.check({"foo": 1, "bar": "spam"})
- >>> extract_error(trafaret, {"foo": 1, "bar": 2})
- {'bar': 'value is not a string'}
- >>> extract_error(trafaret, {"foo": 1})
- {'bar': 'is required'}
- >>> extract_error(trafaret, {"foo": 1, "bar": "spam", "eggs": None})
- {'eggs': 'eggs is not allowed key'}
- >>> trafaret.allow_extra("eggs")
- , foo=)>
- >>> trafaret.check({"foo": 1, "bar": "spam", "eggs": None})
- >>> trafaret.check({"foo": 1, "bar": "spam"})
- >>> extract_error(trafaret, {"foo": 1, "bar": "spam", "ham": 100})
- {'ham': 'ham is not allowed key'}
- >>> trafaret.allow_extra("*")
- , foo=)>
- >>> trafaret.check({"foo": 1, "bar": "spam", "ham": 100})
- >>> trafaret.check({"foo": 1, "bar": "spam", "ham": 100, "baz": None})
- >>> extract_error(trafaret, {"foo": 1, "ham": 100, "baz": None})
- {'bar': 'is required'}
- >>> trafaret = Dict({Key('bar', optional=True): String}, foo=Int)
- >>> trafaret.allow_extra("*")
- , foo=)>
- >>> _dd(trafaret.check({"foo": 1, "ham": 100, "baz": None}))
- "{'baz': None, 'foo': 1, 'ham': 100}"
- >>> _dd(extract_error(trafaret, {"bar": 1, "ham": 100, "baz": None}))
- "{'bar': 'value is not a string', 'foo': 'is required'}"
- >>> extract_error(trafaret, {"foo": 1, "bar": 1, "ham": 100, "baz": None})
- {'bar': 'value is not a string'}
- >>> trafaret = Dict({Key('bar', default='nyanya') >> 'baz': String}, foo=Int)
- >>> _dd(trafaret.check({'foo': 4}))
- "{'baz': 'nyanya', 'foo': 4}"
- >>> _ = trafaret.ignore_extra('fooz')
- >>> _dd(trafaret.check({'foo': 4, 'fooz': 5}))
- "{'baz': 'nyanya', 'foo': 4}"
- >>> _ = trafaret.ignore_extra('*')
- >>> _dd(trafaret.check({'foo': 4, 'foor': 5}))
- "{'baz': 'nyanya', 'foo': 4}"
- """
- __slots__ = ['extras', 'extras_trafaret', 'allow_any', 'ignore', 'ignore_any', 'keys']
-
- def __init__(self, *args, **trafarets):
- if args and isinstance(args[0], AbcMapping):
- keys = args[0]
- args = args[1:]
- else:
- keys = {}
- if any(not callable(key) for key in args):
- raise RuntimeError('Keys in single attributes must be callables')
-
- # extra
- allow_extra = trafarets.pop('allow_extra', [])
- allow_extra_trafaret = trafarets.pop('allow_extra_trafaret', Any)
- self.extras_trafaret = ensure_trafaret(allow_extra_trafaret)
- self.allow_any = '*' in allow_extra
- self.extras = [name for name in allow_extra if name != '*']
- # ignore
- ignore_extra = trafarets.pop('ignore_extra', [])
- self.ignore_any = '*' in ignore_extra
- self.ignore = [name for name in ignore_extra if name != '*']
-
- self.keys = list(args)
- for key, trafaret in itertools.chain(trafarets.items(), keys.items()):
- key_ = Key(key) if isinstance(key, str_types) else key
- key_.set_trafaret(ensure_trafaret(trafaret))
- self.keys.append(key_)
-
- def allow_extra(self, *names, **kw):
- trafaret = kw.get('trafaret', Any)
- for name in names:
- if name == "*":
- self.allow_any = True
- else:
- self.extras.append(name)
- self.extras_trafaret = ensure_trafaret(trafaret)
- return self
-
- def ignore_extra(self, *names):
- for name in names:
- if name == "*":
- self.ignore_any = True
- else:
- self.ignore.append(name)
- return self
-
- def make_optional(self, *args):
- for key in self.keys:
- if key.name in args or '*' in args:
- key.make_optional()
- return self
-
- def check_and_return(self, value):
- if not isinstance(value, AbcMapping):
- self._failure("value is not a dict", value=value)
- collect = {}
- errors = {}
- touched_names = []
- for key in self.keys:
- if callable(key):
- for k, v, names in key(value):
- if isinstance(v, DataError):
- errors[k] = v
- else:
- collect[k] = v
- touched_names.extend(names)
- else:
- deprecated('Old pop based Keys subclasses deprecated. See README')
- value_keys = set(value.keys())
- for k, v in key.pop(value):
- if isinstance(v, DataError):
- errors[k] = v
- else:
- collect[k] = v
- touched_names.extend(value_keys - set(value.keys()))
-
- if not self.ignore_any:
- for key in value:
- if key in touched_names:
- continue
- if key in self.ignore:
- continue
- if not self.allow_any and key not in self.extras:
- errors[key] = DataError("%s is not allowed key" % key)
- elif key in collect:
- errors[key] = DataError("%s key was shadowed" % key)
- else:
- try:
- collect[key] = self.extras_trafaret(value[key])
- except DataError as de:
- errors[key] = de
- if errors:
- raise DataError(error=errors, trafaret=self)
- return collect
-
- def keys_names(self):
- for key in self.keys:
- for k in key.keys_names():
- yield k
-
- def __repr__(self):
- r = ""
- return r
-
- def merge(self, other):
- """
- Extends one Dict with other Dict Key`s or Key`s list,
- or dict instance supposed for Dict
- """
- if not isinstance(other, (Dict, list, dict)):
- raise TypeError('You must merge Dict only with Dict'
- ' or list of Keys')
- if isinstance(other, dict):
- other = Dict(other)
- if isinstance(other, Dict):
- other_keys_names = other.keys_names()
- other_keys = other.keys
- else:
- other_keys_names = [
- key_name
- for key in other
- for key_name in key.keys_names()
- ]
- other_keys = other
- if set(self.keys_names()) & set(other_keys_names):
- raise ValueError('Merged dicts should have '
- 'no interlapping keys')
- if (
- set(key.get_name() for key in self.keys)
- & set(key.get_name() for key in other_keys)
- ):
- raise ValueError('Merged dicts should have '
- 'no interlapping keys to names')
- new_trafaret = self.__class__()
- new_trafaret.keys = self.keys + other_keys
- return new_trafaret
-
- __add__ = merge
-
-
-def DictKeys(keys):
- """
- Checks if dict has all given keys
-
- :param keys:
- :type keys:
-
- >>> _dd(DictKeys(['a','b']).check({'a':1,'b':2,}))
- "{'a': 1, 'b': 2}"
- >>> extract_error(DictKeys(['a','b']), {'a':1,'b':2,'c':3,})
- {'c': 'c is not allowed key'}
- >>> extract_error(DictKeys(['key','key2']), {'key':'val'})
- {'key2': 'is required'}
- """
- def MissingKey(val):
- raise DataError('%s is not in Dict' % val)
-
- req = [(Key(key), Any) for key in keys]
- return Dict(dict(req))
-
-
-class Mapping(Trafaret):
- """
- Mapping gets two trafarets as arguments, one for key and one for value,
- like `Mapping(t.Int, t.List(t.Str))`.
- """
- __slots__ = ['key', 'value']
-
- def __init__(self, key, value):
- self.key = ensure_trafaret(key)
- self.value = ensure_trafaret(value)
-
- def check_and_return(self, mapping):
- if not isinstance(mapping, dict):
- self._failure("value is not a dict", value=mapping)
- checked_mapping = {}
- errors = {}
- for key, value in mapping.items():
- pair_errors = {}
- try:
- checked_key = self.key.check(key)
- except DataError as err:
- pair_errors['key'] = err
- try:
- checked_value = self.value.check(value)
- except DataError as err:
- pair_errors['value'] = err
- if pair_errors:
- errors[key] = DataError(error=pair_errors)
- else:
- checked_mapping[checked_key] = checked_value
- if errors:
- raise DataError(error=errors, trafaret=self)
- return checked_mapping
-
- def __repr__(self):
- return " %r)>" % (self.key, self.value)
-
-
-class Enum(Trafaret):
- """
- >>> trafaret = Enum("foo", "bar", 1) >> ignore
- >>> trafaret
-
- >>> trafaret.check("foo")
- >>> trafaret.check(1)
- >>> extract_error(trafaret, 2)
- "value doesn't match any variant"
- """
- __slots__ = ['variants']
-
- def __init__(self, *variants):
- self.variants = variants[:]
-
- def check_value(self, value):
- if value not in self.variants:
- self._failure("value doesn't match any variant", value=value)
-
- def __repr__(self):
- return "" % (", ".join(map(repr, self.variants)))
-
-
-class Callable(Trafaret):
- """
- >>> (Callable() >> ignore).check(lambda: 1)
- >>> extract_error(Callable(), 1)
- 'value is not callable'
- """
-
- def check_value(self, value):
- if not callable(value):
- self._failure("value is not callable", value=value)
-
- def __repr__(self):
- return ""
-
-
-class Call(Trafaret):
- """
- >>> def validator(value):
- ... if value != "foo":
- ... return DataError("I want only foo!")
- ... return 'foo'
- ...
- >>> trafaret = Call(validator)
- >>> trafaret
-
- >>> trafaret.check("foo")
- 'foo'
- >>> extract_error(trafaret, "bar")
- 'I want only foo!'
- """
- __slots__ = ['fn']
-
- def __init__(self, fn):
- if not callable(fn):
- raise RuntimeError("Call argument should be callable")
- if py3:
- argspec = inspect.getfullargspec(fn)
- else:
- argspec = inspect.getargspec(fn)
- if len(argspec.args) - len(argspec.defaults or []) > 1:
- raise RuntimeError("Call argument should be"
- " one argument function")
- self.fn = fn
-
- def check_and_return(self, value):
- res = self.fn(value)
- if isinstance(res, DataError):
- raise res
- else:
- return res
-
- def __repr__(self):
- return "" % self.fn.__name__
-
-
-class Forward(Trafaret):
- """
- >>> node = Forward()
- >>> node << Dict(name=String, children=List[node])
- >>> node
- )>, name=)>)>
- >>> node.check({"name": "foo", "children": []}) == {'children': [], 'name': 'foo'}
- True
- >>> extract_error(node, {"name": "foo", "children": [1]})
- {'children': {0: 'value is not a dict'}}
- >>> node.check({"name": "foo", "children": [ \
- {"name": "bar", "children": []} \
- ]}) == {'children': [{'children': [], 'name': 'bar'}], 'name': 'foo'}
- True
- >>> empty_node = Forward()
- >>> empty_node
-
- >>> extract_error(empty_node, 'something')
- 'trafaret not set yet'
- """
-
- def __init__(self):
- self.trafaret = None
- self._recur_repr = False
-
- def __lshift__(self, trafaret):
- self.provide(trafaret)
-
- def provide(self, trafaret):
- if self.trafaret:
- raise RuntimeError("trafaret for Forward is already specified")
- self.trafaret = ensure_trafaret(trafaret)
-
- def check_and_return(self, value):
- if self.trafaret is None:
- self._failure('trafaret not set yet', value=value)
- return self.trafaret.check(value)
-
- def __repr__(self):
- # XXX not threadsafe
- if self._recur_repr:
- return ""
- self._recur_repr = True
- r = "" % self.trafaret
- self._recur_repr = False
- return r
-
-
-class GuardError(DataError):
- """
- Raised when guarded function gets invalid arguments,
- inherits error message from corresponding DataError
- """
-
- pass
-
-
-def guard(trafaret=None, **kwargs):
- """
- Decorator for protecting function with trafarets
-
- >>> @guard(a=String, b=Int, c=String)
- ... def fn(a, b, c="default"):
- ... '''docstring'''
- ... return (a, b, c)
- ...
- >>> fn.__module__ = None
- >>> help(fn)
- Help on function fn:
-
- fn(*args, **kwargs)
- guarded with , b=, c=)>
-
- docstring
-
- >>> fn("foo", 1)
- ('foo', 1, 'default')
- >>> extract_error(fn, "foo", 1, 2)
- {'c': 'value is not a string'}
- >>> extract_error(fn, "foo")
- {'b': 'is required'}
- >>> g = guard(Dict())
- >>> c = Forward()
- >>> c << Dict(name=str, children=List[c])
- >>> g = guard(c)
- >>> g = guard(Int())
- Traceback (most recent call last):
- ...
- RuntimeError: trafaret should be instance of Dict or Forward
- """
- if trafaret and not isinstance(trafaret, Dict) and \
- not isinstance(trafaret, Forward):
- raise RuntimeError("trafaret should be instance of Dict or Forward")
- elif trafaret and kwargs:
- raise RuntimeError("choose one way of initialization,"
- " trafaret or kwargs")
- if not trafaret:
- trafaret = Dict(**kwargs)
-
- def wrapper(fn):
- if py3:
- argspec = inspect.getfullargspec(fn)
- else:
- argspec = inspect.getargspec(fn)
-
- @functools.wraps(fn)
- def decor(*args, **kwargs):
- fnargs = argspec.args
- if fnargs[0] in ['self', 'cls']:
- obj = args[0]
- fnargs = fnargs[1:]
- checkargs = args[1:]
- else:
- obj = None
- checkargs = args
-
- try:
- call_args = dict(
- itertools.chain(zip(fnargs, checkargs), kwargs.items())
- )
- for name, default in zip(reversed(fnargs),
- reversed(argspec.defaults or ())):
- if name not in call_args:
- call_args[name] = default
- converted = trafaret.check(call_args)
- except DataError as err:
- raise GuardError(error=err.error)
- return fn(obj, **converted) if obj else fn(**converted)
- decor.__doc__ = "guarded with %r\n\n" % trafaret + (decor.__doc__ or "")
- return decor
- return wrapper
-
-
-def ignore(val):
- """
- Stub to ignore value from trafaret
- Use it like:
-
- >>> a = Int >> ignore
- >>> a.check(7)
- """
- pass
-
-
-def catch(checker, *a, **kw):
- """
- Helper for tests - catch error and return it as dict
- """
-
- try:
- if hasattr(checker, 'check'):
- return checker.check(*a, **kw)
- elif callable(checker):
- return checker(*a, **kw)
- except DataError as error:
- return error
-
-catch_error = catch
-
-
-def extract_error(checker, *a, **kw):
- """
- Helper for tests - catch error and return it as dict
- """
-
- res = catch_error(checker, *a, **kw)
- if isinstance(res, DataError):
- return res.as_dict()
- return res
-
-
-class MissingContribModuleStub(types.ModuleType):
- """
- Preserves initial exception to be raised on module access
- """
-
- def __init__(self, entrypoint, orig):
- self.orig = orig
- self.entrypoint = entrypoint
-
- def __getattr__(self, item):
- raise self.orig
-
- def __call__(self, *args, **kwargs):
- raise self.orig
- @property
- def __name__(self):
- return self.entrypoint.name.lstrip('.')
+__VERSION__ = (0, 11, 1)
diff --git a/trafaret/async.py b/trafaret/async.py
new file mode 100644
index 0000000..71cddd5
--- /dev/null
+++ b/trafaret/async.py
@@ -0,0 +1,184 @@
+import inspect
+from collections import Mapping as AbcMapping
+from .dataerror import DataError
+from .lib import (
+ call_with_context_if_support,
+ _empty,
+)
+
+
+class TrafaretAsyncMixin:
+ async def async_check(self, value, context=None):
+ if hasattr(self, 'async_transform'):
+ return (await self.async_transform(value, context=context))
+ return self.check(value, context=context)
+
+
+class OrAsyncMixin:
+ async def async_transform(self, value, context=None):
+ errors = []
+ for trafaret in self.trafarets:
+ try:
+ return (await trafaret.async_check(value, context=context))
+ except DataError as e:
+ errors.append(e)
+ raise DataError(dict(enumerate(errors)), trafaret=self)
+
+
+class AndAsyncMixin:
+ async def async_transform(self, value, context=None):
+ res = await self.trafaret.async_check(value, context=context)
+ if isinstance(res, DataError):
+ raise DataError
+ res = await self.other.async_check(res, context=context)
+ if isinstance(res, DataError):
+ raise res
+ return res
+
+
+class ListAsyncMixin:
+ async def async_transform(self, value, context=None):
+ self.check_common(value)
+ lst = []
+ errors = {}
+ for index, item in enumerate(value):
+ try:
+ lst.append(await self.trafaret.async_check(item, context=context))
+ except DataError as err:
+ errors[index] = err
+ if errors:
+ raise DataError(error=errors, trafaret=self)
+ return lst
+
+
+class TupleAsyncMixin:
+ async def async_transform(self, value, context=None):
+ self.check_common(value)
+ result = []
+ errors = {}
+ for idx, (item, trafaret) in enumerate(zip(value, self.trafarets)):
+ try:
+ result.append(await trafaret.async_check(item, context=context))
+ except DataError as err:
+ errors[idx] = err
+ if errors:
+ self._failure(errors, value=value)
+ return tuple(result)
+
+
+class MappingAsyncMixin:
+ async def async_transform(self, mapping, context=None):
+ if not isinstance(mapping, AbcMapping):
+ self._failure("value is not a dict", value=mapping)
+ checked_mapping = {}
+ errors = {}
+ for key, value in mapping.items():
+ pair_errors = {}
+ try:
+ checked_key = await self.key.async_check(key, context=context)
+ except DataError as err:
+ pair_errors['key'] = err
+ try:
+ checked_value = await self.value.async_check(value, context=context)
+ except DataError as err:
+ pair_errors['value'] = err
+ if pair_errors:
+ errors[key] = DataError(error=pair_errors)
+ else:
+ checked_mapping[checked_key] = checked_value
+ if errors:
+ raise DataError(error=errors, trafaret=self)
+ return checked_mapping
+
+
+class CallAsyncMixin:
+ async def async_transform(self, value, context=None):
+ if not inspect.iscoroutinefunction(self.fn):
+ return self.transform(value, context=context)
+ if self.supports_context:
+ res = await self.fn(value, context=context)
+ else:
+ res = await self.fn(value)
+ if isinstance(res, DataError):
+ raise res
+ else:
+ return res
+
+
+class ForwardAsyncMixin:
+ async def async_transform(self, value, context=None):
+ if self.trafaret is None:
+ self._failure('trafaret not set yet', value=value)
+ return (await self.trafaret.async_check(value, context=context))
+
+
+class DictAsyncMixin:
+ async def async_transform(self, value, context=None):
+ if not isinstance(value, AbcMapping):
+ self._failure("value is not a dict", value=value)
+ collect = {}
+ errors = {}
+ touched_names = []
+ for key in self.keys:
+ if not callable(key) and not hasattr(key, 'async_call'):
+ raise ValueError('Non callable Keys are not supported')
+ key_run = call_with_context_if_support(
+ getattr(key, 'async_call', key),
+ value,
+ context=context,
+ )
+ if inspect.isasyncgen(key_run):
+ async for k, v, names in key_run:
+ if isinstance(v, DataError):
+ errors[k] = v
+ else:
+ collect[k] = v
+ touched_names.extend(names)
+ else:
+ for k, v, names in key_run:
+ if isinstance(v, DataError):
+ errors[k] = v
+ else:
+ collect[k] = v
+ touched_names.extend(names)
+
+ if not self.ignore_any:
+ for key in value:
+ if key in touched_names:
+ continue
+ if key in self.ignore:
+ continue
+ if not self.allow_any and key not in self.extras:
+ errors[key] = DataError("%s is not allowed key" % key)
+ elif key in collect:
+ errors[key] = DataError("%s key was shadowed" % key)
+ else:
+ try:
+ collect[key] = await self.extras_trafaret.async_check(value[key])
+ except DataError as de:
+ errors[key] = de
+ if errors:
+ raise DataError(error=errors, trafaret=self)
+ return collect
+
+
+class KeyAsyncMixin:
+ async def async_call(self, data, context=None):
+ if self.name in data or self.default is not _empty:
+ if callable(self.default):
+ default = self.default()
+ else:
+ default = self.default
+ try:
+ value = await self.trafaret.async_check(self.get_data(data, default), context=context)
+ except DataError as data_error:
+ value = data_error
+ yield (
+ self.get_name(),
+ value,
+ (self.name,)
+ )
+ return
+
+ if not self.optional:
+ yield self.name, DataError(error='is required'), (self.name,)
diff --git a/trafaret/base.py b/trafaret/base.py
new file mode 100644
index 0000000..8d3dd81
--- /dev/null
+++ b/trafaret/base.py
@@ -0,0 +1,1387 @@
+# -*- coding: utf-8 -*-
+
+import functools
+import inspect
+import itertools
+import numbers
+import warnings
+from collections import Mapping as AbcMapping
+from .lib import (
+ py3,
+ py36,
+ py3metafix,
+ getargspec,
+ call_with_context_if_support,
+ _empty,
+)
+from .dataerror import DataError
+
+
+if py36:
+ from .async import (
+ TrafaretAsyncMixin,
+ OrAsyncMixin,
+ AndAsyncMixin,
+ ListAsyncMixin,
+ TupleAsyncMixin,
+ MappingAsyncMixin,
+ CallAsyncMixin,
+ ForwardAsyncMixin,
+ DictAsyncMixin,
+ KeyAsyncMixin,
+ )
+else:
+ class EmptyMixin(object):
+ pass
+ TrafaretAsyncMixin = EmptyMixin
+ OrAsyncMixin = EmptyMixin
+ AndAsyncMixin = EmptyMixin
+ ListAsyncMixin = EmptyMixin
+ TupleAsyncMixin = EmptyMixin
+ MappingAsyncMixin = EmptyMixin
+ CallAsyncMixin = EmptyMixin
+ ForwardAsyncMixin = EmptyMixin
+ DictAsyncMixin = EmptyMixin
+ KeyAsyncMixin = EmptyMixin
+
+
+# Python3 support
+if py3:
+ str_types = (str, bytes)
+ unicode = str
+ BYTES_TYPE = bytes
+else:
+ try:
+ from future_builtins import map
+ except ImportError:
+ # Support for GAE runner
+ from itertools import imap as map
+ str_types = (basestring,) # noqa
+ BYTES_TYPE = str
+
+
+def _dd(value):
+ if not hasattr(value, 'items'):
+ return repr(value)
+ return r"{%s}" % ', '.join("%r: %s" % (x[0], _dd(x[1])) for x in sorted(value.items(), key=lambda x: x[0]))
+
+
+def deprecated(message):
+ warnings.warn(message, DeprecationWarning)
+
+
+"""
+Trafaret is tiny library for data validation
+It provides several primitives to validate complex data structures
+Look at doctests for usage examples
+"""
+
+
+class TrafaretMeta(type):
+ """
+ Metaclass for trafarets to make using "|" operator possible not only
+ on instances but on classes
+
+ >>> Int | String
+ , )>
+ >>> Int | String | Null
+ , , )>
+ >>> (Int >> (lambda v: v if v ** 2 > 15 else 0)).check(5)
+ 5
+ """
+
+ def __or__(cls, other):
+ return cls() | other
+
+ def __and__(cls, other):
+ return cls() & other
+
+ def __rshift__(cls, other):
+ return cls() >> other
+
+
+@py3metafix
+class Trafaret(TrafaretAsyncMixin):
+ """
+ Base class for trafarets, provides only one method for
+ trafaret validation failure reporting
+ """
+
+ __metaclass__ = TrafaretMeta
+
+ def check(self, value, context=None):
+ """
+ Common logic. In subclasses you need to implement check_value or
+ check_and_return.
+ """
+ if hasattr(self, 'transform'):
+ return self.transform(value, context=context)
+ elif hasattr(self, 'check_value'):
+ self.check_value(value)
+ return value
+ elif hasattr(self, 'check_and_return'):
+ return self.check_and_return(value)
+ else:
+ cls = "{}.{}".format(
+ type(self).__module__,
+ type(self).__name__,
+ )
+ raise NotImplementedError(
+ "You must implement check_value or"
+ " check_and_return methods '%s'" % cls
+ )
+
+ def _failure(self, error=None, value=_empty):
+ """
+ Shortcut method for raising validation error
+ """
+ raise DataError(error=error, value=value, trafaret=self)
+
+ def append(self, other):
+ """
+ Appends new converter to list.
+ """
+ return And(self, other)
+
+ def __or__(self, other):
+ return Or(self, other)
+
+ def __and__(self, other):
+ return And(self, other)
+
+ def __rshift__(self, other):
+ return And(self, other)
+
+ def __call__(self, val, context=None):
+ return self.check(val, context=context)
+
+
+class OnError(Trafaret):
+ def __init__(self, trafaret, message):
+ self.trafaret = trafaret
+ self.message = message
+
+ def transform(self, value, context=None):
+ try:
+ return self.trafaret(value, context=context)
+ except DataError:
+ raise DataError(self.message, value=value)
+
+
+def ensure_trafaret(trafaret):
+ """
+ Helper for complex trafarets, takes trafaret instance or class
+ and returns trafaret instance
+ """
+ if isinstance(trafaret, Trafaret):
+ return trafaret
+ elif inspect.isroutine(trafaret):
+ if 'context' in getargspec(trafaret).args:
+ return trafaret
+ else:
+ return Call(trafaret)
+ elif isinstance(trafaret, type):
+ if issubclass(trafaret, Trafaret):
+ return trafaret()
+ # str, int, float are classes, but its appropriate to use them
+ # as trafaret functions
+ return Call(lambda val: trafaret(val))
+ else:
+ raise RuntimeError("%r should be instance or subclass"
+ " of Trafaret" % trafaret)
+
+
+class TypeMeta(TrafaretMeta):
+ def __getitem__(self, type_):
+ return self(type_)
+
+
+@py3metafix
+class TypingTrafaret(Trafaret):
+ """A trafaret used for instance type and class inheritance checks."""
+
+ __metaclass__ = TypeMeta
+
+ def __init__(self, type_):
+ self.type_ = type_
+
+ def check_value(self, value):
+ if not self.typing_checker(value, self.type_):
+ self._failure(self.failure_message % self.type_.__name__, value=value)
+
+ def __repr__(self):
+ return "<%s(%s)>" % (self.__class__.__name__, self.type_.__name__)
+
+
+class Subclass(TypingTrafaret):
+ """
+ >>> Subclass(type)
+
+ >>> Subclass[type]
+
+ >>> s = Subclass[type]
+ >>> s.check(type)
+
+ >>> extract_error(s, object)
+ 'value is not subclass of type'
+ """
+
+ typing_checker = issubclass
+ failure_message = "value is not subclass of %s"
+
+
+class Type(TypingTrafaret):
+ """
+ >>> Type(int)
+
+ >>> Type[int]
+
+ >>> c = Type[int]
+ >>> c.check(1)
+ 1
+ >>> extract_error(c, "foo")
+ 'value is not int'
+ """
+
+ typing_checker = isinstance
+ failure_message = "value is not %s"
+
+
+class Any(Trafaret):
+ """
+ >>> Any()
+
+ >>> (Any() >> ignore).check(object())
+ """
+
+ def check_value(self, value):
+ pass
+
+ def __repr__(self):
+ return ""
+
+
+class OrMeta(TrafaretMeta):
+ """
+ Allows to use "<<" operator on Or class
+
+ >>> Or << Int << String
+ , )>
+ """
+
+ def __lshift__(cls, other):
+ return cls() << other
+
+
+@py3metafix
+class Or(Trafaret, OrAsyncMixin):
+ """
+ >>> nullString = Or(String, Null)
+ >>> nullString
+ , )>
+ >>> nullString.check(None)
+ >>> nullString.check("test")
+ 'test'
+ >>> extract_error(nullString, 1)
+ {0: 'value is not a string', 1: 'value should be None'}
+ """
+
+ __metaclass__ = OrMeta
+ __slots__ = ['trafarets']
+
+ def __init__(self, *trafarets):
+ self.trafarets = list(map(ensure_trafaret, trafarets))
+
+ def transform(self, value, context=None):
+ errors = []
+ for trafaret in self.trafarets:
+ try:
+ return trafaret(value, context=context)
+ except DataError as e:
+ errors.append(e)
+ raise DataError(dict(enumerate(errors)), trafaret=self)
+
+ def __lshift__(self, trafaret):
+ self.trafarets.append(ensure_trafaret(trafaret))
+ return self
+
+ def __or__(self, trafaret):
+ self << trafaret
+ return self
+
+ def __repr__(self):
+ return "" % (", ".join(map(repr, self.trafarets)))
+
+
+class And(Trafaret, AndAsyncMixin):
+ """
+ Will work over trafarets sequentially
+ """
+ __slots__ = ('trafaret', 'other', 'disable_old_check_convert')
+
+ def __init__(self, trafaret, other):
+ self.trafaret = ensure_trafaret(trafaret)
+ self.other = ensure_trafaret(other)
+
+ def transform(self, value, context=None):
+ res = self.trafaret(value, context=context)
+ if isinstance(res, DataError):
+ raise DataError
+ res = self.other(res, context=context)
+ if isinstance(res, DataError):
+ raise res
+ return res
+
+ def __repr__(self):
+ return repr(self.trafaret)
+
+
+class Null(Trafaret):
+ """
+ >>> Null()
+
+ >>> Null().check(None)
+ >>> extract_error(Null(), 1)
+ 'value should be None'
+ """
+
+ def check_value(self, value):
+ if value is not None:
+ self._failure("value should be None", value=value)
+
+ def __repr__(self):
+ return ""
+
+
+class Bool(Trafaret):
+ """
+ >>> Bool()
+
+ >>> Bool().check(True)
+ True
+ >>> Bool().check(False)
+ False
+ >>> extract_error(Bool(), 1)
+ 'value should be True or False'
+ """
+
+ def check_value(self, value):
+ if not isinstance(value, bool):
+ self._failure("value should be True or False", value=value)
+
+ def __repr__(self):
+ return ""
+
+
+class StrBool(Trafaret):
+ """
+ >>> extract_error(StrBool(), 'aloha')
+ "value can't be converted to Bool"
+ >>> StrBool().check(1)
+ True
+ >>> StrBool().check(0)
+ False
+ >>> StrBool().check('y')
+ True
+ >>> StrBool().check('n')
+ False
+ >>> StrBool().check(None)
+ False
+ >>> StrBool().check('1')
+ True
+ >>> StrBool().check('0')
+ False
+ >>> StrBool().check('YeS')
+ True
+ >>> StrBool().check('No')
+ False
+ >>> StrBool().check(True)
+ True
+ >>> StrBool().check(False)
+ False
+ """
+
+ convertable = ('t', 'true', 'false', 'y', 'n', 'yes', 'no', 'on',
+ '1', '0', 'none')
+
+ def check_and_return(self, value):
+ _value = str(value).strip().lower()
+ if _value not in self.convertable:
+ self._failure("value can't be converted to Bool", value=value)
+ return _value in ('t', 'true', 'y', 'yes', 'on', '1')
+
+ def __repr__(self):
+ return ""
+
+
+class NumberMeta(TrafaretMeta):
+ """
+ Allows slicing syntax for min and max arguments for
+ number trafarets
+
+ >>> Int[1:]
+
+ >>> Int[1:10]
+
+ >>> Int[:10]
+
+ >>> Float[1:]
+
+ >>> Int > 3
+
+ >>> 1 < (Float < 10)
+
+ >>> (Int > 5).check(10)
+ 10
+ >>> extract_error(Int > 5, 1)
+ 'value should be greater than 5'
+ >>> (Int < 3).check(1)
+ 1
+ >>> extract_error(Int < 3, 3)
+ 'value should be less than 3'
+ """
+
+ def __getitem__(cls, slice_):
+ return cls(gte=slice_.start, lte=slice_.stop)
+
+ def __lt__(cls, lt):
+ return cls(lt=lt)
+
+ def __gt__(cls, gt):
+ return cls(gt=gt)
+
+
+@py3metafix
+class FloatRaw(Trafaret):
+ """
+ Tests that value is a float or a string that is convertable to float.
+
+ >>> Float()
+
+ >>> Float(gte=1)
+
+ >>> Float(lte=10)
+
+ >>> Float(gte=1, lte=10)
+
+ >>> Float().check(1.0)
+ 1.0
+ >>> extract_error(Float(), 1 + 3j)
+ 'value is not float'
+ >>> extract_error(Float(), 1)
+ 1.0
+ >>> Float(gte=2).check(3.0)
+ 3.0
+ >>> extract_error(Float(gte=2), 1.0)
+ 'value is less than 2'
+ >>> Float(lte=10).check(5.0)
+ 5.0
+ >>> extract_error(Float(lte=3), 5.0)
+ 'value is greater than 3'
+ >>> Float().check("5.0")
+ 5.0
+ """
+
+ __metaclass__ = NumberMeta
+
+ convertable = str_types + (numbers.Real,)
+ value_type = float
+
+ def __init__(self, gte=None, lte=None, gt=None, lt=None):
+ self.gte = gte
+ self.lte = lte
+ self.gt = gt
+ self.lt = lt
+
+ def _converter(self, value):
+ if not isinstance(value, self.convertable):
+ self._failure('value is not %s' % self.value_type.__name__, value=value)
+ try:
+ return self.value_type(value)
+ except ValueError:
+ self._failure(
+ "value can't be converted to %s" % self.value_type.__name__,
+ value=value
+ )
+
+ def _check(self, data):
+ if not isinstance(data, self.value_type):
+ value = self._converter(data)
+ else:
+ value = data
+ if self.gte is not None and value < self.gte:
+ self._failure("value is less than %s" % self.gte, value=data)
+ if self.lte is not None and value > self.lte:
+ self._failure("value is greater than %s" % self.lte, value=data)
+ if self.lt is not None and value >= self.lt:
+ self._failure("value should be less than %s" % self.lt, value=data)
+ if self.gt is not None and value <= self.gt:
+ self._failure("value should be greater than %s" % self.gt, value=data)
+ return value
+
+ def check_and_return(self, data):
+ self._check(data)
+ return data
+
+ def __lt__(self, lt):
+ return type(self)(gte=self.gte, lte=self.lte, gt=self.gt, lt=lt)
+
+ def __gt__(self, gt):
+ return type(self)(gte=self.gte, lte=self.lte, gt=gt, lt=self.lt)
+
+ def __repr__(self):
+ r = "<%s" % type(self).__name__
+ options = []
+ for param in ("gte", "lte", "gt", "lt"):
+ if getattr(self, param) is not None:
+ options.append("%s=%s" % (param, getattr(self, param)))
+ if options:
+ r += "(%s)" % (", ".join(options))
+ r += ">"
+ return r
+
+
+class Float(FloatRaw):
+ """Checks that value is a float.
+ Or if value is a string converts this string to float
+ """
+ def check_and_return(self, data):
+ return self._check(data)
+
+
+class IntRaw(FloatRaw):
+ """
+ >>> Int()
+
+ >>> Int().check(5)
+ 5
+ >>> extract_error(Int(), 1.1)
+ 'value is not int'
+ >>> extract_error(Int(), 1 + 1j)
+ 'value is not int'
+ """
+
+ value_type = int
+
+ def _converter(self, value):
+ if isinstance(value, float):
+ if not value.is_integer():
+ self._failure('value is not int', value=value)
+ return super(IntRaw, self)._converter(value)
+
+
+class Int(IntRaw):
+ def check_and_return(self, data):
+ return self._check(data)
+
+
+class Atom(Trafaret):
+ """
+ >>> Atom('atom').check('atom')
+ 'atom'
+ >>> extract_error(Atom('atom'), 'molecule')
+ "value is not exactly 'atom'"
+ """
+ __slots__ = ['value']
+
+ def __init__(self, value):
+ self.value = value
+
+ def check_value(self, value):
+ if self.value != value:
+ self._failure("value is not exactly '%s'" % self.value, value=value)
+
+
+class String(Trafaret):
+ """
+ >>> String()
+
+ >>> String(allow_blank=True)
+
+ >>> String().check("foo")
+ 'foo'
+ >>> extract_error(String(), "")
+ 'blank value is not allowed'
+ >>> String(allow_blank=True).check("")
+ ''
+ >>> extract_error(String(), 1)
+ 'value is not a string'
+ >>> String(min_length=2, max_length=3).check('123')
+ '123'
+ >>> extract_error(String(min_length=2, max_length=6), '1')
+ 'String is shorter than 2 characters'
+ >>> extract_error(String(min_length=2, max_length=6), '1234567')
+ 'String is longer than 6 characters'
+ >>> String(min_length=2, max_length=6, allow_blank=True)
+ Traceback (most recent call last):
+ ...
+ AssertionError: Either allow_blank or min_length should be specified, not both
+ >>> String(min_length=0, max_length=6, allow_blank=True).check('123')
+ '123'
+ """
+
+ def __init__(self, allow_blank=False, min_length=None, max_length=None):
+ assert not (allow_blank and min_length), \
+ "Either allow_blank or min_length should be specified, not both"
+ self.allow_blank = allow_blank
+ self.min_length = min_length
+ self.max_length = max_length
+
+ def check_and_return(self, value):
+ if not isinstance(value, str_types):
+ self._failure("value is not a string", value=value)
+ if not self.allow_blank and len(value) == 0:
+ self._failure("blank value is not allowed", value=value)
+ elif self.allow_blank and len(value) == 0:
+ return value
+ if self.min_length is not None and len(value) < self.min_length:
+ self._failure('String is shorter than %s characters' % self.min_length, value=value)
+ if self.max_length is not None and len(value) > self.max_length:
+ self._failure('String is longer than %s characters' % self.max_length, value=value)
+ return value
+
+ def __repr__(self):
+ return "" if self.allow_blank else ""
+
+
+class Bytes(Trafaret):
+ def __init__(self, encoding='utf-8'):
+ self.encoding = encoding
+
+ def check_and_return(self, value):
+ if not isinstance(value, BYTES_TYPE):
+ self._failure('Value is not bytes', value=value)
+ try:
+ return value.decode(self.encoding)
+ except UnicodeError:
+ raise self._failure('value cannot be decoded with %s encoding' % self.encoding)
+
+
+class SquareBracketsMeta(TrafaretMeta):
+ """
+ Allows usage of square brackets for List initialization
+
+ >>> List[Int]
+ )>
+ >>> List[Int, 1:]
+ )>
+ >>> List[:10, Int]
+ )>
+ >>> List[1:10]
+ Traceback (most recent call last):
+ ...
+ RuntimeError: Trafaret is required for List initialization
+ """
+
+ def __getitem__(self, args):
+ slice_ = None
+ trafaret = None
+ if not isinstance(args, tuple):
+ args = (args, )
+ for arg in args:
+ if isinstance(arg, slice):
+ slice_ = arg
+ elif (
+ isinstance(arg, Trafaret)
+ or issubclass(arg, Trafaret)
+ or isinstance(arg, type)
+ ):
+ trafaret = arg
+ if not trafaret:
+ raise RuntimeError("Trafaret is required for List initialization")
+ if slice_:
+ return self(
+ trafaret,
+ min_length=slice_.start or 0,
+ max_length=slice_.stop,
+ )
+ return self(trafaret)
+
+
+@py3metafix
+class List(Trafaret, ListAsyncMixin):
+ """
+ >>> List(Int)
+ )>
+ >>> List(Int, min_length=1)
+ )>
+ >>> List(Int, min_length=1, max_length=10)
+ )>
+ >>> extract_error(List(Int), 1)
+ 'value is not a list'
+ >>> List(Int).check([1, 2, 3])
+ [1, 2, 3]
+ >>> List(String).check(["foo", "bar", "spam"])
+ ['foo', 'bar', 'spam']
+ >>> extract_error(List(Int), [1, 2, 1 + 3j])
+ {2: 'value is not int'}
+ >>> List(Int, min_length=1).check([1, 2, 3])
+ [1, 2, 3]
+ >>> extract_error(List(Int, min_length=1), [])
+ 'list length is less than 1'
+ >>> List(Int, max_length=2).check([1, 2])
+ [1, 2]
+ >>> extract_error(List(Int, max_length=2), [1, 2, 3])
+ 'list length is greater than 2'
+ >>> extract_error(List(Int), ["a"])
+ {0: "value can't be converted to int"}
+ """
+
+ __metaclass__ = SquareBracketsMeta
+ __slots__ = ['trafaret', 'min_length', 'max_length']
+
+ def __init__(self, trafaret, min_length=0, max_length=None):
+ self.trafaret = ensure_trafaret(trafaret)
+ self.min_length = min_length
+ self.max_length = max_length
+
+ def check_common(self, value):
+ if not isinstance(value, list):
+ self._failure("value is not a list", value=value)
+ if len(value) < self.min_length:
+ self._failure("list length is less than %s" % self.min_length, value=value)
+ if self.max_length is not None and len(value) > self.max_length:
+ self._failure("list length is greater than %s" % self.max_length, value=value)
+
+ def transform(self, value, context=None):
+ self.check_common(value)
+ lst = []
+ errors = {}
+ for index, item in enumerate(value):
+ try:
+ lst.append(self.trafaret(item, context=context))
+ except DataError as err:
+ errors[index] = err
+ if errors:
+ raise DataError(error=errors, trafaret=self)
+ return lst
+
+ def __repr__(self):
+ r = ""
+ return r
+
+
+class Tuple(Trafaret, TupleAsyncMixin):
+ """
+ Tuple checker can be used to check fixed tuples, like (Int, Int, String).
+
+ >>> t = Tuple(Int, Int, String)
+ >>> t.check([3, 4, '5'])
+ (3, 4, '5')
+ >>> extract_error(t, [3, 4, 5])
+ {2: 'value is not a string'}
+ >>> t
+ , , )>
+ """
+ __slots__ = ['trafarets', 'length']
+
+ def __init__(self, *args):
+ self.trafarets = list(map(ensure_trafaret, args))
+ self.length = len(self.trafarets)
+
+ def check_common(self, value):
+ try:
+ value = tuple(value)
+ except TypeError:
+ self._failure('value must be convertable to tuple', value=value)
+ if len(value) != self.length:
+ self._failure('value must contain %s items' % self.length, value=value)
+
+ def transform(self, value, context=None):
+ self.check_common(value)
+ result = []
+ errors = {}
+ for idx, (item, trafaret) in enumerate(zip(value, self.trafarets)):
+ try:
+ result.append(trafaret(item, context=context))
+ except DataError as err:
+ errors[idx] = err
+ if errors:
+ self._failure(errors, value=value)
+ return tuple(result)
+
+ def __repr__(self):
+ return ''
+
+
+class Key(KeyAsyncMixin):
+ """
+ Helper class for Dict.
+
+ It gets ``name``, and provides method ``extract(data)`` that extract key value
+ from data through mapping ``get`` method.
+ Key `__call__` method yields ``(key name, Maybe(DataError), [touched keys])`` triples.
+
+ You can redefine ``get_data(data, default)`` method in subclassed ``Key`` if you want to use something other
+ then ``.get(...)`` method.
+
+ Like this for the aiohttp MultiDict::
+
+ class MDKey(t.Key):
+ def get_data(data, default):
+ return data.get_all(self.name, default)
+ """
+ __slots__ = ['name', 'to_name', 'default', 'optional', 'trafaret']
+
+ def __init__(self, name, default=_empty, optional=False, to_name=None, trafaret=None):
+ self.name = name
+ self.to_name = to_name
+ self.default = default
+ self.optional = optional
+ self.trafaret = ensure_trafaret(trafaret) if trafaret else Any()
+
+ def __call__(self, data, context=None):
+ if self.name in data or self.default is not _empty:
+ if callable(self.default):
+ default = self.default()
+ else:
+ default = self.default
+ yield (
+ self.get_name(),
+ catch_error(self.trafaret, self.get_data(data, default), context=context),
+ (self.name,)
+ )
+ return
+
+ if not self.optional:
+ yield self.name, DataError(error='is required'), (self.name,)
+
+ def get_data(self, data, default):
+ return data.get(self.name, default)
+
+ def keys_names(self):
+ yield self.name
+
+ def set_trafaret(self, trafaret):
+ self.trafaret = ensure_trafaret(trafaret)
+ return self
+
+ def __rshift__(self, name):
+ self.to_name = name
+ return self
+
+ def get_name(self):
+ return self.to_name or self.name
+
+ def make_optional(self):
+ self.optional = True
+
+ def __repr__(self):
+ return '<%s "%s"%s>' % (
+ self.__class__.__name__,
+ self.name,
+ ' to "%s"' % self.to_name if getattr(self, 'to_name', False) else '',
+ )
+
+
+class Dict(Trafaret, DictAsyncMixin):
+ """
+ >>> trafaret = Dict(foo=Int, bar=String) >> ignore
+ >>> trafaret.check({"foo": 1, "bar": "spam"})
+ >>> extract_error(trafaret, {"foo": 1, "bar": 2})
+ {'bar': 'value is not a string'}
+ >>> extract_error(trafaret, {"foo": 1})
+ {'bar': 'is required'}
+ >>> extract_error(trafaret, {"foo": 1, "bar": "spam", "eggs": None})
+ {'eggs': 'eggs is not allowed key'}
+ >>> trafaret.allow_extra("eggs")
+ , foo=)>
+ >>> trafaret.check({"foo": 1, "bar": "spam", "eggs": None})
+ >>> trafaret.check({"foo": 1, "bar": "spam"})
+ >>> extract_error(trafaret, {"foo": 1, "bar": "spam", "ham": 100})
+ {'ham': 'ham is not allowed key'}
+ >>> trafaret.allow_extra("*")
+ , foo=)>
+ >>> trafaret.check({"foo": 1, "bar": "spam", "ham": 100})
+ >>> trafaret.check({"foo": 1, "bar": "spam", "ham": 100, "baz": None})
+ >>> extract_error(trafaret, {"foo": 1, "ham": 100, "baz": None})
+ {'bar': 'is required'}
+ >>> trafaret = Dict({Key('bar', optional=True): String}, foo=Int)
+ >>> trafaret.allow_extra("*")
+ , foo=)>
+ >>> _dd(trafaret.check({"foo": 1, "ham": 100, "baz": None}))
+ "{'baz': None, 'foo': 1, 'ham': 100}"
+ >>> _dd(extract_error(trafaret, {"bar": 1, "ham": 100, "baz": None}))
+ "{'bar': 'value is not a string', 'foo': 'is required'}"
+ >>> extract_error(trafaret, {"foo": 1, "bar": 1, "ham": 100, "baz": None})
+ {'bar': 'value is not a string'}
+ >>> trafaret = Dict({Key('bar', default='nyanya') >> 'baz': String}, foo=Int)
+ >>> _dd(trafaret.check({'foo': 4}))
+ "{'baz': 'nyanya', 'foo': 4}"
+ >>> _ = trafaret.ignore_extra('fooz')
+ >>> _dd(trafaret.check({'foo': 4, 'fooz': 5}))
+ "{'baz': 'nyanya', 'foo': 4}"
+ >>> _ = trafaret.ignore_extra('*')
+ >>> _dd(trafaret.check({'foo': 4, 'foor': 5}))
+ "{'baz': 'nyanya', 'foo': 4}"
+ """
+ __slots__ = ['extras', 'extras_trafaret', 'allow_any', 'ignore', 'ignore_any', 'keys']
+
+ def __init__(self, *args, **trafarets):
+ if args and isinstance(args[0], AbcMapping):
+ keys = args[0]
+ args = args[1:]
+ else:
+ keys = {}
+ if any(not callable(key) for key in args):
+ raise RuntimeError('Keys in single attributes must be callables')
+
+ # extra
+ allow_extra = trafarets.pop('allow_extra', [])
+ allow_extra_trafaret = trafarets.pop('allow_extra_trafaret', Any)
+ self.extras_trafaret = ensure_trafaret(allow_extra_trafaret)
+ self.allow_any = '*' in allow_extra
+ self.extras = [name for name in allow_extra if name != '*']
+ # ignore
+ ignore_extra = trafarets.pop('ignore_extra', [])
+ self.ignore_any = '*' in ignore_extra
+ self.ignore = [name for name in ignore_extra if name != '*']
+
+ self.keys = list(args)
+ for key, trafaret in itertools.chain(trafarets.items(), keys.items()):
+ key_ = Key(key) if isinstance(key, str_types) else key
+ key_.set_trafaret(trafaret)
+ self.keys.append(key_)
+
+ def allow_extra(self, *names, **kw):
+ trafaret = kw.get('trafaret', Any)
+ for name in names:
+ if name == "*":
+ self.allow_any = True
+ else:
+ self.extras.append(name)
+ self.extras_trafaret = ensure_trafaret(trafaret)
+ return self
+
+ def ignore_extra(self, *names):
+ for name in names:
+ if name == "*":
+ self.ignore_any = True
+ else:
+ self.ignore.append(name)
+ return self
+
+ def make_optional(self, *args):
+ for key in self.keys:
+ if key.name in args or '*' in args:
+ key.make_optional()
+ return self
+
+ def transform(self, value, context=None):
+ if not isinstance(value, AbcMapping):
+ self._failure("value is not a dict", value=value)
+ collect = {}
+ errors = {}
+ touched_names = []
+ for key in self.keys:
+ if not callable(key):
+ raise ValueError('Non callable Keys are not supported')
+ for k, v, names in call_with_context_if_support(key, value, context=context):
+ if isinstance(v, DataError):
+ errors[k] = v
+ else:
+ collect[k] = v
+ touched_names.extend(names)
+
+ if not self.ignore_any:
+ for key in value:
+ if key in touched_names:
+ continue
+ if key in self.ignore:
+ continue
+ if not self.allow_any and key not in self.extras:
+ errors[key] = DataError("%s is not allowed key" % key)
+ elif key in collect:
+ errors[key] = DataError("%s key was shadowed" % key)
+ else:
+ try:
+ collect[key] = self.extras_trafaret(value[key])
+ except DataError as de:
+ errors[key] = de
+ if errors:
+ raise DataError(error=errors, trafaret=self)
+ return collect
+
+ def keys_names(self):
+ for key in self.keys:
+ for k in key.keys_names():
+ yield k
+
+ def __repr__(self):
+ r = ""
+ return r
+
+ def merge(self, other):
+ """
+ Extends one Dict with other Dict Key`s or Key`s list,
+ or dict instance supposed for Dict
+ """
+ if not isinstance(other, (Dict, list, dict)):
+ raise TypeError('You must merge Dict only with Dict'
+ ' or list of Keys')
+ if isinstance(other, dict):
+ other = Dict(other)
+ if isinstance(other, Dict):
+ other_keys_names = other.keys_names()
+ other_keys = other.keys
+ else:
+ other_keys_names = [
+ key_name
+ for key in other
+ for key_name in key.keys_names()
+ ]
+ other_keys = other
+ if set(self.keys_names()) & set(other_keys_names):
+ raise ValueError(
+ 'Merged dicts should have no interlapping keys'
+ )
+ if (
+ set(key.get_name() for key in self.keys)
+ & set(key.get_name() for key in other_keys)
+ ):
+ raise ValueError(
+ 'Merged dicts should have no interlapping keys to names'
+ )
+ new_trafaret = self.__class__()
+ new_trafaret.keys = self.keys + other_keys
+ return new_trafaret
+
+ __add__ = merge
+
+
+def DictKeys(keys):
+ """
+ Checks if dict has all given keys
+
+ :param keys:
+ :type keys:
+
+ >>> _dd(DictKeys(['a','b']).check({'a':1,'b':2,}))
+ "{'a': 1, 'b': 2}"
+ >>> extract_error(DictKeys(['a','b']), {'a':1,'b':2,'c':3,})
+ {'c': 'c is not allowed key'}
+ >>> extract_error(DictKeys(['key','key2']), {'key':'val'})
+ {'key2': 'is required'}
+ """
+ req = [(Key(key), Any) for key in keys]
+ return Dict(dict(req))
+
+
+class Mapping(Trafaret, MappingAsyncMixin):
+ """
+ Mapping gets two trafarets as arguments, one for key and one for value,
+ like `Mapping(t.Int, t.List(t.Str))`.
+ """
+ __slots__ = ['key', 'value']
+
+ def __init__(self, key, value):
+ self.key = ensure_trafaret(key)
+ self.value = ensure_trafaret(value)
+
+ def transform(self, mapping, context=None):
+ if not isinstance(mapping, AbcMapping):
+ self._failure("value is not a dict", value=mapping)
+ checked_mapping = {}
+ errors = {}
+ for key, value in mapping.items():
+ pair_errors = {}
+ try:
+ checked_key = self.key(key, context=context)
+ except DataError as err:
+ pair_errors['key'] = err
+ try:
+ checked_value = self.value(value, context=context)
+ except DataError as err:
+ pair_errors['value'] = err
+ if pair_errors:
+ errors[key] = DataError(error=pair_errors)
+ else:
+ checked_mapping[checked_key] = checked_value
+ if errors:
+ raise DataError(error=errors, trafaret=self)
+ return checked_mapping
+
+ def __repr__(self):
+ return " %r)>" % (self.key, self.value)
+
+
+class Enum(Trafaret):
+ """
+ >>> trafaret = Enum("foo", "bar", 1) >> ignore
+ >>> trafaret
+
+ >>> trafaret.check("foo")
+ >>> trafaret.check(1)
+ >>> extract_error(trafaret, 2)
+ "value doesn't match any variant"
+ """
+ __slots__ = ['variants']
+
+ def __init__(self, *variants):
+ self.variants = variants[:]
+
+ def check_value(self, value):
+ if value not in self.variants:
+ self._failure("value doesn't match any variant", value=value)
+
+ def __repr__(self):
+ return "" % (", ".join(map(repr, self.variants)))
+
+
+class Callable(Trafaret):
+ """
+ >>> (Callable() >> ignore).check(lambda: 1)
+ >>> extract_error(Callable(), 1)
+ 'value is not callable'
+ """
+
+ def check_value(self, value):
+ if not callable(value):
+ self._failure("value is not callable", value=value)
+
+ def __repr__(self):
+ return ""
+
+
+class Call(Trafaret, CallAsyncMixin):
+ """
+ >>> def validator(value):
+ ... if value != "foo":
+ ... return DataError("I want only foo!")
+ ... return 'foo'
+ ...
+ >>> trafaret = Call(validator)
+ >>> trafaret
+
+ >>> trafaret.check("foo")
+ 'foo'
+ >>> extract_error(trafaret, "bar")
+ 'I want only foo!'
+ """
+ __slots__ = ['fn']
+
+ def __init__(self, fn):
+ if not callable(fn):
+ raise RuntimeError("Call argument should be callable")
+ argspec = getargspec(fn)
+ args = set(argspec.args)
+ self.supports_context = 'context' in args
+ if 'context' in args:
+ args.remove('context')
+ if len(argspec.args) - len(argspec.defaults or []) > 1:
+ raise RuntimeError("Call argument should be"
+ " one argument function")
+ self.fn = fn
+
+ def transform(self, value, context=None):
+ if self.supports_context:
+ res = self.fn(value, context=context)
+ else:
+ res = self.fn(value)
+ if isinstance(res, DataError):
+ raise res
+ else:
+ return res
+
+ def __repr__(self):
+ return "" % self.fn.__name__
+
+
+class Forward(Trafaret, ForwardAsyncMixin):
+ """
+ >>> node = Forward()
+ >>> node << Dict(name=String, children=List[node])
+ >>> node
+ )>, name=)>)>
+ >>> node.check({"name": "foo", "children": []}) == {'children': [], 'name': 'foo'}
+ True
+ >>> extract_error(node, {"name": "foo", "children": [1]})
+ {'children': {0: 'value is not a dict'}}
+ >>> node.check({"name": "foo", "children": [ \
+ {"name": "bar", "children": []} \
+ ]}) == {'children': [{'children': [], 'name': 'bar'}], 'name': 'foo'}
+ True
+ >>> empty_node = Forward()
+ >>> empty_node
+
+ >>> extract_error(empty_node, 'something')
+ 'trafaret not set yet'
+ """
+
+ def __init__(self):
+ self.trafaret = None
+ self._recur_repr = False
+
+ def __lshift__(self, trafaret):
+ self.provide(trafaret)
+
+ def provide(self, trafaret):
+ if self.trafaret:
+ raise RuntimeError("trafaret for Forward is already specified")
+ self.trafaret = ensure_trafaret(trafaret)
+
+ def transform(self, value, context=None):
+ if self.trafaret is None:
+ self._failure('trafaret not set yet', value=value)
+ return self.trafaret(value, context=context)
+
+ def __repr__(self):
+ # XXX not threadsafe
+ if self._recur_repr:
+ return ""
+ self._recur_repr = True
+ r = "" % self.trafaret
+ self._recur_repr = False
+ return r
+
+
+class GuardError(DataError):
+ """
+ Raised when guarded function gets invalid arguments,
+ inherits error message from corresponding DataError
+ """
+
+ pass
+
+
+def guard(trafaret=None, **kwargs):
+ """
+ Decorator for protecting function with trafarets
+
+ >>> @guard(a=String, b=Int, c=String)
+ ... def fn(a, b, c="default"):
+ ... '''docstring'''
+ ... return (a, b, c)
+ ...
+ >>> fn.__module__ = None
+ >>> help(fn)
+ Help on function fn:
+
+ fn(*args, **kwargs)
+ guarded with , b=, c=)>
+
+ docstring
+
+ >>> fn("foo", 1)
+ ('foo', 1, 'default')
+ >>> extract_error(fn, "foo", 1, 2)
+ {'c': 'value is not a string'}
+ >>> extract_error(fn, "foo")
+ {'b': 'is required'}
+ >>> g = guard(Dict())
+ >>> c = Forward()
+ >>> c << Dict(name=str, children=List[c])
+ >>> g = guard(c)
+ >>> g = guard(Int())
+ Traceback (most recent call last):
+ ...
+ RuntimeError: trafaret should be instance of Dict or Forward
+ """
+ if (
+ trafaret
+ and not isinstance(trafaret, Dict)
+ and not isinstance(trafaret, Forward)
+ ):
+ raise RuntimeError("trafaret should be instance of Dict or Forward")
+ elif trafaret and kwargs:
+ raise RuntimeError("choose one way of initialization,"
+ " trafaret or kwargs")
+ if not trafaret:
+ trafaret = Dict(**kwargs)
+
+ def wrapper(fn):
+ if py3:
+ argspec = inspect.getfullargspec(fn)
+ else:
+ argspec = inspect.getargspec(fn)
+
+ @functools.wraps(fn)
+ def decor(*args, **kwargs):
+ fnargs = argspec.args
+ if fnargs[0] in ['self', 'cls']:
+ obj = args[0]
+ fnargs = fnargs[1:]
+ checkargs = args[1:]
+ else:
+ obj = None
+ checkargs = args
+
+ try:
+ call_args = dict(
+ itertools.chain(zip(fnargs, checkargs), kwargs.items())
+ )
+ for name, default in zip(reversed(fnargs),
+ reversed(argspec.defaults or ())):
+ if name not in call_args:
+ call_args[name] = default
+ converted = trafaret(call_args)
+ except DataError as err:
+ raise GuardError(error=err.error)
+ return fn(obj, **converted) if obj else fn(**converted)
+ decor.__doc__ = "guarded with %r\n\n" % trafaret + (decor.__doc__ or "")
+ return decor
+ return wrapper
+
+
+def ignore(val):
+ """
+ Stub to ignore value from trafaret
+ Use it like:
+
+ >>> a = Int >> ignore
+ >>> a.check(7)
+ """
+ pass
+
+
+def catch(checker, *a, **kw):
+ """
+ Helper for tests - catch error and return it as dict
+ """
+
+ try:
+ if hasattr(checker, 'check'):
+ return checker.check(*a, **kw)
+ elif callable(checker):
+ return checker(*a, **kw)
+ except DataError as error:
+ return error
+
+
+catch_error = catch
+
+
+def extract_error(checker, *a, **kw):
+ """
+ Helper for tests - catch error and return it as dict
+ """
+
+ res = catch_error(checker, *a, **kw)
+ if isinstance(res, DataError):
+ return res.as_dict()
+ return res
diff --git a/trafaret/constructor.py b/trafaret/constructor.py
index 6bbd9f6..c567d5f 100644
--- a/trafaret/constructor.py
+++ b/trafaret/constructor.py
@@ -1,5 +1,5 @@
import trafaret as t
-from trafaret.lib import py3, py3metafix
+from trafaret.lib import py3metafix
class ConstructMeta(type):
diff --git a/trafaret/contrib/object_id.py b/trafaret/contrib/object_id.py
index 938d411..c82bc70 100644
--- a/trafaret/contrib/object_id.py
+++ b/trafaret/contrib/object_id.py
@@ -30,16 +30,13 @@ def __init__(self, allow_blank=False):
def __repr__(self):
return "" if self.allow_blank else ""
- def converter(self, value):
- try:
- return self.value_type(value)
- except InvalidId as e:
- self._failure(str(e))
-
def check_and_return(self, value):
if not self.allow_blank and value is None:
self._failure("blank value is not allowed")
if isinstance(value, self.convertable) or value is None:
- return value
+ try:
+ return ObjectId(value)
+ except InvalidId as e:
+ self._failure(str(e))
self._failure('value is not %s' % self.value_type.__name__)
diff --git a/trafaret/contrib/rfc_3339.py b/trafaret/contrib/rfc_3339.py
index 7b6e824..a32770b 100644
--- a/trafaret/contrib/rfc_3339.py
+++ b/trafaret/contrib/rfc_3339.py
@@ -16,19 +16,10 @@ def __init__(self, allow_blank=False):
def __repr__(self):
return "" if self.allow_blank else ""
- def converter(self, value):
+ def check_and_return(self, value):
if isinstance(value, datetime):
return value
try:
return parse(value)
except ValueError as e:
self._failure(e.message)
-
- def check_and_return(self, value):
- if isinstance(value, str_types):
- if len(value) > 0 or self.allow_blank:
- return value
- if isinstance(value, datetime):
- return value
-
- self._failure('value is not valid')
diff --git a/trafaret/dataerror.py b/trafaret/dataerror.py
new file mode 100644
index 0000000..7961056
--- /dev/null
+++ b/trafaret/dataerror.py
@@ -0,0 +1,33 @@
+from .lib import _empty
+
+
+class DataError(ValueError):
+ """
+ Error with data preserve
+ error can be a message or None if error raised in childs
+ data can be anything
+ """
+ __slots__ = ['error', 'name', 'value', 'trafaret']
+
+ def __init__(self, error=None, name=None, value=_empty, trafaret=None):
+ self.error = error
+ self.name = name
+ self.value = value
+ self.trafaret = trafaret
+
+ def __str__(self):
+ return str(self.error)
+
+ def __repr__(self):
+ return 'DataError(%s)' % str(self)
+
+ def as_dict(self, value=False):
+ def as_dict(dataerror):
+ if not isinstance(dataerror.error, dict):
+ if value and dataerror.value != _empty:
+ return '%s, got %r' % (str(dataerror.error), dataerror.value)
+ else:
+ return str(dataerror.error)
+ return dict((k, v.as_dict(value=value) if isinstance(v, DataError) else v)
+ for k, v in dataerror.error.items())
+ return as_dict(self)
diff --git a/trafaret/internet.py b/trafaret/internet.py
new file mode 100644
index 0000000..260fd93
--- /dev/null
+++ b/trafaret/internet.py
@@ -0,0 +1,142 @@
+# -*- coding: utf-8 -*-
+
+import re
+from .regexp import Regexp
+from .base import DataError, String, Bytes, OnError
+from .lib import py3
+
+if py3:
+ import urllib.parse as urlparse
+else:
+ import urlparse
+
+
+MAX_EMAIL_LEN = 254
+
+
+EMAIL_REGEXP = re.compile(
+ # dot-atom
+ r"(?P^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*"
+ # quoted-string
+ r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-011\013\014\016-\177])*"'
+ # domain
+ r')@(?P(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)$)'
+ # literal form, ipv4 address (SMTP 4.1.3)
+ r'|\[(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}\]$',
+ re.IGNORECASE,
+)
+
+
+def email_idna_encode(value):
+ if '@' in value:
+ parts = value.split('@')
+ try:
+ parts[-1] = parts[-1].encode('idna').decode('ascii')
+ return '@'.join(parts)
+ except UnicodeError:
+ pass
+ return value
+
+
+email_regexp_trafaret = OnError(Regexp(EMAIL_REGEXP), 'value is not a valid email address')
+email_trafaret = email_regexp_trafaret | ((Bytes('utf-8') | String()) & email_idna_encode & email_regexp_trafaret)
+Email = String(allow_blank=True) & OnError(
+ String(max_length=MAX_EMAIL_LEN) & email_trafaret,
+ 'value is not a valid email address',
+)
+
+
+URL_REGEXP = re.compile(
+ r'^(?:http|ftp)s?://' # http:// or https://
+ r'(?:\S+(?::\S*)?@)?' # user and password
+ r'(?:(?:[A-Z0-9](?:[A-Z0-9-_]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
+ r'localhost|' # localhost...
+ r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
+ r'(?::\d+)?' # optional port
+ r'(?:/?|[/?]\S+)$',
+ re.IGNORECASE,
+)
+URLRegexp = OnError(Regexp(URL_REGEXP), 'value is not URL')
+URLRegexp = Regexp(URL_REGEXP)
+
+
+def decode_url_idna(value):
+ try:
+ scheme, netloc, path, query, fragment = urlparse.urlsplit(value)
+ netloc = netloc.encode('idna').decode('ascii') # IDN -> ACE
+ except UnicodeError: # invalid domain part
+ pass
+ else:
+ return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
+ return value
+
+
+URL = OnError(
+ URLRegexp | ((Bytes('utf-8') | String()) & decode_url_idna & URLRegexp),
+ 'value is not URL',
+)
+
+
+class IPv4(Regexp):
+ """
+ >>> IPv4().check('127.0.0.1')
+ '127.0.0.1'
+ """
+
+ regex = re.compile(
+ r'^((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])$', # noqa
+ )
+
+ def __init__(self):
+ super(IPv4, self).__init__(self.regex)
+
+ def check_and_return(self, value):
+ try:
+ return super(IPv4, self).check_and_return(value)
+ except DataError:
+ self._failure('value is not IPv4 address')
+
+ def __repr__(self):
+ return ''
+
+
+class IPv6(Regexp):
+ """
+ >>> IPv6().check('2001:0db8:0000:0042:0000:8a2e:0370:7334')
+ '2001:0db8:0000:0042:0000:8a2e:0370:7334'
+ """
+
+ regex = re.compile(
+ r'^('
+ r'(::)|'
+ r'(::[0-9a-f]{1,4})|'
+ r'([0-9a-f]{1,4}:){7,7}[0-9a-f]{1,4}|'
+ r'([0-9a-f]{1,4}:){1,7}:|'
+ r'([0-9a-f]{1,4}:){1,6}:[0-9a-f]{1,4}|'
+ r'([0-9a-f]{1,4}:){1,5}(:[0-9a-f]{1,4}){1,2}|'
+ r'([0-9a-f]{1,4}:){1,4}(:[0-9a-f]{1,4}){1,3}|'
+ r'([0-9a-f]{1,4}:){1,3}(:[0-9a-f]{1,4}){1,4}|'
+ r'([0-9a-f]{1,4}:){1,2}(:[0-9a-f]{1,4}){1,5}|'
+ r'[0-9a-f]{1,4}:((:[0-9a-f]{1,4}){1,6})|'
+ r':((:[0-9a-f]{1,4}){1,7}:)|'
+ r'fe80:(:[0-9a-f]{0,4}){0,4}%[0-9a-z]{1,}|'
+ r'::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|' # noqa
+ r'([0-9a-f]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])' # noqa
+ r')$',
+ re.IGNORECASE,
+ )
+
+ def __init__(self):
+ super(IPv6, self).__init__(self.regex)
+
+ def check_and_return(self, value):
+ try:
+ return super(IPv6, self).check_and_return(value)
+ except DataError:
+ self._failure('value is not IPv6 address')
+
+ def __repr__(self):
+ return ''
+
+
+IP = OnError(IPv4 | IPv6, 'value is not IP address')
diff --git a/trafaret/keys.py b/trafaret/keys.py
new file mode 100644
index 0000000..177868d
--- /dev/null
+++ b/trafaret/keys.py
@@ -0,0 +1,38 @@
+import trafaret as t
+
+
+def xor_key(first, second, trafaret):
+ trafaret = t.Trafaret._trafaret(trafaret)
+
+ def check_(value):
+ if (first in value) ^ (second in value):
+ key = first if first in value else second
+ yield first, t.catch_error(trafaret, value[key]), (key,)
+ elif first in value and second in value:
+ yield first, t.DataError(error=f'correct only if {second} is not defined'), (first,)
+ yield second, t.DataError(error=f'correct only if {first} is not defined'), (second,)
+ else:
+ yield first, t.DataError(error=f'is required if {second} is not defined'), (first,)
+ yield second, t.DataError(error=f'is required if {first} is not defined'), (second,)
+
+ return check_
+
+
+def confirm_key(name, confirm_name, trafaret):
+ def check_(value):
+ first, second = None, None
+ if name in value:
+ first = value[name]
+ else:
+ yield name, t.DataError('is required'), (name,)
+ if confirm_name in value:
+ second = value[confirm_name]
+ else:
+ yield confirm_name, t.DataError('is required'), (confirm_name,)
+ if not (first and second):
+ return
+ yield name, t.catch_error(trafaret, first), (name,)
+ yield confirm_name, t.catch_error(trafaret, second), (confirm_name,)
+ if first != second:
+ yield confirm_name, t.DataError(f'must be equal to {name}'), (confirm_name,)
+ return check_
diff --git a/trafaret/lib.py b/trafaret/lib.py
index 11226d1..83922b5 100644
--- a/trafaret/lib.py
+++ b/trafaret/lib.py
@@ -1,7 +1,18 @@
import sys
+import inspect
py3 = sys.version_info[0] == 3
+py36 = sys.version_info >= (3, 6, 0)
+
+
+if py3:
+ getargspec = inspect.getfullargspec
+else:
+ getargspec = inspect.getargspec
+
+
+_empty = object()
def py3metafix(cls):
@@ -11,3 +22,12 @@ def py3metafix(cls):
newcls = cls.__metaclass__(cls.__name__, (cls,), {})
newcls.__doc__ = cls.__doc__
return newcls
+
+
+def call_with_context_if_support(callble, value, context):
+ if not inspect.isfunction(callble) and hasattr(callble, '__call__'):
+ callble = callble.__call__
+ if 'context' in getargspec(callble).args:
+ return callble(value, context=context)
+ else:
+ return callble(value)
diff --git a/trafaret/regexp.py b/trafaret/regexp.py
new file mode 100644
index 0000000..2130f50
--- /dev/null
+++ b/trafaret/regexp.py
@@ -0,0 +1,29 @@
+import re
+from .base import Trafaret, str_types
+
+
+class RegexpRaw(Trafaret):
+ """
+ Check if given string match given regexp
+ """
+ __slots__ = ('regexp', 'raw_regexp')
+
+ def __init__(self, regexp):
+ self.regexp = re.compile(regexp) if isinstance(regexp, str_types) else regexp
+ self.raw_regexp = self.regexp.pattern if self.regexp else None
+
+ def check_and_return(self, value):
+ if not isinstance(value, str_types):
+ self._failure("value is not a string", value=value)
+ match = self.regexp.match(value)
+ if not match:
+ self._failure('does not match pattern %s' % self.raw_regexp, value=value)
+ return match
+
+ def __repr__(self):
+ return ''
+
+
+class Regexp(RegexpRaw):
+ def check_and_return(self, value):
+ return super(Regexp, self).check_and_return(value).group()
diff --git a/trafaret/utils.py b/trafaret/utils.py
index aa77230..87a2866 100644
--- a/trafaret/utils.py
+++ b/trafaret/utils.py
@@ -3,7 +3,7 @@
"""
import collections
from itertools import groupby
-from . import _dd
+from . import _dd # noqa
def recursive_unfold(data, prefix='', delimeter='__'):
@@ -97,7 +97,9 @@ def deep(data):
return [i[1] for i in sorted(collect.items())]
return collect
- data_ = [(split(key, delimeter), value)
- for key, value in sorted(data.items())]
+ data_ = [
+ (split(key, delimeter), value)
+ for key, value in sorted(data.items())
+ ]
result = deep(data_)
return result[prefix] if prefix else result
diff --git a/trafaret/visitor.py b/trafaret/visitor.py
index 4b3de76..6447cb8 100644
--- a/trafaret/visitor.py
+++ b/trafaret/visitor.py
@@ -1,5 +1,6 @@
-""" This module is expirement. API and implementation are unstable.
- Supposed to use with ``Request`` object or something like that.
+"""
+This module is expirement. API and implementation are unstable.
+Supposed to use with ``Request`` object or something like that.
"""
from collections import Mapping
from . import Trafaret, DataError, Key, catch_error, _empty
@@ -42,8 +43,13 @@ class DeepKey(Key):
def pop(self, data):
try:
- yield self.get_name(), catch_error(self.trafaret,
- get_deep_attr(data, self.name.split('.')))
+ yield (
+ self.get_name(),
+ catch_error(
+ self.trafaret,
+ get_deep_attr(data, self.name.split('.')),
+ )
+ )
except DataError as e:
if self.default != _empty:
yield self.get_name(), self.default