# Copyright(C) 2005 INL
# Written by Jean Gillaux <jean@inl.fr>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

from copy import deepcopy
from IPy import IP
from nupyf.nuerror import Error
from nupyf.nuelt import Elt, EltSet


def check_local_ipv4(eltg, local_ips):
    """Check that the 'net' property of local objects is a firewall address.

    Walks every element of every element set yielded by
    ``eltg.iter_eltsets()``.  Returns False as soon as an element flagged
    'is_local' carries a non-empty 'net' value that is not in *local_ips*;
    returns True otherwise, including when *eltg* is empty or None.
    """
    if not eltg:
        return True
    for elts in eltg.iter_eltsets():
        for elt in elts:
            addr = elt.get('net')
            if elt.get('is_local') and addr and addr not in local_ips:
                return False
    return True


def check_rule_network(acl_id, network, typestr):
    """Ensure *network* holds at most one element defining a 'net'.

    *typestr* ('source' or 'destination' at the call sites in this module)
    is only used to build the error message.  Nothing is checked when
    *network* is empty or None.

    Raises Error when a second element with a 'net' property is found.
    """
    if not network:
        return
    has_net = False
    for elt in network:
        if not elt.get('net'):
            continue
        if has_net:
            raise Error(
                "ACL %s: the %s must not contain "
                "several network definitions"
                % (acl_id, typestr))
        has_net = True


def getAclInterface(acl, addr):
    """Return the interface the ACL's firewall maps *addr* to, or None.

    Any falsy answer from ``get_iface_by_net_addr`` is normalised to None.
    """
    return acl.fw().get_iface_by_net_addr(addr) or None


class Rule:
    """Represents a rule from an acl.

    A rule combines one source element set, one destination element set
    and one protocol / application / OS element set, and copies the
    decision, logging and ordering attributes of the Acl it belongs to.
    """

    def __init__(self, acl, src, dest, proto=None, appli=None, os=None,
                 aclname=''):
        """Build a rule for *acl* going from *src* to *dest*.

        *src* and *dest* may be None.  *proto*, *appli* and *os* default to
        an element set holding a single empty Elt.  Every element set that
        is given has ``merge_elts()`` called on it.

        Raises Error (through check_rule_network) when *src* or *dest*
        contains several network definitions.
        """
        self.acl = acl
        check_rule_network(acl.id, src, 'source')
        check_rule_network(acl.id, dest, 'destination')
        self.is_auto = 0
        self.__src = src
        self.__dest = dest
        if self.__src:
            self.__src.merge_elts()
        if self.__dest:
            self.__dest.merge_elts()

        # Attributes copied from the owning acl
        self.aclid = acl.id
        self.decision = acl.decision
        self.ulog_prefix = acl.ulog_prefix
        self.aclname = aclname
        self.netdst = None
        self.netsrc = None
        self.cpt = 0
        self.period_name = ''  # time period name

        if proto is None:
            self._proto = EltSet([Elt()])
        else:
            self._proto = proto
            self._proto.merge_elts()
        if appli is None:
            self._appli = EltSet([Elt()])
        else:
            self._appli = appli
            self._appli.merge_elts()
        if os is None:
            self._os = EltSet([Elt()])
        else:
            self._os = os
            self._os.merge_elts()

        self.log = acl.log
        self.sort_order = acl.sort_order

        # Parsed 'numnet' of each side, or None when that side is missing
        if self.src():
            self.ip_src = IP(self.elt_src().get('numnet'))
        else:
            self.ip_src = None
        if self.dest():
            self.ip_dest = IP(self.elt_dest().get('numnet'))
        else:
            self.ip_dest = None

        # Interfaces are only looked up for sides that have an address
        if self.ip_src:
            self.input_iface = getAclInterface(acl, self.ip_src)
        else:
            self.input_iface = None
        if self.ip_dest:
            self.output_iface = getAclInterface(acl, self.ip_dest)
        else:
            self.output_iface = None

    def src(self):
        """Return the source element set (may be None)."""
        return self.__src

    def dest(self):
        """Return the destination element set (may be None)."""
        return self.__dest

    def user_groups(self):
        """Return the user groups of the owning acl."""
        return self.acl.user_groups

    def elt_src(self):
        """Return the first element of the source element set."""
        return self.__src.elts[0]

    def elt_dest(self):
        """Return the first element of the destination element set."""
        return self.__dest.elts[0]

    def proto(self):
        """Return the first element of the protocol element set."""
        return self._proto.elts[0]

    def appli(self):
        """Return the first element of the application element set."""
        return self._appli.elts[0]

    def os(self):
        """Return the first element of the OS element set."""
        return self._os.elts[0]

    def use_nufw(self):
        """Return True when the owning acl has user groups."""
        return bool(self.acl.user_groups)

    # FIXME: replace this function by one at acl level
    # rule is not aware of its place in the network layout
    def dst_isnot_src(self):  # used by Aclcheck_dst_src_rules
        """Tell whether source and destination are in distinct networks.

        Returns True when either side is in the 'external' zone, or when
        neither 'numnet' network contains the other.  Returns False when
        one network contains the other, i.e. for a pointless rule whose
        source and destination share a network.
        """
        szone = self.elt_src().get('zone')
        dzone = self.elt_dest().get('zone')
        if dzone == 'external' or szone == 'external':
            # TODO: handle vpn zone
            return True
        ips = IP(self.elt_src().get('numnet'))
        ipd = IP(self.elt_dest().get('numnet'))
        if ips not in ipd and ipd not in ips:
            return True
        return False

    def __cmp__(self, other):
        """Order rules by the integer value of their acl id."""
        return cmp(int(self.aclid), int(other.aclid))

    def get_dst_port_range(self, dir='dport'):
        """Return the (first, last) ports stored under the *dir* property.

        A 'first:last' value is split on ':' and returned as a list; a
        value without ':' is returned as the tuple (value, value).
        """
        s = self.proto().get(dir)
        if s.count(':') != 0:
            return s.split(':')
        return (s, s)

    def get_src_port_range(self):
        """Return the port range stored under the 'sport' property."""
        return self.get_dst_port_range(dir='sport')

    def __repr__(self):
        return '<Rule %s of %r>' % (self.cpt, self.acl)

    def __str__(self):
        return self.__repr__()


