#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Copyright(C) 2007 INL
Written by Damien Boucard <damien.boucard AT inl.fr>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
---
descmodels.py contains all data structure to manage desc.xml contents.
"""

import re
from IPy import IP
from warnings import warn
from checkdesc.unicity import check_unicity #, check_increasing
from checkdesc.desc_warnings import TypeWarning, NameWarning, \
                                    VersionWarning, ConsistencyWarning
from checkdesc.relation import Relation, RelationSet


def _parse_id(value):
    """ Convert a non-None ``id`` attribute into a strictly positive int.

    Raises ValueError when ``value`` cannot be converted to an integer or
    when the converted integer is zero or negative.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        # TypeError is caught too (e.g. a list), so that every bad id is
        # reported the same way, like the queue validation in Firewall.
        raise ValueError('Invalid id value: "%s"; must be a non-zero positive numerical value.'
                         % (value,))
    if parsed <= 0:
        raise ValueError('Negative or null id value: "%s"; must be a non-zero positive numerical value.'
                         % (parsed,))
    return parsed


def _require_ip(value, argument):
    """ Return ``value`` unchanged if it is an IPy.IP instance.

    ``argument`` is the parameter name quoted in the error message.
    Raises ValueError otherwise.

    This lives at module level on purpose: several constructors below have
    a parameter named ``type`` which hides the builtin ``type`` inside
    their body, so they cannot build this message themselves.
    """
    if not isinstance(value, IP):
        raise ValueError("'%s' argument must be a IPy.IP instance; given: %s (%s)"
                         % (argument, value, type(value)))
    return value


class Desc:
    """ Represents the root tag <network> """
    latest_version = "1.3"

    def __init__(self, version=None):
        """ Create an empty description.

        ``version`` defaults to ``latest_version``; any other value emits a
        VersionWarning attributed to the caller (stacklevel=2).
        """
        self.version = self.latest_version
        if version is not None:
            self.version = version
        if self.version != self.latest_version:
            warn(VersionWarning('Using a deprecated desc version: "%s"; recommending latest version: "%s"'
                                % (self.version, self.latest_version)), stacklevel=2)
        self.firewalls = []
        self.networks = []

    def check_integrity(self):
        """ Check cross-element consistency of the whole description.

        Raises ValueError when a firewall does not have exactly one default
        gateway, or when a directly connected interface has no address
        inside the network it is connected to.  Emits a ConsistencyWarning
        when a default gateway lies outside its network.  Unicity of
        interface names and addresses is delegated to check_unicity.
        """
        # checks unicity of attributes
        #check_increasing("fw", "id", self.firewalls)
        for firewall in self.firewalls:
            #check_increasing("interface", "id", firewall.interfaces)
            check_unicity("name", firewall.interfaces)
            check_unicity("addr", firewall.interfaces, None)
            if not firewall.internet_connection:
                raise ValueError("fw %s: no default gateway is set; one expected."
                                 % (firewall.name))
            if len(firewall.internet_connection) > 1:
                raise ValueError("fw %s: too much default gateway are set; only one expected."
                                 % (firewall.name))
            # for iface in firewall.interfaces:
            #     check_increasing("address", "id", iface)
        # check_increasing("net", "id", self.networks)

        # checks interface addresses included in networks
        for network in self.networks:
            for connection in network:
                if isinstance(connection, DirectConnection):
                    addr_in_net = False
                    addr_list = []
                    for address in connection.interface:
                        addr_list.append(str(address.addr))
                        if address.addr in network.addr:
                            addr_in_net = True
                            break
                    if not addr_in_net:
                        raise ValueError("fw %s: interface %s has no address included in %s; available addresses: %s"
                                         % (connection.interface.firewall.name,
                                            connection.interface.name,
                                            network.addr, addr_list))
                # Disabled check for routed connections, kept for reference:
                # else: #isinstance(connection, RoutedConnection)
                #     gateway_in_net = False
                #     net_list = []
                #     for iface_conn in connection.interface.connections:
                #         if isinstance(iface_conn, DirectConnection):
                #             net_list.append(str(iface_conn.network.addr))
                #             if connection.gateway in iface_conn.network.addr:
                #                 gateway_in_net = True
                #                 break
                #     if not gateway_in_net:
                #         raise ValueError("fw %s: interface %s has no network where the gateway %s is included in; available networks: %s"
                #             %(connection.interface.firewall.name,
                #               connection.interface.name,
                #               connection.gateway, net_list ))
                if isinstance(connection, InternetConnection):
                    if connection.default_gateway not in connection.network.addr:
                        warn(ConsistencyWarning("fw %s: the default gateway %s is not included in the network %s"
                                                % (connection.interface.firewall.name,
                                                   connection.default_gateway,
                                                   connection.network.addr)))


