Sophie

Sophie

distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > dca483b59ba61f3fa092de932ddd570e > files > 761

nuface-2.0.14-2mdv2009.1.i586.rpm

# Copyright(C) 2005 INL
# Written by Jean Gillaux <jean@inl.fr>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
#  This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

from copy import deepcopy
from IPy import IP
from nupyf.nuerror import Error
from nupyf.nuelt import Elt, EltSet

def check_local_ipv4(eltg, local_ips):
    """Tell whether every local element of eltg uses an address of the fw.

    Every element of every set yielded by ``eltg.iter_eltsets()`` is
    inspected.  An element fails the check when its 'is_local' property is
    set and its 'net' property is non-empty but not found in local_ips.

    Returns True when eltg is empty (or None) or when no element fails,
    False as soon as one failing element is met.
    """
    if not eltg:
        return True

    def uses_foreign_address(elt):
        net = elt.get('net')
        return bool(elt.get('is_local') and net and net not in local_ips)

    return not any(
        uses_foreign_address(elt)
        for eltset in eltg.iter_eltsets()
        for elt in eltset)

def check_rule_network(acl_id, network, typestr):
    """Ensure network holds at most one element with a 'net' property.

    acl_id and typestr are only used to build the error message.  An empty
    (or None) network is accepted without any check.

    Raises Error when a second element with a non-empty 'net' is met.
    """
    if not network:
        return
    net_count = 0
    for elt in network:
        if elt.get('net'):
            net_count += 1
            if net_count > 1:
                raise Error(
                    "ACL %s: the %s must not contain "
                    "several network definitions" % (acl_id, typestr))

def getAclInterface(acl, addr):
    """Look up the interface matching addr on the firewall of acl.

    The lookup is delegated to ``acl.fw().get_iface_by_net_addr(addr)``;
    any falsy answer is reported as None.
    """
    return acl.fw().get_iface_by_net_addr(addr) or None

