#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Copyright(C) 2007 INL
Written by Damien Boucard <damien.boucard AT inl.fr>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.

---
l7ipt.py is a module used for importing and exporting a layer7 filtering
content into an iptables command set in iptables-save format
(without "iptables -t MANGLE" prefix).
"""

from os import linesep as endl
from iptmodels import IptCommand


def load(file):
    """ Generate model objects from a given iptables script.

    @param file: file-like object to import.
    @type file: file
    @rtype: L7RuleList
    @raise NotImplementedError: always; importing is not implemented yet.
    """
    # TODO: code me !
    raise NotImplementedError("fixme: Not yet implemented !")


def dump(l7rulelist, file):
    """ Generate an iptables script from a given nulayer7 model.

    Each rule or comment is written to C{file} as one line terminated by
    C{os.linesep}. As a side effect, C{L7IptCommand.connmark_mask} is set to
    C{l7rulelist.mask} before the per-rule commands are built.

    @param l7rulelist: data to export into iptables script.
    @type l7rulelist: L7RuleList
    @param file: file-like object to export to.
    @type file: file
    """
    mask = l7rulelist.mask

    def emit(line):
        # One write per output line, always terminated by the platform EOL.
        file.write(line + endl)

    # ACCEPT non-L7 connection (TODO: should be customizable)
    emit("-A PREROUTING -m connmark --mark 0/0x%08X -j ACCEPT" % mask)
    emit("-A PREROUTING -m connmark --mark 0x%08X/0x%08X -j DROP"
         % (mask, mask))
    emit("-A POSTROUTING -m state --state NEW -m connmark --mark 0/0x%08X"
         " -j ACCEPT" % mask)

    emit("# L7filter rules")
    # Creating LAYER7 chain
    emit("-N LAYER7")
    # Route to LAYER7 chain
    emit("-A PREROUTING -j LAYER7")
    emit("-A POSTROUTING -m state --state NEW -j LAYER7")

    # L7IptCommand reads the mask from this class attribute when it is
    # instantiated, so it has to be set before the loop below.
    L7IptCommand.connmark_mask = mask
    for l7rule in l7rulelist:
        emit("# %s" % l7rule.name)
        for l7ruleelt in l7rule:
            # Element-level settings take precedence over the rule's own.
            action = l7ruleelt.action or l7rule.action
            prefix_template = l7ruleelt.prefix or l7rule.prefix or ""
            # The prefix may reference the protocol as %(l7proto)s.
            prefix = prefix_template % {'l7proto': l7ruleelt.l7proto}
            emit(str(L7IptCommand(l7rule.connmark, l7ruleelt.l7proto,
                                  action, prefix)))

        # Rule for unset protocol
        unset_cmd = IptCommand(chain="LAYER7", target="ACCEPT")
        unset_cmd.append_module(
            "connmark", mark="0x%08X/0x%08X" % (l7rule.connmark, mask))
        unset_cmd.append_module("layer7", l7proto="unset")
        emit(str(unset_cmd))

        # Rule for unknown protocol
        unknown_cmd = UnknownIptCommand(l7rule.connmark, "unknown",
                                        l7rule.defaultaction,
                                        l7rule.defaultprefix)
        emit(str(unknown_cmd))

    emit("# Default policy for LAYER7 chain")
    # Accepting mark 0
    emit("-A LAYER7 -m connmark --mark 0/0x%08X -j ACCEPT" % mask)
    # Setting mark of connexion to drop to the value of mask.
    emit("-A LAYER7 -j CONNMARK --set-mark 0x%08X/0x%08X" % (mask, mask))
    # Default policy for LAYER7 chain
    emit(str(IptCommand(chain="LAYER7", target="DROP")))


class L7IptCommand(IptCommand):
    """ LAYER7-chain command matching one connmark value and one l7 protocol.

    C{connmark_mask} is a class-level setting shared by every instance; it
    must hold an integer before an instance is created (C{dump} assigns it),
    otherwise the mark formatting in C{__init__} raises C{TypeError}.
    """

    # Mask applied to connection marks; assigned by dump().
    connmark_mask = None

    def __init__(self, connmark, proto, action, prefix):
        """ Build the match part of the command.

        @param connmark: connection mark value to match (integer).
        @param proto: layer7 protocol name passed to the layer7 module.
        @param action: action name; interpreted by C{__str__}.
        @param prefix: log prefix, used only when the target is LOG or ULOG.
        """
        IptCommand.__init__(self, chain="LAYER7")
        self.append_module(
            "connmark",
            mark="0x%08X/0x%08X" % (connmark, self.connmark_mask))
        self.append_module("layer7", l7proto=proto)
        self.action = action
        self.prefix = prefix

    def _set_accept(self):
        """ Turn this command into its "accept" form.

        The target becomes CONNMARK with a "set-mark" post-option of
        C{0/<mask>}, and the command comment is set to "ACCEPT".
        """
        self.target = "CONNMARK"
        self.add_postoption("set-mark", "0/0x%08X" % (self.connmark_mask))
        self.comment = "ACCEPT"

    def _get_drop_rule(self):
        """ Return the extra rule lines emitted after a "logdrop" log rule.

        This class emits none.
        NOTE(review): presumably dropping is left to the LAYER7 default
        policy written at the end of dump() -- confirm.

        @rtype: list of str
        """
        return []

    def __str__(self):
        """ Render the command as one or two iptables rule lines.

        The target is derived from C{self.action}:
          - an action containing "ulog" gives a ULOG target;
          - otherwise an action containing "log" gives a LOG target;
          - "accept" applies C{_set_accept};
          - anything else is upper-cased and used as the target.
        For "[u]logaccept" a second, accepting rule follows the log rule;
        for "[u]logdrop" the lines from C{_get_drop_rule} follow it.

        NOTE: rendering mutates the command (target, post-options, comment),
        so an instance is meant to be rendered once, as dump() does.

        @return: the rule lines joined with C{os.linesep}.
        @rtype: str
        """
        action = self.action

        # Defining target. "u" selects the ULOG flavour of action and
        # option names ("ulogaccept", "ulog-prefix", ...).
        log_c = ''
        if "ulog" in action:
            log_c = 'u'
            self.target = "ULOG"
        elif "log" in action:
            self.target = "LOG"
        elif action == "accept":
            self._set_accept()
        else:
            self.target = action.upper()

        prefix_option = log_c + "log-prefix"

        # Defining log-prefix
        if self.target in ("LOG", "ULOG") and self.prefix:
            self.add_postoption(prefix_option, '"%s"' % self.prefix)

        # Writing a L7 rule
        ipt_rules = [IptCommand.__str__(self)]

        is_logaccept = action == log_c + "logaccept"
        is_logdrop = action == log_c + "logdrop"
        if is_logaccept or is_logdrop:
            # The follow-up rule is not a log rule: remove the prefix option
            # if one was set (it is absent when self.prefix is empty).
            try:
                self.del_postoption(prefix_option)
            except KeyError:
                pass
            if is_logaccept:
                self._set_accept()
                ipt_rules.append(IptCommand.__str__(self))
            else:
                ipt_rules.extend(self._get_drop_rule())
        return endl.join(ipt_rules)


class UnknownIptCommand(L7IptCommand):
    """ Variant of L7IptCommand that uses plain ACCEPT / DROP targets.

    dump() instantiates it for the "unknown" protocol with the rule's default
    action and default prefix.
    """

    def _set_accept(self):
        """ Accept with a plain ACCEPT target (no mark rewriting). """
        self.target = "ACCEPT"

    def _get_drop_rule(self):
        """ Switch the target to DROP and return the rendered rule.

        @rtype: list of str
        """
        self.target = "DROP"
        return [IptCommand.__str__(self)]


if __name__ == "__main__":
    import sys
    try:
        from StringIO import StringIO  # Python 2
    except ImportError:
        from io import StringIO  # Python 3

    buf = StringIO()
    # Keep the input open for the whole load/dump round trip, then make sure
    # it is closed even if load() or dump() raises.
    with open(sys.argv[1], 'r') as script:
        dump(load(script), buf)
    print(buf.getvalue())