Sophie

Sophie

distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > dca483b59ba61f3fa092de932ddd570e > files > 743

nuface-2.0.14-2mdv2009.1.i586.rpm

from subprocess import Popen, PIPE
from cStringIO import StringIO
from IPy import IP
from checkdesc.descmodels import (Desc, Firewall, Network, Interface, Address,
    DirectConnection, InternetConnection, RoutedConnection)
from checkdesc.descxml import dump
from checkdesc.parser import IfconfigParser, IpAddrParser, RouteParser
from checkdesc.tools import IPmakenet
from sys import exit, stderr

# Commands to get informations about network interfaces
IPADDR = ("/sbin/ip", "addr", "list")
IFCONFIG = ("/sbin/ifconfig", "-a")
ROUTE = ("/sbin/route", "-n")

# Default values in desc.xml
FIREWALL_QUEUE = 0
FIREWALL_NAME = "firewall"

# Special IPs
LOOPBACK = IP('127.0.0.0/8')
INTERNET = IP('0.0.0.0/0')

def searchFirst(condition, data):
    for item in data:
        if condition(item):
            return item
    raise KeyError('No item matching condition')

def runCommand(cmd_args):
    try:
        # Run with empty environment to avoid translation to user locale
        process = Popen(cmd_args, stdout=PIPE, env={})
        stdout, process_stderr = process.communicate()
        if process.returncode:
            print >>stderr, 'Command "%s" fails with code %s' % (
                " ".join(cmd_args), process.returncode)
            exit(1)
        return stdout
    except OSError, err:
        print >>stderr, 'Unable to run command "%s": %s' \
            % (" ".join(cmd_args), err)
        exit(1)

def isLocalAddress(address):
    if address.scope != 'global':
        return True
    if address.network in LOOPBACK:
        return True
    return False

def removeLocalAddresses(interface):
    interface['addr'] = filter(lambda addr: not isLocalAddress(addr), interface['addr'])

def getInterfaces(ifconfig_format, ifconfig, ignore_interfaces):
    # Run ifconfig -a and parse output
    if not ifconfig:
        ifconfig = runCommand(IPADDR)
        ifconfig = StringIO(ifconfig)
        ifconfig_format = "ip addr"
    if ifconfig_format == 'ifconfig':
        parser = IfconfigParser(ifconfig, '"%s"' % " ".join(IFCONFIG))
    else:
        parser = IpAddrParser(ifconfig, '"%s"' % " ".join(IPADDR))

    # Store interfaces list, but skip loopback (lo)
    interfaces = {}
    for interface in parser.interfaces:
        if interface['name'] in ignore_interfaces:
            print >>stderr, "INFO: Ignore interface %s" % interface['name']
            continue
        if interface['link'] == 'loopback':
            print >>stderr, "INFO: Skip interface %s (type %s)" % (
                interface['name'], interface['link'])
            continue
        removeLocalAddresses(interface)
        if not interface['addr']:
            continue
        interfaces[interface['name']] = interface
    return interfaces

def getRoutes(routes, ignore_interfaces):
    if not routes:
        routes = runCommand(ROUTE)
        routes = StringIO(routes)
    parser = RouteParser(routes, '"%s"' % " ".join(ROUTE), ignore_interfaces)
    return parser.routes

def createDesc(interfaces, routes, version=None):
    interface_id = 0
    network_id = 0

    desc = Desc(version)
    firewall = Firewall(FIREWALL_QUEUE, FIREWALL_NAME, id=1, type="nufw")
    desc.firewalls.append(firewall)
    for interface_dict in interfaces.itervalues():
        interface_id += 1
        interface = Interface(firewall, interface_dict["name"], id=interface_id)
        address_id = 1
        for address in interface_dict['addr']:
            ipv4 = address.ip
            addr = Address(ipv4, id=address_id)
            interface.append(addr)
            address_id += 1
        firewall.interfaces.append(interface)

        net_name = interface_dict["name"].upper()
        subnet_index = 0
        for address in interface_dict['addr']:
            if address.peer:
                size = 24
                ip_mask = IPmakenet("%s/%s" % (address.ip, size))
                try:
                    del interface_dict["gateway"]
                except KeyError:
                    pass
                print >>stderr, "Convert peer-to-peer address %s to %s" \
                    % (address.network, ip_mask)
            else:
                ip_mask = address.network
            network_id += 1
            if subnet_index:
                name = "%sALIAS%s" % (net_name, subnet_index)
            else:
                name = net_name
            network = Network(name, "ipv4", ip_mask, id=network_id)
            if not subnet_index and "gateway" in interface_dict:
                gateway = IP(interface_dict["gateway"])
                InternetConnection(network, interface, gateway)
            else:
                DirectConnection(network, interface)
            desc.networks.append(network)
            subnet_index += 1

    for route in routes:
        dest = IPmakenet('%s/%s' % (route['dest'], route['genmask']))
        gateway = IP(route['gateway'])
        if dest == INTERNET or gateway == IP('0.0.0.0'):
            continue

        try:
            interface = searchFirst(lambda item: item.name == route['interface'], firewall.interfaces)
        except KeyError:
            print >>stderr, "INFO: Skip route %s/%s=>%s (no interface %r)" % (
                route['dest'],
                route['genmask'],
                route['gateway'],
                route['interface'])
            continue
        try:
            network = searchFirst(lambda item: item.addr == dest, desc.networks)
        except KeyError:
            network_id = len(desc.networks) + 1
            name = "NET%s" % network_id
            network = Network(name, "ipv4", dest, id=network_id)
            desc.networks.append(network)
        RoutedConnection(network, interface, IP(gateway))

    return desc

def writeDescXML(desc_version, output, filename, ifconfig_format, ifconfig, routes, ignore_interfaces):
    ignore_interfaces = set(ignore_interfaces)
    interfaces = getInterfaces(ifconfig_format, ifconfig, ignore_interfaces)
    routes = getRoutes(routes, ignore_interfaces)
    for route in routes:
        dest = IPmakenet('%s/%s' % (route['dest'], route['genmask']))
        if dest != INTERNET:
            continue
        interface = interfaces[route['interface']]
        interface['gateway'] = route['gateway']
    desc = createDesc(interfaces, routes, desc_version)
    dump(desc, output)