Source code for thorn.validators
"""Recipient Validators."""
from __future__ import absolute_import, unicode_literals
import socket
from ipaddress import ip_address, ip_network
from six import text_type
from kombu.utils.url import urlparse
from .exceptions import SecurityError
__all__ = [
'ensure_protocol', 'ensure_port',
'block_internal_ips', 'block_cidr_network',
]
validators = {}
def validator(fun):
# type: (Callable) -> Callable
"""Make validator json serializable."""
validators[fun.__name__] = fun
return fun
def serialize_validator(v):
# type: (Callable) -> Tuple[str, List]
return (v._validator, v._args)
def deserialize_validator(v):
# type: (Union[List, Tuple[str, List]) -> Callable
if isinstance(v, (list, tuple)):
name, args = v
return validators[name](*args)
return v
def _is_internal_address(addr):
# type: (str) -> bool
return any([
addr.is_private,
addr.is_reserved,
addr.is_loopback,
addr.is_multicast,
])
@validator
[docs]def ensure_protocol(*allowed):
# type: (*str) -> Callable
"""Only allow recipient URLs using specific protocols.
Example:
>>> ensure_protocol('https', 'http://')
"""
allowed = tuple(
x if '://' in x else x + '://'
for x in allowed
)
def validate_protocol(recipient_url):
# type: (str) -> None
if not recipient_url.startswith(allowed):
raise SecurityError(
'Protocol of recipient URL not allowed ({0} only)'.format(
allowed))
validate_protocol._args = allowed
validate_protocol._validator = 'ensure_protocol'
return validate_protocol
@validator
[docs]def ensure_port(*allowed):
# type: (*Union[int, str]) -> Callable
"""Validator that ensures port is member of set allowed."""
allowed = tuple(int(p) for p in allowed)
def validate_port(recipient_url):
# type: (str) -> None
port = urlparse(recipient_url).port
if port and int(port) not in allowed:
raise SecurityError(
'Port of recipient URL {0} not allowed ({1} only)'.format(
recipient_url, allowed))
validate_port._args = allowed
validate_port._validator = 'ensure_port'
return validate_port
def _url_ip_address(url):
# type: (str) -> ipaddress._IPAddressBase
try:
return ip_address(text_type(url))
except ValueError:
host = urlparse(url).hostname
return ip_address(text_type(socket.gethostbyname(host)))
@validator
[docs]def block_internal_ips():
# type: () -> Callable
"""Block recipient URLs that have an internal IP address.
Warning:
This does not check for *private* networks, it will only
make sure the IP address is not in a reserved private block
(e.g. 192.168.0.1/24).
"""
def validate_not_internal_ip(recipient_url):
# type: (str) -> None
addr = _url_ip_address(recipient_url)
if _is_internal_address(addr):
raise SecurityError(
'IP address of recipient {0}={1} considered private!'.format(
recipient_url, addr))
validate_not_internal_ip._args = ()
validate_not_internal_ip._validator = 'block_internal_ips'
return validate_not_internal_ip
@validator
[docs]def block_cidr_network(*blocked_networks):
# type: (*str) -> Callable
"""Block recipient URLs from a list of CIDR networks.
Example:
>>> block_cidr_network('192.168.0.0/24', '132.34.23.0/24')
"""
_blocked_networks = [ip_network(text_type(x)) for x in blocked_networks]
def validate_cidr(recipient_url):
# type: (str) -> None
recipient_addr = _url_ip_address(recipient_url)
for blocked_network in _blocked_networks:
if recipient_addr in blocked_network:
raise SecurityError(
'IP address of recipient {0}={1} is in network {2}'.format(
recipient_url, recipient_addr, blocked_network,
))
validate_cidr._args = blocked_networks
validate_cidr._validator = 'block_cidr_network'
return validate_cidr