class Rule:
    """Represents a rule from an acl

    A rule groups one source eltset, one destination eltset and one
    protocol / application / OS eltset, together with the decision and
    logging settings copied from the owning acl.  Rules compare and sort
    by the integer value of the id of their acl.
    """
    def __init__(self, acl, src, dest,
                 proto=None, appli=None, os=None, aclname=''):
        """Build a rule of acl going from src to dest.

        src and dest are eltsets (or None); each may hold at most one
        network definition, otherwise check_rule_network raises Error.
        proto, appli and os default to an eltset holding a single empty
        Elt.  Every eltset given is merged in place with merge_elts().
        """
        self.acl = acl
        check_rule_network(acl.id, src, 'source')
        check_rule_network(acl.id, dest, 'destination')
        self.is_auto = 0
        self.__src = src
        self.__dest = dest
        if self.__src:
            self.__src.merge_elts()
        if self.__dest:
            self.__dest.merge_elts()
        self.aclid = acl.id
        self.decision = acl.decision
        self.ulog_prefix = acl.ulog_prefix
        self.aclname = aclname
        self.netdst = None
        self.netsrc = None
        self.cpt = 0
        self.period_name = '' #time period name
        self._proto = self._merged_or_default(proto)
        self._appli = self._merged_or_default(appli)
        self._os = self._merged_or_default(os)
        self.log = acl.log
        self.sort_order = acl.sort_order

        # Addresses are only computed for the sides that are defined.
        if self.src():
            self.ip_src = IP(self.elt_src().get('numnet'))
        else:
            self.ip_src = None

        if self.dest():
            self.ip_dest = IP(self.elt_dest().get('numnet'))
        else:
            self.ip_dest = None

        # Interfaces are looked up on the firewall from those addresses.
        if self.ip_src:
            self.input_iface = getAclInterface(acl, self.ip_src)
        else:
            self.input_iface = None
        if self.ip_dest:
            self.output_iface = getAclInterface(acl, self.ip_dest)
        else:
            self.output_iface = None

    @staticmethod
    def _merged_or_default(eltset):
        """Return eltset merged in place, or a one-empty-Elt set for None."""
        if eltset is None:
            return EltSet([Elt()])
        eltset.merge_elts()
        return eltset

    def src(self):
        """Return the source eltset (may be None)."""
        return self.__src

    def dest(self):
        """Return the destination eltset (may be None)."""
        return self.__dest

    def user_groups(self):
        """Return the user groups of the owning acl."""
        return self.acl.user_groups

    def elt_src(self):
        """Return the first element of the source eltset."""
        return self.__src.elts[0]

    def elt_dest(self):
        """Return the first element of the destination eltset."""
        return self.__dest.elts[0]

    def proto(self):
        """Return the first element of the protocol eltset."""
        return self._proto.elts[0]

    def appli(self):
        """Return the first element of the application eltset."""
        return self._appli.elts[0]

    def os(self):
        """Return the first element of the OS eltset."""
        return self._os.elts[0]

    def use_nufw(self):
        """Return True when the owning acl has user groups."""
        return bool(self.acl.user_groups)

    # FIXME: replace this function by one at acl level
    # rule is not aware of its place in the network layout
    def dst_isnot_src(self): #used by Aclcheck_dst_src_rules
        """Tell whether the source and the destination are in distinct networks.

        Returns True when either side is in the 'external' zone, or when
        neither network contains the other; False otherwise (a rule whose
        source and destination share a network is pointless).
        """
        szone = self.elt_src().get('zone')
        dzone = self.elt_dest().get('zone')
        if dzone == 'external' or szone == 'external':
            # TODO: handle vpn zone
            return True
        ips = IP(self.elt_src().get('numnet'))
        ipd = IP(self.elt_dest().get('numnet'))
        return ips not in ipd and ipd not in ips

    def _compare_to(self, other):
        """Three-way compare acl ids; NotImplemented if other has no aclid."""
        if not hasattr(other, 'aclid'):
            return NotImplemented
        mine, theirs = int(self.aclid), int(other.aclid)
        return (mine > theirs) - (mine < theirs)

    def __cmp__(self, other):
        """Python 2 ordering hook: compare the acl ids as integers."""
        # Written without the cmp() builtin, which Python 3 removed.
        mine, theirs = int(self.aclid), int(other.aclid)
        return (mine > theirs) - (mine < theirs)

    # Rich comparisons: Python 3 ignores __cmp__, so without these rules
    # could not be sorted or compared there.  They give the same answers
    # as __cmp__.  As with __cmp__ under Python 2, defining equality
    # without __hash__ leaves rules unusable as set members or dict keys.
    def __eq__(self, other):
        order = self._compare_to(other)
        if order is NotImplemented:
            return order
        return order == 0

    def __ne__(self, other):
        order = self._compare_to(other)
        if order is NotImplemented:
            return order
        return order != 0

    def __lt__(self, other):
        order = self._compare_to(other)
        if order is NotImplemented:
            return order
        return order < 0

    def __le__(self, other):
        order = self._compare_to(other)
        if order is NotImplemented:
            return order
        return order <= 0

    def __gt__(self, other):
        order = self._compare_to(other)
        if order is NotImplemented:
            return order
        return order > 0

    def __ge__(self, other):
        order = self._compare_to(other)
        if order is NotImplemented:
            return order
        return order >= 0

    def get_dst_port_range(self, dir='dport'):
        """Return the bounds of the port range stored under dir.

        dir names the property read from the protocol element ('dport' by
        default).  A value holding ':' is split on it and the resulting
        list is returned; any other value s is returned as (s, s).
        """
        s = self.proto().get(dir)
        if ':' in s:
            return s.split(':')
        return (s, s)

    def get_src_port_range(self):
        """Same as get_dst_port_range, for the 'sport' property."""
        return self.get_dst_port_range(dir='sport')

    def __repr__(self):
        return '<Rule %s of %r>' % (self.cpt, self.acl)

    def __str__(self):
        return self.__repr__()

