"""Build a Desc network description from interface and route listings.

The interface listing comes from "ip addr list" (or an ifconfig dump given
by the caller) and the routing table from "route -n".  The resulting Desc
object is serialized with checkdesc.descxml.dump().
"""

from subprocess import Popen, PIPE
from cStringIO import StringIO
from IPy import IP
from checkdesc.descmodels import (
    Desc, Firewall, Network, Interface, Address,
    DirectConnection, InternetConnection, RoutedConnection)
from checkdesc.descxml import dump
from checkdesc.parser import IfconfigParser, IpAddrParser, RouteParser
from checkdesc.tools import IPmakenet
from sys import exit, stderr

# Commands to get information about network interfaces
IPADDR = ("/sbin/ip", "addr", "list")
IFCONFIG = ("/sbin/ifconfig", "-a")
ROUTE = ("/sbin/route", "-n")

# Default values in desc.xml
FIREWALL_QUEUE = 0
FIREWALL_NAME = "firewall"

# Special IPs
LOOPBACK = IP('127.0.0.0/8')
INTERNET = IP('0.0.0.0/0')


def searchFirst(condition, data):
    """Return the first item of data for which condition(item) is true.

    Raise KeyError if no item matches.
    """
    for item in data:
        if condition(item):
            return item
    raise KeyError('No item matching condition')


def runCommand(cmd_args):
    """Run the command cmd_args (sequence of strings) and return its stdout.

    On failure (the command cannot be started, or it returns a non-zero
    exit code) an error is written to stderr and the process exits with
    status 1.
    """
    command = " ".join(cmd_args)
    try:
        # Run with empty environment to avoid translation to user locale
        process = Popen(cmd_args, stdout=PIPE, env={})
        stdout = process.communicate()[0]
    except OSError as err:
        print >>stderr, 'Unable to run command "%s": %s' \
            % (command, err)
        exit(1)
    if process.returncode:
        print >>stderr, 'Command "%s" fails with code %s' % (
            command, process.returncode)
        exit(1)
    return stdout


def isLocalAddress(address):
    """Tell whether address is local: non-global scope or in 127.0.0.0/8."""
    return address.scope != 'global' or address.network in LOOPBACK


def removeLocalAddresses(interface):
    """Replace interface['addr'] by the list of its non-local addresses."""
    interface['addr'] = [addr for addr in interface['addr']
                         if not isLocalAddress(addr)]


def getInterfaces(ifconfig_format, ifconfig, ignore_interfaces):
    """Parse the interface listing and return a dict name => interface.

    ifconfig_format selects the parser: 'ifconfig' uses IfconfigParser,
    anything else uses IpAddrParser.  ifconfig is handed to the parser
    as-is; when it is empty/None, the IPADDR command is run instead and its
    output is parsed with the "ip addr" parser, whatever ifconfig_format
    says.

    Interfaces named in ignore_interfaces, loopback interfaces and
    interfaces left without any non-local address are not returned.
    """
    # No listing given by the caller: run "ip addr list" and parse its output
    if not ifconfig:
        ifconfig = StringIO(runCommand(IPADDR))
        ifconfig_format = "ip addr"
    if ifconfig_format == 'ifconfig':
        parser = IfconfigParser(ifconfig, '"%s"' % " ".join(IFCONFIG))
    else:
        parser = IpAddrParser(ifconfig, '"%s"' % " ".join(IPADDR))

    # Store interfaces list, but skip loopback (lo)
    interfaces = {}
    for interface in parser.interfaces:
        name = interface['name']
        if name in ignore_interfaces:
            print >>stderr, "INFO: Ignore interface %s" % name
            continue
        if interface['link'] == 'loopback':
            print >>stderr, "INFO: Skip interface %s (type %s)" % (
                name, interface['link'])
            continue
        removeLocalAddresses(interface)
        if not interface['addr']:
            # Only local addresses: nothing to describe for this interface
            continue
        interfaces[name] = interface
    return interfaces


def getRoutes(routes, ignore_interfaces):
    """Parse the routing table and return the routes found by RouteParser.

    routes is handed to the parser as-is; when it is empty/None, the ROUTE
    command is run and its output is parsed instead.
    """
    if not routes:
        routes = StringIO(runCommand(ROUTE))
    parser = RouteParser(routes, '"%s"' % " ".join(ROUTE), ignore_interfaces)
    return parser.routes