class Firewall:
    """ Represents <fw id="1" type="nufw" name="central" queue="0"> """
    type_set = ("nufw",) # type_set[0] by default
    name_regex = re.compile(r'^[0-9a-zA-Z\.\-_ ]*$')
    queue_max = 65534

    def __init__(self, queue, name="", type=None, id=None):
        """ Validate and store the firewall attributes.

        queue -- value convertible to an int in [0, queue_max].
        name  -- may be empty (NameWarning); otherwise must match name_regex.
        type  -- one of type_set; None selects type_set[0] with a
                 TypeWarning; a value that only differs by case or
                 surrounding blanks is fixed with a TypeWarning.
        id    -- optional strictly positive integer; the ``id`` attribute
                 is only set when a value is given.

        Raises ValueError on any invalid attribute.
        """
        self.internet_connection = RelationSet(InternetConnection,
                                               interface__firewall=self)

        # queue property
        try:
            self.queue = int(queue)
        except (TypeError, ValueError):
            raise ValueError('Invalid queue value: "%s"; must be a positive numerical value.'
                             % (queue,))
        if self.queue < 0:
            raise ValueError('Negative queue value: "%s"; must be a positive numerical value.'
                             % (self.queue))
        elif self.queue > self.queue_max:
            # queue_max itself is a legal value, as the message states.
            raise ValueError('Too high queue value: "%s"; must be lesser or equal to %s.'
                             % (self.queue, self.queue_max))

        # type property
        self.type = self.type_set[0]
        if str(type) in self.type_set:
            self.type = str(type)
        elif type is None:
            warn(TypeWarning('Omitted type property; set to "%s" by default.'
                             % (self.type_set[0])), stacklevel=2)
        elif str(type).strip().lower() in self.type_set:
            self.type = str(type).strip().lower()
            warn(TypeWarning('Malformed type value: "%s"; fixed to "%s".'
                             % (type, str(type).strip().lower())), stacklevel=2)
        else:
            raise ValueError('Invalid type value: "%s"; must be one of these values : "%s"'
                             % (type, '", "'.join(self.type_set)))

        # name property
        if str(name) == "":
            warn(NameWarning("Empty name value; a non-empty value is recommended."),
                 stacklevel=2)
        elif not self.name_regex.match(str(name)):
            raise ValueError('Invalid name value: "%s"; must be a strictly alphanumerical value.'
                             % (name,))
        self.name = str(name)

        # id property
        if id is not None:
            self.id = _parse_id(id)

        # sub-elements
        self.interfaces = []


class Interface(list):
    """ Represents <interface id="1" name="eth0">

    The list content is iterated by Desc.check_integrity, which reads an
    ``addr`` attribute on each item (presumably Address instances).
    """
    name_regex_base = r'[0-9a-zA-Z]+(\.[0-9]+)?'
    name_regex = re.compile('^%s$' %(name_regex_base))
    alias_regex = re.compile(r'^%s:%s$' %(name_regex_base, r'\d+'))

    def __init__(self, firewall, name, id=None, is_vlan=False):
        """ Validate and store the interface attributes.

        firewall -- owning firewall object, stored as-is.
        name     -- must match name_regex; alias-looking names such as
                    "eth0:1" are rejected with a dedicated message.
        id       -- optional strictly positive integer; the ``id``
                    attribute is only set when a value is given.
        is_vlan  -- stored as-is.

        Raises ValueError on an invalid name or id.
        """
        self.firewall = firewall
        self.connections = RelationSet(Connection, interface=self)

        # name property
        if self.alias_regex.match(str(name)):
            # str(name) is used for the split as well, so the message can
            # be built for any object whose string form looks like an alias.
            raise ValueError('Invalid name value: "%s"; this value looks like an alias interface; "address" sub-elements should be moved into the "interface" element named "%s".'
                             % (name, str(name).split(':')[0]))
        elif name is None or not self.name_regex.match(str(name)):
            # None is tested explicitly because str(None) would match.
            raise ValueError('Invalid name value: "%s"; must be a strictly alphanumerical value.'
                             % (name,))
        self.name = str(name)

        # id property
        if id is not None:
            self.id = _parse_id(id)

        self.is_vlan = is_vlan


