# Copyright(C) 2005 INL
# Written by Jean Gillaux <jean@inl.fr>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

"""Common stuff to NuFW ACLs backends"""

__version__ = '0.01'
__author__ = 'Jean Gillaux'

from IPy import IP

from nupyf.nuacl_func import proto_number

# Encoding applied to the text values that this module encodes explicitly
# (source ports and the time range name).
BACKEND_ENCODING = 'utf-8'

# Attributes that may hold several values at once.
ATTR_TYPE_MULTIPLE = set(['SrcPort','DstPort','Proto','Group','OsName','OsVersion','OsRelease','AppName','AppSig'])
# Attributes that hold at most one value.
ATTR_TYPE_SIMPLE = set(['SrcIPStart','SrcIPEnd','DstIPStart','DstIPEnd','SrcPortStart','SrcPortEnd','DstPortStart','DstPortEnd','Decision','description', 'TimeRange', 'AclFlags'])
ALL_ATTR = (ATTR_TYPE_MULTIPLE | ATTR_TYPE_SIMPLE)

# Maximum length of the text built by NURule.createDescription(); anything
# longer is truncated.
DESCRIPTION_LENGTH = 32

# Bit flags combined into the 'AclFlags' attribute (stored as a decimal
# string).
ACL_FLAGS_ASYNC = (1 << 0)
ACL_FLAGS_NOLOG = (1 << 1)
ACL_FLAGS_SYNC = (1 << 2)
ACL_FLAGS_STRICT = (1 << 3)


