Sophie

Sophie

distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > dca483b59ba61f3fa092de932ddd570e > files > 763

nuface-2.0.14-2mdv2009.1.i586.rpm

# Copyright(C) 2005 INL
# Written by Jean Gillaux <jean@inl.fr>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
#  This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

"""Common stuff to NuFW ACLs backends"""

__version__ = '0.01'
__author__ = 'Jean Gillaux'

from IPy import IP
from nupyf.nuacl_func import proto_number

BACKEND_ENCODING = 'utf-8'
ATTR_TYPE_MULTIPLE = set(['SrcPort','DstPort','Proto','Group','OsName','OsVersion','OsRelease','AppName','AppSig'])
ATTR_TYPE_SIMPLE = set(['SrcIPStart','SrcIPEnd','DstIPStart','DstIPEnd','SrcPortStart','SrcPortEnd','DstPortStart','DstPortEnd','Decision','description', 'TimeRange', 'AclFlags'])
ALL_ATTR = (ATTR_TYPE_MULTIPLE | ATTR_TYPE_SIMPLE)

DESCRIPTION_LENGTH = 32


ACL_FLAGS_ASYNC = (1 << 0)
ACL_FLAGS_NOLOG = (1 << 1)
ACL_FLAGS_SYNC = (1 << 2)
ACL_FLAGS_STRICT = (1 << 3)

class NURule(object):
    disable_logprefix = False

    def __init__(self, xmlrule=None,
    attr_type_simple=ATTR_TYPE_SIMPLE, attr_type_multiple=ATTR_TYPE_MULTIPLE,
    compare_attr=ALL_ATTR):
        self.h = NURule._init_h()
        self.__rcpt = 0
        self.name = ''
        self.xmlrule = xmlrule
        self.attr_type_simple = attr_type_simple
        self.attr_type_multiple = attr_type_multiple
        self.compare_attr = compare_attr
        if xmlrule:
            self.nurule_from_xml_rule(xmlrule)

    def _init_h():
        """Initialization.
        """
        res={}
        for item in ALL_ATTR:
            res[item]=set()
        return res
    _init_h = staticmethod(_init_h)

    def make_decision(cls, dec):
        """Aggregates decision from rules
           drop/reject -> 0
           others -> 1
        """
        if dec == 'drop':
             return '0'
        if dec =='reject':
            return '3'
        return '1'
    make_decision = classmethod(make_decision)

    def nurule_from_xml_rule(self, r):
        """Set NURule attributes from xml rule object r.
        """
        os = r.os()
        appli = r.appli()
        res = self
        nsrc = IP(r.elt_src().get('numnet'))
        ndst = IP(r.elt_dest().get('numnet'))
        res['SrcIPStart'] = nsrc.net().strDec()
        res['SrcIPEnd'] = nsrc.broadcast().strDec()
        res['DstIPStart'] = ndst.net().strDec()
        res['DstIPEnd'] = ndst.broadcast().strDec()
        res['Decision'] = NURule.make_decision(r.decision)
        res['Group'] = [str(group) for group in r.user_groups()]
        res['Proto'] = str(proto_number(r.proto().get('proto')))
        src_port_start, src_port_end = r.get_src_port_range()
        if src_port_start:
            res['SrcPortStart']=src_port_start.encode(BACKEND_ENCODING)
        if src_port_end:
            res['SrcPortEnd']=src_port_end.encode(BACKEND_ENCODING)
        dst_port_start, dst_port_end = r.get_dst_port_range()
        if not dst_port_start:
             dst_port_start = '0'
        if not dst_port_end:
             dst_port_end = '65536'
        res['DstPortStart'],res['DstPortEnd'] = (dst_port_start, dst_port_end)
        res['DstPort'] = dst_port_end

        #ldap description is limited to 1024 characters.
        res['description'] = self.createDescription(r.acl)

        res.set_if_not_empty('OsName',os.get('OSName'))
        res.set_if_not_empty('OsRelease',os.get('OSRelease'))
        res.set_if_not_empty('OsVersion',os.get('OSVersion'))
        res.set_if_not_empty('AppName',appli.get('ApplicationPath'))
        res.set_if_not_empty('TimeRange',r.period_name.encode(BACKEND_ENCODING))
        flags = 0
        if r.log == "no":
            flags |= ACL_FLAGS_NOLOG
        if r.acl.transparent_proxy:
            flags |= ACL_FLAGS_SYNC | ACL_FLAGS_STRICT
        res['AclFlags'] = str(flags)

    def createDescription(self, acl):
        verbose = False
        message = str(acl.name())
        if not self.disable_logprefix and acl.ulog_prefix:
            verbose = True
            message = str(acl.ulog_prefix)
        message = "%s%s?:%s" % (acl.acl_type[0], acl.id, message)
        if DESCRIPTION_LENGTH < len(message):
            full_message = message
            message = message[:DESCRIPTION_LENGTH]
            if verbose:
                print "Warning: truncate prefix %r to %s characters: %r" % (
                    full_message, len(message), message)
        return message

    def __setitem__(self, key, value):
        if key in self.attr_type_multiple:
            if type(value)==list:
                 self.h[key]=set(value)
            else: self.h[key]=set([value])
        if key in self.attr_type_simple:
            if type(value)==list:
                if len(value)>0:
                     self.h[key]=set([value[0]])
                else: self.h[key]=set([])
            else: self.h[key]=set([value])

    def __getitem__(self, key):
        if key not in self.h:
             raise ValueError('bad ldap attribute %s in __getitem__' %(key))
        return [i for i in self.h[key]]

    def set_if_not_empty(self, attr, value):
        """set attr to value if value is not empty char
        """
        if value:
             self[attr] = value

    def __eq__(self, other):
        for k, v in self.h.iteritems():
            if k in self.compare_attr:
                if other.h[k]!=v:
                    return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        s = 'NURule ('
        for k, v in self.h.iteritems():
            s+=str(k)+'->'+str(v)+', '
        s+=')'
        return s


class NUBackend(object):
    """Parent class for nufw acls backends.
    """

    def __init__(self, fw=None, backend_class=NURule):
        self.rules = []
        self.__rcpt = 0
        if fw is None:
            return
        for r in fw.allrules():
            if r.use_nufw() \
            and r.proto().get('proto') in ('tcp', 'udp') \
            and r.proto().get('decision') not in ('log', 'ulog'):
                self.add_rule(backend_class(xmlrule=r))

    def add_rule(self, rule):
        self.rules.append(rule)

    def set_rules(self, list_of_rules):
        self.rules = list_of_rules

    def get_rules(self):
        return self.rules

    def process(self):
        pass