def _populate_iter(subject, resource, proto, appli, os):
    if appli and appli.eltset_t:
        appli_list = appli.eltset_t
    else:
        appli_list = [None]

    if os and os.eltset_t:
        os_list = os.eltset_t
    else:
        os_list = [None]

    if proto and proto.eltset_t:
        proto_list = proto.eltset_t
    else:
        proto_list = [None]

    subj_list = subject.eltset_t
    if not subj_list:
        subj_list = [None]
    res_list = resource.eltset_t
    if not res_list:
        res_list = [None]

    for elt_from in subj_list:
        for to in res_list:
            for proto in proto_list:
                for appli in appli_list:
                    for os in os_list:
                        yield (elt_from, to, proto, appli, os)


class Acl:
    """Represents an acl in a NuFW acl xml file

    On construction the acl is expanded into Rule objects, one per
    combination of its subject, resource, protocol, application and OS
    eltsets; further rules flagged is_auto are generated when a network
    is missing on one side.  The result is available through rules().
    """
    def __init__(self, sub, ress, firewall, name='', proto=None, id=-1,
                 decision='', ulog_prefix='', appli=None, os=None,
                 period_name='', sort_order=None, auth=None, log=False,
                 acl_type='FORWARD', transparent_proxy=False):
        """Store the acl settings and build its rule list.

        sub and ress are the subject and resource element groups, firewall
        is the firewall object queried for networks and interfaces, id
        must be an int.  When auth is given, user_groups is the list of
        the integer 'group' value of each of its eltsets, otherwise None.
        """
        # NOTE: the three checks below are asserts, hence skipped when
        # Python runs with -O; kept so the exception type stays the same.
        assert isinstance(id, int)
        self.__rule_list = []
        self._firewall = firewall
        self.id = id
        self.decision = decision
        self.ulog_prefix = ulog_prefix
        self.__name = name
        self.cpt = 0    # number given to the last rule created
        self.__proto = proto
        self.sort_order = sort_order
        self.log = log
        self.acl_type = acl_type
        self.transparent_proxy = transparent_proxy
        if auth:
            for eltset in auth.eltset_t:
                eltset.merge_elts()
            self.user_groups = [int(eltset.elts[0].get('group'))
                                for eltset in auth.eltset_t]
        else:
            self.user_groups = None
        assert check_local_ipv4(sub, firewall.get_local_ips())
        assert check_local_ipv4(ress, firewall.get_local_ips())
        self.__populate(sub, ress, proto, appli, os, period_name)

    def __populate(self, subject, resource, proto, appli, os, period_name):
        """Create the rules of this acl and store them in the rule list."""
        # Work on a private copy: appending to the list handed out by the
        # firewall could alter a list the firewall still owns.
        lnet = list(self._firewall.entreprise_nets())
        if self._firewall.auth_ext and self._firewall.get_ext_net():
            # include INTERNET (default) network in list used
            # to create netfilter rules for authenticated acls
            lnet.append(self._firewall.get_ext_net().ip())
        for item in _populate_iter(subject, resource, proto, appli, os):
            (eltsetfrom, eltsetto, eltsetproto, eltsetappli, eltsetos) = item
            r = Rule(self, deepcopy(eltsetfrom),
                     deepcopy(eltsetto),
                     proto=deepcopy(eltsetproto),
                     appli=deepcopy(eltsetappli),
                     os=deepcopy(eltsetos),
                     aclname=self.__name)
            # Set time period name
            if period_name:
                r.period_name = period_name
            self.cpt += 1
            r.cpt = self.cpt
            if not r.dest() or r.elt_dest().get('is_local'):
                self.__rule_list.extend(self._local_dest_rules(r))
            else:
                self.__rule_list.extend(self._forward_rules(r, lnet))

    def _local_dest_rules(self, r):
        """Rules to store for r when it has no destination or a local one."""
        list_r = [r]
        if not r.dest() or not r.elt_dest().get('numnet'):
            list_r = self._input_rules_for_local_ress_without_net(r)
        if not r.elt_src().get('numnet'):
            list_r = self._input_rules_for_local_ress_without_sujet_net(list_r)
        # Only generated rules get a fresh number; the others keep theirs.
        for _r in list_r:
            if _r.is_auto:
                self.cpt += 1
                _r.cpt = self.cpt
        return list_r

    def _forward_rules(self, r, lnet):
        """Rules to store for r when its destination is not local."""
        candidates = []
        for hfw in lnet:
            made = self._make_rule(r, hfw)
            if made is not None:
                candidates.append(made)

        # Remove pointless rules that come from a net and go to the same net
        kept = self.check_dst_src_rules(candidates)

        # Nothing was generated or kept: fall back on the rule itself.
        # (The destination is known not to be local on this path.)
        if not kept:
            return [r]
        return kept

    def _input_rules_for_local_ress_without_net(self, rule):
        """Return one copy of rule per local ip of the firewall.

        Each copy has its destination 'numnet' set to that ip and is
        flagged is_auto.  A rule without destination yields a single
        plain copy.
        """
        if not rule.dest():
            return [deepcopy(rule)]
        result = []
        for addr in self._firewall.get_local_ips():
            new_rule = deepcopy(rule)
            new_rule.elt_dest().set('numnet', addr)
            new_rule.is_auto = 1
            result.append(new_rule)
        return result

    def _input_rules_for_local_ress_without_sujet_net(self, rule_list):
        """Return, for each rule, one copy per enterprise net of the firewall.

        Each copy has its source 'numnet' set to that net and is flagged
        is_auto.
        """
        result = []
        for rule in rule_list:
            for hfw in self._firewall.entreprise_nets():
                new_rule = deepcopy(rule)
                new_rule.elt_src().set('numnet', hfw)
                new_rule.is_auto = 1
                result.append(new_rule)
        return result

    def _make_rule(self, r, hfw):
        """Return a numbered copy of r whose source net is hfw, or None.

        A copy is only made when r has a source that is not local and
        whose 'numnet' is the empty string.  The copy gets hfw as source
        'numnet', 'internal' as source zone, the next rule number, and is
        flagged is_auto.
        """
        if not r.src():
            return None
        source = r.elt_src()
        if source.get('is_local') or source.get('numnet') != '':
            return None
        res = deepcopy(r)
        res.elt_src().set('numnet', hfw)
        res.elt_src().set('zone', 'internal')
        # We dont want to generate a rule that goes
        # to an iface and from a lan that is not the iface LAN.
        if self._firewall.rule_inout(res) and self._firewall.rule_dst_is_src(res) is not True:
            return None
        self.cpt += 1
        res.cpt = self.cpt
        res.is_auto = 1
        return res

    def check_dst_src_rules(self, rules_list):
        """Filter rules_list with the firewall's dst/src checks.

        A rule is kept when firewall.rule_dst_is_src(rule) is false, or
        else when firewall.rule_inout(rule) is true.  Order is preserved.
        """
        res = []
        for r in rules_list:
            if not self._firewall.rule_dst_is_src(r):
                res.append(r)
            elif self._firewall.rule_inout(r):
                res.append(r)
        return res

    def rules(self):
        """Return an iterator over the rules of this acl."""
        return iter(self.__rule_list)

    def fw(self):
        """Return the firewall this acl belongs to."""
        return self._firewall

    def name(self):
        """Return the name of this acl."""
        return self.__name

    def __repr__(self):
        return '<ACL id=%s name=%r type=%r>' % (self.id, self.__name, self.acl_type)

    def __str__(self):
        return self.__repr__()