class Address:
    """ Represents <address id="1" addr="10.60.1.1" /> """

    def __init__(self, addr, id=None, type="ipv4"):
        """ Validate and store the address attributes.

        addr -- IPy.IP instance.
        id   -- optional strictly positive integer; the ``id`` attribute
                is only set when a value is given.
        type -- stored as-is, without validation.

        Raises ValueError when addr is not an IPy.IP or id is invalid.
        """
        # addr property
        self.addr = _require_ip(addr, "addr")

        # id property
        if id is not None:
            self.id = _parse_id(id)

        self.type = type


class Network(RelationSet):
    """ Represents <net id="1" name="INTERCO" type="ipv4" addr="10.60.1.0/24"> """
    name_regex = re.compile(r'^[0-9a-zA-Z_]*$')
    name_maxlen = 14
    name_blacklist = ("LOCAL",)
    type_set = ("ipv4", "mark")

    def __init__(self, name, type, addr, id=None, remote=None, enabled=True, local_id=None):
        """ Validate and store the network attributes.

        name -- non-empty, matching name_regex, at most name_maxlen
                characters, and not in name_blacklist (compared in upper
                case); a name that is not upper case emits a NameWarning.
        type -- one of type_set; a value that only differs by case or
                surrounding blanks is fixed with a TypeWarning.
        addr -- IPy.IP instance.
        id   -- optional strictly positive integer; the ``id`` attribute
                is only set when a value is given.
        remote, enabled, local_id -- stored as-is.

        Raises ValueError on any invalid attribute.
        """
        RelationSet.__init__(self, Connection, network=self)

        # name property
        if name is None or not self.name_regex.match(str(name)):
            raise ValueError('Invalid name value: "%s"; must be a strictly alphanumerical value.'
                             % (name,))
        self.name = str(name)
        if self.name.upper() in self.name_blacklist:
            raise ValueError('Forbidden name value: "%s"; must not be one of these values : "%s".'
                             % (self.name, '", "'.join(self.name_blacklist)))
        if self.name.upper() != self.name:
            warn(NameWarning('Note: network names are commonly in upper case ; given name: "%s".'
                             % (self.name)), stacklevel=2)
        if len(self.name) == 0:
            # name_regex accepts the empty string, hence this explicit test.
            raise ValueError('Invalid name value: cannot be an empty string.')
        if len(self.name) > self.name_maxlen:
            raise ValueError('Too long name value: "%s" (%s characters); must be lesser or equal to %s characters.'
                             % (self.name, len(self.name), self.name_maxlen))

        # type property
        if str(type) in self.type_set:
            self.type = str(type)
        elif str(type).strip().lower() in self.type_set:
            self.type = str(type).strip().lower()
            warn(TypeWarning('Malformed type value: "%s"; fixed to "%s".'
                             % (type, str(type).strip().lower())), stacklevel=2)
        else:
            raise ValueError('Invalid type value: "%s"; must be one of these values : "%s"'
                             % (type, '", "'.join(self.type_set)))

        # addr property
        self.addr = _require_ip(addr, "addr")
        #if addr is None:
        #    raise ValueError('Omitted addr property; this property is mandatory.')

        # id property
        if id is not None:
            self.id = _parse_id(id)

        self.remote = remote
        self.enabled = enabled
        self.local_id = local_id


class Connection(Relation):
    """ Abstract class. """

    def __init__(self, network, interface):
        """ Store both ends of the connection: a network and an interface. """
        Relation.__init__(self)
        self.network = network
        self.interface = interface


class DirectConnection(Connection):
    """ Represents <connection direct="1" fwid="1" iface="2" /> """
    pass


class InternetConnection(DirectConnection):
    """ Represents <connection direct="1" fwid="1" iface="1" dftgateway="10.60.1.254" /> """

    def __init__(self, network, interface, default_gateway):
        """ Raises ValueError when default_gateway is not an IPy.IP instance. """
        DirectConnection.__init__(self, network, interface)
        self.default_gateway = _require_ip(default_gateway, "default_gateway")


class RoutedConnection(Connection):
    """ Represents <connection direct="0" fwid="1" iface="3" gateway="192.168.0.1" /> """

    def __init__(self, network, interface, gateway):
        """ Raises ValueError when gateway is not an IPy.IP instance. """
        Connection.__init__(self, network, interface)
        self.gateway = _require_ip(gateway, "gateway")