#!/usr/bin/python
"""Regression test runner for the nupyf script.

For every test case directory (a directory holding a ``desc.xml`` file and
one or more ``acls*.xml`` files) the nupyf script is run once per ACL file.
The rule files it writes into the output directory are then compared with
the files stored in the matching ``expect_*`` directory.

The process exits with status 1 when at least one test case failed.
"""
from sys import exit, stderr, path
from optparse import OptionParser
from glob import glob
from os.path import join as path_join, isdir, exists as path_exists, basename, dirname, normpath
from os import mkdir, system, rename
import re
import subprocess
from shutil import rmtree, move
from difflib import context_diff

# Line printed between test cases.
_SEPARATOR = "------------------------------------------------------------------------------"


class RunTests:
    """Run nupyf on test ACL files and compare its output to expected files.

    Attributes set by ``__init__``:
      test_dir   -- directory containing this script (test cases live here)
      nupyf      -- path of the nupyf script to execute
      output_dir -- scratch directory receiving the generated rule files
      iptables   -- value passed to nupyf through ``--iptables``

    Attributes set by ``parseOptions``:
      options    -- parsed command line options
      testcases  -- test case directories given on the command line
    """

    def __init__(self):
        self.test_dir = dirname(__file__)
        nuface_dir = normpath(path_join(self.test_dir, '..'))
        self.nupyf = path_join(nuface_dir, "scripts/scripts/nupyf")
        nupyf_module_dir = path_join(nuface_dir, "scripts/nupyf")
        # Make the nupyf modules importable from this process.
        path.insert(0, nupyf_module_dir)
        self.output_dir = "/tmp/nupyf-tests"
        self.iptables = "/sbin/iptables"

    def parseOptions(self):
        """Parse the command line into self.options and self.testcases."""
        parser = OptionParser(usage="%prog [-hdc] [<testcase> [<testcase> ...]]")
        # store_false: giving -d turns options.delete_output off, which keeps
        # the output directory of a successful test (failed tests always
        # keep their output, see testFile).
        parser.add_option("-d", "--delete-output",
            help="Do not delete output files in %s after a successful test." % self.output_dir,
            action="store_false", default=True)
        parser.add_option("-c", "--continue-mode",
            help="Continues the test procedure even if a test fails.",
            action="store_true", default=False)
        parser.add_option("--regenerate",
            help="Regenerate tests instead of testing",
            action="store_true", default=False)
        self.options, self.testcases = parser.parse_args()

    def _readBody(self, filename):
        """Return the lines of filename without its first two lines.

        NOTE(review): the first two lines are presumably a generated header
        that differs between runs -- confirm against nupyf output.
        """
        with open(filename) as stream:
            return stream.readlines()[2:]

    def compareFiles(self, expect_dir, f):
        """Compare generated file f with its counterpart in expect_dir.

        In regenerate mode a differing (or missing) expected file is
        replaced by f and the comparison counts as a success.

        Returns True when the files differ (failure), False otherwise.
        Raises IOError when a file cannot be read outside regenerate mode.
        """
        expected = path_join(expect_dir, basename(f))
        diff = []
        try:
            diff = list(context_diff(self._readBody(expected), self._readBody(f)))
            if not diff:
                return False
        except IOError:
            # A missing expected file is normal while regenerating.
            if not self.options.regenerate:
                raise
        if self.options.regenerate:
            # move() instead of rename(): the output directory may live on
            # another filesystem than the expected files.
            move(f, expected)
            print("%s regenerated" % expected)
            return False
        print("Diff in file %s" % expected)
        for line in diff:
            print(line.rstrip())
        return True

    def compareDirectories(self, expect_dir):
        """Compare every file of the output directory against expect_dir.

        In regenerate mode expect_dir is created when it does not exist.
        Returns True when at least one file differs.
        """
        if self.options.regenerate and not isdir(expect_dir):
            mkdir(expect_dir)
        failure = False
        for f in sorted(glob(path_join(self.output_dir, "*"))):
            # No short-circuit: the diff of every file has to be printed.
            if self.compareFiles(expect_dir, f):
                failure = True
        return failure

    def testFile(self, test_dir, acl):
        """Run nupyf on one ACL file and check the files it generates.

        The output directory is recreated empty first. Unless continue mode
        is enabled, the process exits with status 1 on the first failure.

        Returns True when the test FAILED, False when it passed.
        """
        if path_exists(self.output_dir):
            if isdir(self.output_dir):
                rmtree(self.output_dir)
            else:
                stderr.write("%s: file already exists but is not a directory.\n" % self.output_dir)
                exit(1)
        mkdir(self.output_dir)
        o = self.output_dir
        print("* Testing %s" % basename(acl))

        # Argument list (no shell): paths containing spaces or shell
        # metacharacters are passed through untouched.
        cmd = [
            self.nupyf,
            "--debug",
            "-d", path_join(o, "dispatch_rules"),
            "-f", path_join(o, "forward_rules"),
            "-i", path_join(o, "input_rules"),
            "-o", path_join(o, "output_rules"),
            "-n", path_join(o, "nat_rules"),
            "-m", path_join(o, "mangle_rules"),
            "--ulog", "--sortid=1",
            "--nufw",
            "--same-iface",
            "--iptables=%s" % self.iptables,
            path_join(test_dir, "desc.xml"),
            acl,
        ]
        try:
            exit_code = subprocess.call(cmd)
        except OSError as err:
            # nupyf is missing or not executable: count it as a failed test.
            stderr.write("%s: unable to execute: %s\n" % (self.nupyf, err))
            exit_code = -1

        failure = True
        if exit_code == 0:
            # <dir>/acls_<name>.xml is checked against <dir>/expect_<name>
            match = re.match(r"^(.*)/acls_(.*)\.xml$", acl)
            if match:
                expect_dir = match.expand(r'\1/expect_\2')
                failure = self.compareDirectories(expect_dir)
            else:
                stderr.write("%s: unable to find the expected directory name.\n" % acl)
            if not failure and self.options.delete_output:
                rmtree(self.output_dir)
        if failure:
            print("[FAIL]")
            if not self.options.continue_mode:
                exit(1)
        else:
            print("[ OK ]")
        return failure

    def runTestCase(self, directory):
        """Run every acls*.xml file of a test case directory.

        Returns True when the directory is a valid test case and all its
        ACL files passed, False otherwise.
        """
        print(_SEPARATOR)
        print("ENTERING INTO %s TESTCASE" % directory)
        test_dir = directory
        if not isdir(test_dir):
            stderr.write("%s: no such directory.\n" % test_dir)
            return False
        desc = path_join(test_dir, "desc.xml")
        if not path_exists(desc):
            stderr.write("%s: file not found.\n" % desc)
            return False
        ok = True
        for acl in sorted(glob(path_join(test_dir, "acls*.xml"))):
            # testFile() returns True on FAILURE.
            if self.testFile(test_dir, acl):
                ok = False
        return ok

    def main(self):
        """Parse options and run the requested (or all) test cases.

        Without test case on the command line, every directory found next
        to this script is used. Returns True when all test cases passed.
        """
        self.parseOptions()
        if not self.testcases:
            print(self.test_dir)
            candidates = glob(path_join(self.test_dir, "*"))
            # Only discovered entries are filtered: a test case named
            # explicitly that is not a directory is reported by runTestCase.
            self.testcases = sorted(name for name in candidates if isdir(name))
        success = True
        for testcase in self.testcases:
            if not self.runTestCase(testcase):
                success = False
        print(_SEPARATOR)
        return success


if __name__ == "__main__":
    if not RunTests().main():
        exit(1)