Source code for thorn.validators

"""Recipient Validators."""
from __future__ import absolute_import, unicode_literals

import socket

from ipaddress import ip_address, ip_network
from six import text_type

from kombu.utils.url import urlparse

from .exceptions import SecurityError

__all__ = [
    'ensure_protocol', 'ensure_port',
    'block_internal_ips', 'block_cidr_network',

validators = {}

def validator(fun):
    # type: (Callable) -> Callable
    """Make validator json serializable."""
    validators[fun.__name__] = fun
    return fun

def serialize_validator(v):
    # type: (Callable) -> Tuple[str, List]
    return (v._validator, v._args)

def deserialize_validator(v):
    # type: (Union[List, Tuple[str, List]) -> Callable
    if isinstance(v, (list, tuple)):
        name, args = v
        return validators[name](*args)
    return v

def _is_internal_address(addr):
    # type: (str) -> bool
    return any([

[docs]@validator def ensure_protocol(*allowed): # type: (*str) -> Callable """Only allow recipient URLs using specific protocols. Example: >>> ensure_protocol('https', 'http://') """ allowed = tuple( x if '://' in x else x + '://' for x in allowed ) def validate_protocol(recipient_url): # type: (str) -> None if not recipient_url.startswith(allowed): raise SecurityError( 'Protocol of recipient URL not allowed ({0} only)'.format( allowed)) validate_protocol._args = allowed validate_protocol._validator = 'ensure_protocol' return validate_protocol
[docs]@validator def ensure_port(*allowed): # type: (*Union[int, str]) -> Callable """Validator that ensures port is member of set allowed.""" allowed = tuple(int(p) for p in allowed) def validate_port(recipient_url): # type: (str) -> None port = urlparse(recipient_url).port if port and int(port) not in allowed: raise SecurityError( 'Port of recipient URL {0} not allowed ({1} only)'.format( recipient_url, allowed)) validate_port._args = allowed validate_port._validator = 'ensure_port' return validate_port
def _url_ip_address(url): # type: (str) -> ipaddress._IPAddressBase try: return ip_address(text_type(url)) except ValueError: host = urlparse(url).hostname return ip_address(text_type(socket.gethostbyname(host)))
[docs]@validator def block_internal_ips(): # type: () -> Callable """Block recipient URLs that have an internal IP address. Warning: This does not check for *private* networks, it will only make sure the IP address is not in a reserved private block (e.g. """ def validate_not_internal_ip(recipient_url): # type: (str) -> None addr = _url_ip_address(recipient_url) if _is_internal_address(addr): raise SecurityError( 'IP address of recipient {0}={1} considered private!'.format( recipient_url, addr)) validate_not_internal_ip._args = () validate_not_internal_ip._validator = 'block_internal_ips' return validate_not_internal_ip
[docs]@validator def block_cidr_network(*blocked_networks): # type: (*str) -> Callable """Block recipient URLs from a list of CIDR networks. Example: >>> block_cidr_network('', '') """ _blocked_networks = [ip_network(text_type(x)) for x in blocked_networks] def validate_cidr(recipient_url): # type: (str) -> None recipient_addr = _url_ip_address(recipient_url) for blocked_network in _blocked_networks: if recipient_addr in blocked_network: raise SecurityError( 'IP address of recipient {0}={1} is in network {2}'.format( recipient_url, recipient_addr, blocked_network, )) validate_cidr._args = blocked_networks validate_cidr._validator = 'block_cidr_network' return validate_cidr