def _populate_iter(subject, resource, proto, appli, os):
    """Yield every (subject, resource, proto, appli, os) combination.

    Each argument exposes its element sets through ``eltset_t``.  A missing
    or empty collection contributes a single None so that the other
    dimensions are still combined.  *subject* and *resource* must be
    objects (their ``eltset_t`` is read unconditionally); *proto*, *appli*
    and *os* may be None.
    """
    if appli and appli.eltset_t:
        appli_list = appli.eltset_t
    else:
        appli_list = [None]
    if os and os.eltset_t:
        os_list = os.eltset_t
    else:
        os_list = [None]
    if proto and proto.eltset_t:
        proto_list = proto.eltset_t
    else:
        proto_list = [None]
    subj_list = subject.eltset_t or [None]
    res_list = resource.eltset_t or [None]

    # Distinct loop names: the parameters are not rebound while iterating
    for elt_from in subj_list:
        for elt_to in res_list:
            for elt_proto in proto_list:
                for elt_appli in appli_list:
                    for elt_os in os_list:
                        yield (elt_from, elt_to, elt_proto,
                               elt_appli, elt_os)


class Acl:
    """Represents an acl in a NuFW acl xml file.

    On construction the acl is expanded into Rule objects, one per
    combination of subject, resource, protocol, application and OS
    element sets, plus automatically generated rules (``is_auto == 1``)
    for sides that have no 'numnet'.
    """

    def __init__(self, sub, ress, firewall, name='', proto=None, id=-1,
                 decision='', ulog_prefix='', appli=None, os=None,
                 period_name='', sort_order=None, auth=None, log=False,
                 acl_type='FORWARD', transparent_proxy=False):
        """Store the acl attributes and build its rules.

        *sub* and *ress* are the subject and resource groups; *auth*, when
        given, provides the element sets whose 'group' property becomes
        ``user_groups`` (a list of ints, or None without *auth*).

        Raises AssertionError when *id* is not an int or when a local
        element of *sub* / *ress* has a 'net' that is not an address of
        *firewall*.
        """
        assert isinstance(id, int)
        self.__rule_list = []
        self._firewall = firewall
        self.id = id
        self.decision = decision
        self.ulog_prefix = ulog_prefix
        self.__name = name
        self.cpt = 0
        self.__proto = proto
        self.sort_order = sort_order
        self.log = log
        self.acl_type = acl_type
        self.transparent_proxy = transparent_proxy
        if auth:
            for eltset in auth.eltset_t:
                eltset.merge_elts()
            self.user_groups = [int(eltset.elts[0].get('group'))
                                for eltset in auth.eltset_t]
        else:
            self.user_groups = None
        assert check_local_ipv4(sub, firewall.get_local_ips())
        assert check_local_ipv4(ress, firewall.get_local_ips())
        self.__populate(sub, ress, proto, appli, os, period_name)

    def __populate(self, subject, resource, proto, appli, os, period_name):
        """Expand the acl into rules and store them in the rule list.

        ``self.cpt`` is incremented for every rule created, including the
        automatically generated ones, and copied into each rule's ``cpt``.
        """
        # NOTE(review): this is whatever list entreprise_nets() returns; the
        # append below mutates it in place -- confirm the firewall hands
        # out a fresh list on each call.
        lnet = self._firewall.entreprise_nets()
        if self._firewall.auth_ext and self._firewall.get_ext_net():
            # include INTERNET (default) network in list used
            # to create netfilter rules for authenticated acls
            lnet.append(self._firewall.get_ext_net().ip())

        for item in _populate_iter(subject, resource, proto, appli, os):
            (eltsetfrom, eltsetto,
             eltsetproto, eltsetappli, eltsetos) = item
            r = Rule(self, deepcopy(eltsetfrom), deepcopy(eltsetto),
                     proto=deepcopy(eltsetproto),
                     appli=deepcopy(eltsetappli),
                     os=deepcopy(eltsetos),
                     aclname=self.__name)
            # Set time period name
            if period_name:
                r.period_name = period_name
            self.cpt += 1
            r.cpt = self.cpt

            if not r.dest() or r.elt_dest().get('is_local'):
                # No destination, or destination is the firewall itself
                list_r = [r]
                if not r.dest() or not r.elt_dest().get('numnet'):
                    list_r = self._input_rules_for_local_ress_without_net(r)
                # NOTE(review): elt_src() fails when the rule has no source
                # element set, which _populate_iter allows -- confirm that
                # callers always provide a subject.
                if not r.elt_src().get('numnet'):
                    list_r = \
                        self._input_rules_for_local_ress_without_sujet_net(
                            list_r)
                # Only generated rules get a new counter value
                for auto_rule in list_r:
                    if auto_rule.is_auto:
                        self.cpt += 1
                        auto_rule.cpt = self.cpt
                self.__rule_list.extend(list_r)
            else:
                generated = []
                for hfw in lnet:
                    new_rule = self._make_rule(r, hfw)
                    if new_rule is not None:
                        generated.append(new_rule)
                # Remove pointless rules that come from a net and go to
                # the same net
                generated = self.check_dst_src_rules(generated)
                if generated:
                    self.__rule_list.extend(generated)
                elif not (r.elt_dest().get('is_local')
                          and not r.elt_src().get('numnet')):
                    self.__rule_list.append(r)

    def _input_rules_for_local_ress_without_net(self, rule):
        """Return one copy of *rule* per local address of the firewall.

        Each copy has its destination 'numnet' set to the address and is
        flagged ``is_auto``.  A rule without destination yields a single
        plain copy.
        """
        if not rule.dest():
            return [deepcopy(rule)]
        result = []
        for addr in self._firewall.get_local_ips():
            new_rule = deepcopy(rule)
            new_rule.elt_dest().set('numnet', addr)
            new_rule.is_auto = 1
            result.append(new_rule)
        return result

    def _input_rules_for_local_ress_without_sujet_net(self, rule_list):
        """Return a copy of every rule for each net of the firewall.

        Each copy has its source 'numnet' set to one value returned by
        ``entreprise_nets()`` and is flagged ``is_auto``.
        """
        result = []
        for rule in rule_list:
            for hfw in self._firewall.entreprise_nets():
                new_rule = deepcopy(rule)
                new_rule.elt_src().set('numnet', hfw)
                new_rule.is_auto = 1
                result.append(new_rule)
        return result

    def _make_rule(self, r, hfw):
        """Return a generated copy of *r* whose source network is *hfw*.

        A copy is only made when *r* has a non-local source with an empty
        'numnet'; the copy is put in the 'internal' zone, gets the next
        counter value and is flagged ``is_auto``.  Returns None when no
        rule is generated.
        """
        res = None
        change = 0
        if r.src() \
           and (not r.elt_src().get('is_local')
                and r.elt_src().get('numnet') == ''):
            res = deepcopy(r)
            res.elt_src().set('numnet', hfw)
            res.elt_src().set('zone', 'internal')
            change = 1
            # We don't want to generate a rule that goes
            # to an iface and from a lan that is not the iface LAN.
            if self._firewall.rule_inout(res) \
               and self._firewall.rule_dst_is_src(res) is not True:
                change = 0
        if change == 1:
            self.cpt = self.cpt + 1
            res.cpt = self.cpt
            res.is_auto = 1
            return res
        return None

    def check_dst_src_rules(self, rules_list):
        """Return the rules of *rules_list* that are worth keeping.

        A rule is kept when the firewall does not report its destination
        as being its source, or when the firewall's ``rule_inout`` check
        accepts it.  The firewall-level check is used here rather than
        Rule.dst_isnot_src().
        """
        res = []
        for r in rules_list:
            if (not self._firewall.rule_dst_is_src(r)
                    or self._firewall.rule_inout(r)):
                res.append(r)
        return res

    def rules(self):
        """Return an iterator over the rules of this acl."""
        return iter(self.__rule_list)

    def fw(self):
        """Return the firewall this acl belongs to."""
        return self._firewall

    def name(self):
        """Return the name of this acl."""
        return self.__name

    def __repr__(self):
        return '<ACL id=%s name=%r type=%r>' % (
            self.id, self.__name, self.acl_type)

    def __str__(self):
        return self.__repr__()