def createDesc(interfaces, routes, version=None):
    """Create a Desc object from parsed interfaces and routes.

    interfaces is a dict as returned by getInterfaces(); an entry may hold a
    "gateway" key (the default gateway reachable through that interface).
    routes is a sequence of dicts with the keys 'dest', 'genmask',
    'gateway' and 'interface'.

    Note: the "gateway" key is removed from an interface entry holding a
    peer-to-peer address, so the interfaces argument may be modified.
    """
    network_id = 0
    desc = Desc(version)
    firewall = Firewall(FIREWALL_QUEUE, FIREWALL_NAME, id=1, type="nufw")
    desc.firewalls.append(firewall)

    for interface_id, interface_dict in enumerate(interfaces.itervalues(), 1):
        interface = Interface(firewall, interface_dict["name"],
                              id=interface_id)
        for address_id, address in enumerate(interface_dict['addr'], 1):
            interface.append(Address(address.ip, id=address_id))
        firewall.interfaces.append(interface)

        # One network per address: the first one is named after the
        # interface, the next ones get an "ALIAS<index>" suffix
        net_name = interface_dict["name"].upper()
        for subnet_index, address in enumerate(interface_dict['addr']):
            if address.peer:
                # Peer-to-peer address: use a /24 network built from the
                # local IP and forget the gateway of this interface
                size = 24
                ip_mask = IPmakenet("%s/%s" % (address.ip, size))
                try:
                    del interface_dict["gateway"]
                except KeyError:
                    pass
                print >>stderr, "Convert peer-to-peer address %s to %s" \
                    % (address.network, ip_mask)
            else:
                ip_mask = address.network
            network_id += 1
            if subnet_index:
                name = "%sALIAS%s" % (net_name, subnet_index)
            else:
                name = net_name
            network = Network(name, "ipv4", ip_mask, id=network_id)
            # Only the first network of an interface carries the gateway
            if not subnet_index and "gateway" in interface_dict:
                gateway = IP(interface_dict["gateway"])
                InternetConnection(network, interface, gateway)
            else:
                DirectConnection(network, interface)
            desc.networks.append(network)

    no_gateway = IP('0.0.0.0')
    for route in routes:
        dest = IPmakenet('%s/%s' % (route['dest'], route['genmask']))
        gateway = IP(route['gateway'])
        # Skip the default route and the routes without gateway
        if dest == INTERNET or gateway == no_gateway:
            continue
        try:
            interface = searchFirst(
                lambda item: item.name == route['interface'],
                firewall.interfaces)
        except KeyError:
            print >>stderr, "INFO: Skip route %s/%s=>%s (no interface %r)" % (
                route['dest'], route['genmask'],
                route['gateway'], route['interface'])
            continue
        try:
            network = searchFirst(lambda item: item.addr == dest,
                                  desc.networks)
        except KeyError:
            # Destination is not a known network yet: create it
            network_id = len(desc.networks) + 1
            name = "NET%s" % network_id
            network = Network(name, "ipv4", dest, id=network_id)
            desc.networks.append(network)
        RoutedConnection(network, interface, IP(gateway))
    return desc


def writeDescXML(desc_version, output, filename, ifconfig_format, ifconfig,
                 routes, ignore_interfaces):
    """Collect interfaces and routes, build the Desc and dump it to output.

    ifconfig_format, ifconfig and ignore_interfaces are passed to
    getInterfaces(); routes and ignore_interfaces to getRoutes().  The
    gateway of each default route (destination 0.0.0.0/0) is stored in the
    "gateway" key of the matching interface before calling createDesc().

    filename is not used by this function; it is kept so that existing
    callers keep working.
    """
    ignore_interfaces = set(ignore_interfaces)
    interfaces = getInterfaces(ifconfig_format, ifconfig, ignore_interfaces)
    routes = getRoutes(routes, ignore_interfaces)
    for route in routes:
        dest = IPmakenet('%s/%s' % (route['dest'], route['genmask']))
        if dest != INTERNET:
            continue
        try:
            interface = interfaces[route['interface']]
        except KeyError:
            # getInterfaces() may have dropped this interface (ignored,
            # loopback or no global address): skip the route instead of
            # crashing, like createDesc() does for the other routes
            print >>stderr, \
                "INFO: Skip default route via %s (no interface %r)" % (
                    route['gateway'], route['interface'])
            continue
        interface['gateway'] = route['gateway']
    desc = createDesc(interfaces, routes, desc_version)
    dump(desc, output)