Sophie

Sophie

distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > dca483b59ba61f3fa092de932ddd570e > files > 741

nuface-2.0.14-2mdv2009.1.i586.rpm

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Copyright(C) 2007 INL
Written by Damien Boucard <damien.boucard AT inl.fr>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.

---
descmodels.py contains all the data structures used to manage desc.xml contents.
"""

import re
from IPy import IP
from warnings import warn
from checkdesc.unicity import check_unicity #, check_increasing
from checkdesc.desc_warnings import TypeWarning, NameWarning, \
    VersionWarning, ConsistencyWarning
from checkdesc.relation import Relation, RelationSet

class Desc:
    """
    Root of the description model; maps to the root tag <network>.

    A Desc carries the desc format version, the list of firewalls and the
    list of networks, and can check that they are consistent together.
    """
    latest_version = "1.3"

    def __init__(self, version=None):
        """
        Build an empty description.

        version -- desc format version; None selects latest_version. Any
                   version different from latest_version issues a
                   VersionWarning.
        """
        if version is None:
            version = self.latest_version
        self.version = version
        if self.version != self.latest_version:
            message = 'Using a deprecated desc version: "%s"; recommending latest version: "%s"' \
                                          % (self.version, self.latest_version)
            warn(VersionWarning(message), stacklevel=2)
        self.firewalls = []
        self.networks = []

    def check_integrity(self):
        """
        Check the consistency of the firewalls and networks of this desc.

        For every firewall: interface names and addresses go through
        check_unicity, and exactly one internet connection (default
        gateway) is required.
        For every network: each direct connection needs an interface owning
        at least one address included in the network; a default gateway
        lying outside of its network only issues a ConsistencyWarning.

        Raises ValueError on the first blocking inconsistency.
        """
        # NOTE: the "increasing id" checks (check_increasing) and the check of
        # routed connection gateways are not performed here.

        # unicity of attributes and default gateway count, firewall by firewall
        for fw in self.firewalls:
            check_unicity("name", fw.interfaces)
            check_unicity("addr", fw.interfaces, None)
            gateways = fw.internet_connection
            if not gateways:
                raise ValueError("fw %s: no default gateway is set; one expected."
                                                                  % (fw.name))
            if len(gateways) > 1:
                raise ValueError("fw %s: too much default gateway are set; only one expected."
                                                                  % (fw.name))

        # interface addresses must be included in the connected networks
        for net in self.networks:
            for conn in net:
                if isinstance(conn, DirectConnection):
                    iface = conn.interface
                    for address in iface:
                        if address.addr in net.addr:
                            break
                    else:
                        # no address matched: report every address we tried
                        available = [str(address.addr) for address in iface]
                        raise ValueError("fw %s: interface %s has no address included in %s; available addresses: %s"
                                         % (iface.firewall.name, iface.name,
                                            net.addr, available))
                if isinstance(conn, InternetConnection) \
                        and conn.default_gateway not in conn.network.addr:
                    warn(ConsistencyWarning("fw %s: the default gateway %s is not included in the network %s"
                                            % (conn.interface.firewall.name,
                                               conn.default_gateway,
                                               conn.network.addr)))


class Firewall:
    """
    Represents <fw id="1" type="nufw" name="central" queue="0">

    Class attributes:
      type_set   -- accepted values of the type property; the first one is
                    the default.
      name_regex -- pattern the name property has to match.
      queue_max  -- highest accepted queue number (inclusive).
    """
    type_set = ("nufw",) # type_set[0] by default
    name_regex = re.compile(r'^[0-9a-zA-Z\.\-_ ]*$')
    queue_max = 65534

    def __init__(self, queue, name="", type=None, id=None):
        """
        Validate and store the properties of a firewall.

        queue -- queue number: anything int() accepts, from 0 up to
                 queue_max (both included).
        name  -- firewall name; must match name_regex. An empty name is
                 accepted but issues a NameWarning.
        type  -- one of type_set. None selects type_set[0] with a
                 TypeWarning; a value differing only by case or surrounding
                 blanks is fixed with a TypeWarning.
        id    -- optional strictly positive integer; the "id" attribute is
                 only set when a value is given.

        Raises ValueError when a property is invalid.
        """
        self.internet_connection = RelationSet(InternetConnection, interface__firewall=self)
        # queue property
        try:
            self.queue = int(queue)
        except (TypeError, ValueError):
            raise ValueError('Invalid queue value: "%s"; must be a positive numerical value.'
                                                                      %(queue))
        if self.queue < 0:
            raise ValueError('Negative queue value: "%s"; must be a positive numerical value.'
                                                                 %(self.queue))
        elif self.queue > self.queue_max:
            # queue_max itself is valid, as the message below states
            raise ValueError('Too high queue value: "%s"; must be lesser or equal to %s.'
                                                 %(self.queue, self.queue_max))

        # type property
        self.type = self.type_set[0]
        normalized_type = str(type).strip().lower()
        if str(type) in self.type_set:
            self.type = str(type)
        elif type is None :
            warn(TypeWarning('Omitted type property; set to "%s" by default.'
                                             %(self.type_set[0])),stacklevel=2)
        elif normalized_type in self.type_set:
            self.type = normalized_type
            warn(TypeWarning('Malformed type value: "%s"; fixed to "%s".'
                                          %(type, normalized_type)),stacklevel=2)
        else:
            raise ValueError('Invalid type value: "%s"; must be one of these values : "%s"'
                                           %(type, '", "'.join(self.type_set)))

        # name property
        if str(name) == "":
            warn(NameWarning("Empty name value; a non-empty value is recommended."),
                                                                  stacklevel=2)
        elif not self.name_regex.match(str(name)):
            raise ValueError('Invalid name value: "%s"; must be a strictly alphanumerical value.'
                                                                       %(name))
        self.name = str(name)

        # id property
        if id is not None:
            try:
                self.id = int(id)
            except (TypeError, ValueError):
                # TypeError covers values int() cannot convert at all (lists,
                # dicts...), reported the same way as for the queue property
                raise ValueError('Invalid id value: "%s"; must be a non-zero positive numerical value.'
                                                                         %(id))
            if self.id <= 0:
                raise ValueError('Negative or null id value: "%s"; must be a non-zero positive numerical value.'
                                                                    %(self.id))

        # sub-elements
        self.interfaces = []

class Interface(list):
    """
    Represents <interface id="1" name="eth0">

    The interface is itself a list; Desc.check_integrity iterates over it
    and reads the "addr" attribute of every item.

    Class attributes:
      name_regex  -- pattern of a valid interface name (e.g. "eth0",
                     "eth0.12").
      alias_regex -- pattern of an alias interface name (e.g. "eth0:1"),
                     which is rejected.
    """
    name_regex_base = r'[0-9a-zA-Z]+(\.[0-9]+)?'
    name_regex = re.compile('^%s$' %(name_regex_base))
    alias_regex = re.compile(r'^%s:%s$' %(name_regex_base, r'\d+'))

    def __init__(self, firewall, name, id=None, is_vlan=False):
        """
        Validate and store the properties of an interface.

        firewall -- firewall owning this interface (stored as given).
        name     -- interface name; must match name_regex. Alias names
                    ("eth0:1") are refused.
        id       -- optional strictly positive integer; the "id" attribute
                    is only set when a value is given.
        is_vlan  -- stored as given.

        Raises ValueError when name or id is invalid.
        """
        self.firewall = firewall
        self.connections = RelationSet(Connection, interface=self)

        # name property
        name_text = str(name)
        if self.alias_regex.match(name_text):
            # split the text that was validated, not the raw argument, which
            # is not guaranteed to offer a split() method
            raise ValueError('Invalid name value: "%s"; this value looks like an alias interface; "address" sub-elements should be moved into the "interface" element named "%s".'
                                              %(name, name_text.split(':')[0]))
        elif name is None or not self.name_regex.match(name_text):
            raise ValueError('Invalid name value: "%s"; must be a strictly alphanumerical value.'
                                                                       %(name))
        self.name = name_text

        # id property
        if id is not None:
            try:
                self.id = int(id)
            except (TypeError, ValueError):
                # TypeError covers values int() cannot convert at all
                raise ValueError('Invalid id value: "%s"; must be a non-zero positive numerical value.'
                                                                         %(id))
            if self.id <= 0:
                raise ValueError('Negative or null id value: "%s"; must be a non-zero positive numerical value.'
                                                                    %(self.id))
        self.is_vlan = is_vlan

class Address:
    """
    Represents <address id="1" addr="10.60.1.1" />
    """
    def __init__(self, addr, id=None, type="ipv4"):
        """
        Validate and store the properties of an address.

        addr -- the address, as an IPy.IP instance.
        id   -- optional strictly positive integer; the "id" attribute is
                only set when a value is given.
        type -- stored as given, without validation.

        Raises ValueError when addr is not an IPy.IP instance or when id is
        invalid.
        """
        # addr property
        if not isinstance(addr, IP):
            # the "type" parameter shadows the builtin of the same name here,
            # so the class of addr is read through __class__
            raise ValueError("'addr' argument must be a IPy.IP instance; given: %s (%s)"
                                                       %(addr, addr.__class__))
        self.addr = addr

        # id property
        if id is not None:
            try:
                self.id = int(id)
            except (TypeError, ValueError):
                # TypeError covers values int() cannot convert at all
                raise ValueError('Invalid id value: "%s"; must be a non-zero positive numerical value.'
                                                                         %(id))
            if self.id <= 0:
                raise ValueError('Negative or null id value: "%s"; must be a non-zero positive numerical value.'
                                                                    %(self.id))
        self.type = type

class Network(RelationSet):
    """
    Represents <net id="1" name="INTERCO" type="ipv4" addr="10.60.1.0/24">

    Class attributes:
      name_regex     -- pattern the name property has to match.
      name_maxlen    -- maximal length of a name (inclusive).
      name_blacklist -- upper-case names that are refused.
      type_set       -- accepted values of the type property.
    """
    name_regex = re.compile(r'^[0-9a-zA-Z_]*$')
    name_maxlen = 14
    name_blacklist = ("LOCAL",)
    type_set = ("ipv4", "mark")

    def __init__(self, name, type, addr, id=None, remote=None, enabled=True, local_id=None):
        """
        Validate and store the properties of a network.

        name     -- non-empty name matching name_regex, at most name_maxlen
                    characters long and not in name_blacklist (compared in
                    upper case). A name that is not in upper case issues a
                    NameWarning.
        type     -- one of type_set; a value differing only by case or
                    surrounding blanks is fixed with a TypeWarning.
        addr     -- the network address, as an IPy.IP instance.
        id       -- optional strictly positive integer; the "id" attribute
                    is only set when a value is given.
        remote, enabled, local_id -- stored as given, without validation.

        Raises ValueError when a property is invalid.
        """
        RelationSet.__init__(self, Connection, network=self)

        # name property
        if name is None or not self.name_regex.match(str(name)):
            raise ValueError('Invalid name value: "%s"; must be a strictly alphanumerical value.'
                                                                       %(name))
        self.name = str(name)
        if self.name.upper() in self.name_blacklist:
            raise ValueError('Forbidden name value: "%s"; must not be one of these values : "%s".'
                                %(self.name, '", "'.join(self.name_blacklist)))
        if self.name.upper() != self.name:
            warn(NameWarning('Note: network names are commonly in upper case ; given name: "%s".'
                                                    %(self.name)),stacklevel=2)
        if len(self.name) == 0:
            raise ValueError('Invalid name value: cannot be an empty string.')
        if len(self.name) > self.name_maxlen:
            raise ValueError('Too long name value: "%s" (%s characters); must be lesser or equal to %s characters.'
                                %(self.name, len(self.name), self.name_maxlen))

        # type property
        normalized_type = str(type).strip().lower()
        if str(type) in self.type_set:
            self.type = str(type)
        elif normalized_type in self.type_set:
            self.type = normalized_type
            warn(TypeWarning('Malformed type value: "%s"; fixed to "%s".'
                                          %(type, normalized_type)),stacklevel=2)
        else:
            raise ValueError('Invalid type value: "%s"; must be one of these values : "%s"'
                                           %(type, '", "'.join(self.type_set)))

        # addr property
        if not isinstance(addr, IP):
            # the "type" parameter shadows the builtin of the same name here,
            # so the class of addr is read through __class__
            raise ValueError("'addr' argument must be a IPy.IP instance; given: %s (%s)"
                                                       %(addr, addr.__class__))
        self.addr = addr

        # id property
        if id is not None:
            try:
                self.id = int(id)
            except (TypeError, ValueError):
                # TypeError covers values int() cannot convert at all
                raise ValueError('Invalid id value: "%s"; must be a non-zero positive numerical value.'
                                                                         %(id))
            if self.id <= 0:
                raise ValueError('Negative or null id value: "%s"; must be a non-zero positive numerical value.'
                                                                    %(self.id))

        self.remote = remote
        self.enabled = enabled
        self.local_id = local_id

class Connection(Relation):
    """
    Abstract class: relation linking a network to an interface.
    """
    def __init__(self, network, interface):
        """
        Initialise the relation, then remember both of its ends.

        network   -- network end of the connection.
        interface -- interface end of the connection.
        """
        Relation.__init__(self)
        self.network, self.interface = network, interface

class DirectConnection(Connection):
    """
    Represents <connection direct="1" fwid="1" iface="2" />

    Adds nothing to Connection; the class only exists so that direct
    connections can be told apart with isinstance().
    """

class InternetConnection(DirectConnection):
    """
    Represents <connection direct="1" fwid="1" iface="1" dftgateway="10.60.1.254" />
    """
    def __init__(self, network, interface, default_gateway):
        """
        Create a direct connection carrying a default gateway.

        network, interface -- both ends, handed to DirectConnection.
        default_gateway    -- the default gateway, as an IPy.IP instance.

        Raises ValueError when default_gateway is not an IPy.IP instance.
        """
        DirectConnection.__init__(self, network, interface)
        if isinstance(default_gateway, IP):
            self.default_gateway = default_gateway
        else:
            given = (default_gateway, type(default_gateway))
            raise ValueError("'default_gateway' argument must be a IPy.IP instance; given: %s (%s)"
                                                                       % given)

class RoutedConnection(Connection):
    """
    Represents <connection direct="0" fwid="1" iface="3" gateway="192.168.0.1" />
    """
    def __init__(self, network, interface, gateway):
        """
        Create a connection reaching the network through a gateway.

        network, interface -- both ends, handed to Connection.
        gateway            -- the gateway, as an IPy.IP instance.

        Raises ValueError when gateway is not an IPy.IP instance.
        """
        Connection.__init__(self, network, interface)
        if isinstance(gateway, IP):
            self.gateway = gateway
        else:
            given = (gateway, type(gateway))
            raise ValueError("'gateway' argument must be a IPy.IP instance; given: %s (%s)"
                                                                       % given)