class NURule(object):
    """One ACL stored as a mapping ``attribute name -> set of values``.

    The mapping lives in ``self.h`` and is accessed through ``rule[key]``.
    Which keys are accepted, and whether they hold one or several values,
    is driven by ``attr_type_simple`` / ``attr_type_multiple``; equality
    only looks at the attributes listed in ``compare_attr``.
    """

    # When true, createDescription() ignores the ACL's ulog prefix and
    # always uses the ACL name.
    disable_logprefix = False

    def __init__(self, xmlrule=None, attr_type_simple=ATTR_TYPE_SIMPLE,
                 attr_type_multiple=ATTR_TYPE_MULTIPLE,
                 compare_attr=ALL_ATTR):
        """Create an empty rule and, if ``xmlrule`` is given, fill it.

        xmlrule            -- rule object handed to nurule_from_xml_rule()
        attr_type_simple   -- names of the single-valued attributes
        attr_type_multiple -- names of the multi-valued attributes
        compare_attr       -- names of the attributes used by ``==``
        """
        self.h = NURule._init_h()
        self.__rcpt = 0
        self.name = ''
        self.xmlrule = xmlrule
        self.attr_type_simple = attr_type_simple
        self.attr_type_multiple = attr_type_multiple
        self.compare_attr = compare_attr
        if xmlrule:
            self.nurule_from_xml_rule(xmlrule)

    @staticmethod
    def _init_h():
        """Return a fresh dict with one empty set per name in ALL_ATTR."""
        return dict((item, set()) for item in ALL_ATTR)

    @classmethod
    def make_decision(cls, dec):
        """Translate a rule decision keyword into its decision code.

        drop   -> '0'
        reject -> '3'
        others -> '1'
        """
        if dec == 'drop':
            return '0'
        if dec == 'reject':
            return '3'
        return '1'

    def nurule_from_xml_rule(self, r):
        """Set NURule attributes from xml rule object r.
        """
        os_elt = r.os()
        appli = r.appli()

        # Source/destination networks are stored as the decimal form of
        # their first and last addresses.
        nsrc = IP(r.elt_src().get('numnet'))
        ndst = IP(r.elt_dest().get('numnet'))
        self['SrcIPStart'] = nsrc.net().strDec()
        self['SrcIPEnd'] = nsrc.broadcast().strDec()
        self['DstIPStart'] = ndst.net().strDec()
        self['DstIPEnd'] = ndst.broadcast().strDec()

        self['Decision'] = NURule.make_decision(r.decision)
        self['Group'] = [str(group) for group in r.user_groups()]
        self['Proto'] = str(proto_number(r.proto().get('proto')))

        # Source ports are only recorded when the rule defines them.
        src_port_start, src_port_end = r.get_src_port_range()
        if src_port_start:
            self['SrcPortStart'] = src_port_start.encode(BACKEND_ENCODING)
        if src_port_end:
            self['SrcPortEnd'] = src_port_end.encode(BACKEND_ENCODING)

        # Destination ports always get a value; a missing bound falls back
        # to the widest range.
        # NOTE(review): the upper default is '65536' although the highest
        # valid port is 65535 -- kept as is because stored rules are
        # compared on this value; confirm before changing.
        dst_port_start, dst_port_end = r.get_dst_port_range()
        if not dst_port_start:
            dst_port_start = '0'
        if not dst_port_end:
            dst_port_end = '65536'
        self['DstPortStart'], self['DstPortEnd'] = (dst_port_start, dst_port_end)
        # NOTE(review): 'DstPort' is filled with the *end* of the range;
        # presumably meaningful for single-port rules only -- confirm.
        self['DstPort'] = dst_port_end

        #ldap description is limited to 1024 characters.
        # NOTE(review): createDescription() truncates to DESCRIPTION_LENGTH
        # (32), which is well below the limit quoted above.
        self['description'] = self.createDescription(r.acl)

        self.set_if_not_empty('OsName', os_elt.get('OSName'))
        self.set_if_not_empty('OsRelease', os_elt.get('OSRelease'))
        self.set_if_not_empty('OsVersion', os_elt.get('OSVersion'))
        self.set_if_not_empty('AppName', appli.get('ApplicationPath'))
        self.set_if_not_empty('TimeRange', r.period_name.encode(BACKEND_ENCODING))

        flags = 0
        if r.log == "no":
            flags |= ACL_FLAGS_NOLOG
        if r.acl.transparent_proxy:
            flags |= ACL_FLAGS_SYNC | ACL_FLAGS_STRICT
        self['AclFlags'] = str(flags)

    def createDescription(self, acl):
        """Build the description string for ``acl``.

        The text is ``<first char of acl_type><acl id>?:<label>`` where the
        label is the ACL's ulog prefix when it has one (and
        ``disable_logprefix`` is false), otherwise the ACL name.  The result
        is cut to DESCRIPTION_LENGTH characters; a warning is printed only
        when the truncated label came from the ulog prefix.
        """
        verbose = False
        message = str(acl.name())
        if not self.disable_logprefix and acl.ulog_prefix:
            verbose = True
            message = str(acl.ulog_prefix)
        message = "%s%s?:%s" % (acl.acl_type[0], acl.id, message)
        if DESCRIPTION_LENGTH < len(message):
            full_message = message
            message = message[:DESCRIPTION_LENGTH]
            if verbose:
                # Single parenthesised argument: prints the same text with
                # the Python 2 print statement and the print() function.
                print("Warning: truncate prefix %r to %s characters: %r" % (
                    full_message, len(message), message))
        return message

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` as a set.

        Multi-valued attributes keep every element of a list (or wrap a
        scalar in a one-element set).  Single-valued attributes keep only
        the first element of a list (an empty list clears the attribute).
        Keys belonging to neither attribute set are ignored.
        """
        is_list = isinstance(value, list)
        if key in self.attr_type_multiple:
            if is_list:
                self.h[key] = set(value)
            else:
                self.h[key] = set([value])
        # Deliberately not an ``elif``: if a key is listed in both sets the
        # single-valued handling has the last word.
        if key in self.attr_type_simple:
            if is_list:
                self.h[key] = set(value[:1])
            else:
                self.h[key] = set([value])

    def __getitem__(self, key):
        """Return the values stored under ``key`` as a new list.

        Raises ValueError when ``key`` is not a known attribute.
        """
        if key not in self.h:
            raise ValueError('bad ldap attribute %s in __getitem__' %(key))
        return list(self.h[key])

    def set_if_not_empty(self, attr, value):
        """set attr to value if value is not empty char
        """
        if value:
            self[attr] = value

    def __eq__(self, other):
        """Compare the attributes listed in ``compare_attr``.

        Returns NotImplemented when ``other`` carries no ``h`` mapping, so
        comparing a rule with an unrelated object yields False instead of
        raising AttributeError.
        """
        other_h = getattr(other, 'h', None)
        if other_h is None:
            return NotImplemented
        for k, v in self.h.items():
            if k in self.compare_attr and other_h[k] != v:
                return False
        return True

    def __ne__(self, other):
        """Negation of ``__eq__`` (NotImplemented is passed through)."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __str__(self):
        """Return ``NURule (key->values, ...)`` listing every attribute."""
        body = ''.join([str(k) + '->' + str(v) + ', '
                        for k, v in self.h.items()])
        return 'NURule (' + body + ')'


class NUBackend(object):
    """Parent class for nufw acls backends.

    Holds a list of rule objects in ``self.rules``; ``process()`` is a
    no-op hook here.
    """

    def __init__(self, fw=None, backend_class=NURule):
        """Collect the rules of ``fw`` that this backend handles.

        fw            -- object exposing ``allrules()``; None gives an
                         empty backend
        backend_class -- callable building a rule from ``xmlrule=<rule>``

        A rule is kept when it uses NuFW, its protocol is tcp or udp, and
        the 'decision' value read from its protocol element is neither
        'log' nor 'ulog'.
        """
        self.rules = []
        self.__rcpt = 0
        if fw is None:
            return
        for r in fw.allrules():
            # NOTE(review): 'decision' is looked up on r.proto() whereas
            # NURule reads r.decision -- confirm this is the intended source.
            if (r.use_nufw()
                    and r.proto().get('proto') in ('tcp', 'udp')
                    and r.proto().get('decision') not in ('log', 'ulog')):
                self.add_rule(backend_class(xmlrule=r))

    def add_rule(self, rule):
        """Append ``rule`` to the backend's rule list."""
        self.rules.append(rule)

    def set_rules(self, list_of_rules):
        """Replace the backend's rule list with ``list_of_rules``."""
        self.rules = list_of_rules

    def get_rules(self):
        """Return the backend's rule list (the list itself, not a copy)."""
        return self.rules

    def process(self):
        """Hook with no behaviour in this base class."""
        pass