# Sophie
#
# distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > 429d9064b451fbf785f6785aa01bfcf8 > files > 53
#
# python-storm-0.13-3mdv2010.0.noarch.rpm

#
# Copyright (c) 2006, 2007 Canonical
#
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
from weakref import ref
import gc

from storm.exceptions import ClassInfoError
from storm.properties import Property
from storm.variables import Variable
from storm.expr import Undef, Select, compile
from storm.info import *

from tests.helper import TestHelper, run_this


class Wrapper(object):
    """Thin proxy exposing the wrapped object's C{__storm_object_info__}."""

    def __init__(self, obj):
        # The wrapped object stays reachable as a plain attribute.
        self.obj = obj

    @property
    def __storm_object_info__(self):
        """Forward the lookup to the wrapped object's own attribute."""
        return self.obj.__storm_object_info__


class GetTest(TestHelper):
    """Exercise the accessors for class info and object info."""

    def setUp(self):
        TestHelper.setUp(self)

        # Minimal mapped class: one table, one primary key property.
        class Class(object):
            __storm_table__ = "table"
            prop1 = Property("column1", primary=True)

        self.Class = Class
        self.obj = Class()

    def test_get_cls_info(self):
        """get_cls_info() gives a ClassInfo and the same one on each call."""
        info = get_cls_info(self.Class)
        self.assertTrue(isinstance(info, ClassInfo))
        again = get_cls_info(self.Class)
        self.assertTrue(info is again)

    def test_get_obj_info(self):
        """get_obj_info() gives an ObjectInfo and the same one on each call."""
        info = get_obj_info(self.obj)
        self.assertTrue(isinstance(info, ObjectInfo))
        again = get_obj_info(self.obj)
        self.assertTrue(info is again)

    def test_get_obj_info_on_obj_info(self):
        """Passing an object info to get_obj_info() returns it unchanged."""
        info = get_obj_info(self.obj)
        self.assertTrue(get_obj_info(info) is info)

    def test_set_obj_info(self):
        """set_obj_info() replaces what get_obj_info() returns."""
        original = get_obj_info(self.obj)
        replacement = ObjectInfo(self.obj)

        # Building a second ObjectInfo alone must not replace the first.
        self.assertEquals(get_obj_info(self.obj), original)

        set_obj_info(self.obj, replacement)
        self.assertEquals(get_obj_info(self.obj), replacement)


class ClassInfoTest(TestHelper):
    """Tests for the ClassInfo built from a mapped class."""

    def setUp(self):
        TestHelper.setUp(self)
        # Fixture: a two-column class whose primary key is prop1.
        class Class(object):
            __storm_table__ = "table"
            prop1 = Property("column1", primary=True)
            prop2 = Property("column2")
        self.Class = Class
        self.cls_info = get_cls_info(Class)

    def test_invalid_class(self):
        """A class with no table nor properties raises ClassInfoError."""
        class Class(object):
            pass
        self.assertRaises(ClassInfoError, ClassInfo, Class)

    def test_cls(self):
        """The info keeps a reference to the class it describes."""
        self.assertEquals(self.cls_info.cls, self.Class)

    def test_columns(self):
        """The info lists both properties of the class as columns."""
        self.assertEquals(self.cls_info.columns,
                          (self.Class.prop1, self.Class.prop2))

    def test_table(self):
        """The info exposes the value of __storm_table__."""
        self.assertEquals(self.cls_info.table, "table")

    def test_primary_key(self):
        """primary=True on a single property makes it the primary key."""
        # Can't use == for props.
        self.assertTrue(self.cls_info.primary_key[0] is self.Class.prop1)
        self.assertEquals(len(self.cls_info.primary_key), 1)

    def test_primary_key_with_attribute(self):
        """__storm_primary__ on a subclass selects the primary key."""
        class SubClass(self.Class):
            __storm_primary__ = "prop2"

        cls_info = get_cls_info(SubClass)

        self.assertTrue(cls_info.primary_key[0] is SubClass.prop2)
        # Measure the subclass info built above; the fixture's
        # self.cls_info describes the base class and would make this
        # assertion pass regardless of what SubClass declares.
        self.assertEquals(len(cls_info.primary_key), 1)

    def test_primary_key_composed(self):
        """Integer primary= values order a composed primary key."""
        class Class(object):
            __storm_table__ = "table"
            prop1 = Property("column1", primary=2)
            prop2 = Property("column2", primary=1)
        cls_info = ClassInfo(Class)

        # Can't use == for props, since they're columns.
        self.assertTrue(cls_info.primary_key[0] is Class.prop2)
        self.assertTrue(cls_info.primary_key[1] is Class.prop1)
        self.assertEquals(len(cls_info.primary_key), 2)

    def test_primary_key_composed_with_attribute(self):
        """A __storm_primary__ tuple defines a composed primary key."""
        class Class(object):
            __storm_table__ = "table"
            __storm_primary__ = "prop2", "prop1"
            # Define primary=True to ensure that the attribute
            # has precedence.
            prop1 = Property("column1", primary=True)
            prop2 = Property("column2")
        cls_info = ClassInfo(Class)

        # Can't use == for props, since they're columns.
        self.assertTrue(cls_info.primary_key[0] is Class.prop2)
        self.assertTrue(cls_info.primary_key[1] is Class.prop1)
        self.assertEquals(len(cls_info.primary_key), 2)

    def test_primary_key_composed_duplicated(self):
        """Two properties with the same primary= value are rejected."""
        class Class(object):
            __storm_table__ = "table"
            prop1 = Property("column1", primary=True)
            prop2 = Property("column2", primary=True)
        self.assertRaises(ClassInfoError, ClassInfo, Class)

    def test_primary_key_missing(self):
        """A class without any primary key declaration is rejected."""
        class Class(object):
            __storm_table__ = "table"
            prop1 = Property("column1")
            prop2 = Property("column2")
        self.assertRaises(ClassInfoError, ClassInfo, Class)

    def test_primary_key_attribute_missing(self):
        """An empty __storm_primary__ is rejected despite primary=True."""
        class Class(object):
            __storm_table__ = "table"
            __storm_primary__ = ()
            prop1 = Property("column1", primary=True)
            prop2 = Property("column2")
        self.assertRaises(ClassInfoError, ClassInfo, Class)

    def test_primary_key_pos(self):
        """primary_key_pos is (2, 0) when the key is (prop3, prop1)."""
        class Class(object):
            __storm_table__ = "table"
            prop1 = Property("column1", primary=2)
            prop2 = Property("column2")
            prop3 = Property("column3", primary=1)
        cls_info = ClassInfo(Class)
        self.assertEquals(cls_info.primary_key_pos, (2, 0))


class ObjectInfoTest(TestHelper):
    """Tests for ObjectInfo: variables, checkpoints, events, object refs."""

    def setUp(self):
        TestHelper.setUp(self)

        # Fixture: a two-column class whose primary key is prop1.
        class Class(object):
            __storm_table__ = "table"
            prop1 = Property("column1", primary=True)
            prop2 = Property("column2")

        self.Class = Class
        self.obj = Class()
        self.obj_info = get_obj_info(self.obj)
        self.cls_info = get_cls_info(Class)
        variables = self.obj_info.variables
        self.variable1 = variables[Class.prop1]
        self.variable2 = variables[Class.prop2]

    # -- helpers shared by the change notification tests ------------------

    @staticmethod
    def _recorder(tag, auto_remove=False):
        """Return C{(log, callback)} for hooking into an event.

        Every call of C{callback} appends C{(tag,) + args} to C{log}.
        With C{auto_remove} set, the callback returns False; the
        auto-remove tests expect such a callback to fire only once.
        """
        log = []

        def callback(*args):
            log.append((tag,) + args)
            if auto_remove:
                return False

        return log, callback

    def _expect(self, tag, changes, data=()):
        """Build the log that the recorder tagged C{tag} should hold.

        C{changes} is a sequence of C{(variable, old_value, new_value)}
        and C{data} holds the extra arguments given when hooking.
        """
        return [(tag, self.obj_info, variable, old_value, new_value, False)
                + tuple(data)
                for variable, old_value, new_value in changes]

    def _check_add(self, *data):
        """Hook two recorders and verify set, set-to-None and delete."""
        changes1, object_changed1 = self._recorder(1)
        changes2, object_changed2 = self._recorder(2)

        self.obj_info.checkpoint()
        self.obj_info.event.hook("changed", object_changed1, *data)
        self.obj_info.event.hook("changed", object_changed2, *data)

        def verify(changes):
            self.assertEquals(changes1, self._expect(1, changes, data))
            self.assertEquals(changes2, self._expect(2, changes, data))
            # Start the next phase with empty logs.
            del changes1[:]
            del changes2[:]

        self.obj.prop2 = 10
        self.obj.prop1 = 20
        verify([(self.variable2, Undef, 10),
                (self.variable1, Undef, 20)])

        self.obj.prop1 = None
        self.obj.prop2 = None
        verify([(self.variable1, 20, None),
                (self.variable2, 10, None)])

        del self.obj.prop1
        del self.obj.prop2
        verify([(self.variable1, None, Undef),
                (self.variable2, None, Undef)])

    def _check_remove(self, *data):
        """Hook two recorders, unhook the first, verify only #2 fires."""
        changes1, object_changed1 = self._recorder(1)
        changes2, object_changed2 = self._recorder(2)

        self.obj_info.checkpoint()
        event = self.obj_info.event
        event.hook("changed", object_changed1, *data)
        event.hook("changed", object_changed2, *data)
        event.unhook("changed", object_changed1, *data)

        self.obj.prop2 = 20
        self.obj.prop1 = 10

        self.assertEquals(changes1, [])
        self.assertEquals(changes2,
                          self._expect(2, [(self.variable2, Undef, 20),
                                           (self.variable1, Undef, 10)],
                                       data))

    def _check_auto_remove(self, *data):
        """Hook two False-returning recorders; each must fire only once."""
        changes1, object_changed1 = self._recorder(1, auto_remove=True)
        changes2, object_changed2 = self._recorder(2, auto_remove=True)

        self.obj_info.checkpoint()
        self.obj_info.event.hook("changed", object_changed1, *data)
        self.obj_info.event.hook("changed", object_changed2, *data)

        self.obj.prop2 = 20
        self.obj.prop1 = 10

        only_first = [(self.variable2, Undef, 20)]
        self.assertEquals(changes1, self._expect(1, only_first, data))
        self.assertEquals(changes2, self._expect(2, only_first, data))

    # -- basic behaviour ---------------------------------------------------

    def test_hashing(self):
        info = self.obj_info
        self.assertEquals(hash(info), hash(info))

    def test_equals(self):
        # Infos of two distinct objects must not compare equal.
        other = get_obj_info(self.Class())
        self.assertFalse(self.obj_info == other)

    def test_not_equals(self):
        other = get_obj_info(self.Class())
        self.assertTrue(self.obj_info != other)

    def test_dict_subclass(self):
        self.assertTrue(isinstance(self.obj_info, dict))

    def test_variables(self):
        """There is exactly one Variable per column, bound to it."""
        variables = self.obj_info.variables
        columns = self.cls_info.columns

        self.assertTrue(isinstance(variables, dict))

        for column in columns:
            variable = variables.get(column)
            self.assertTrue(isinstance(variable, Variable))
            self.assertTrue(variable.column is column)

        self.assertEquals(len(variables), len(columns))

    def test_variable_has_validator_object_factory(self):
        """The validator receives the object, attribute name and value."""
        calls = []

        def validator(obj, attr, value):
            calls.append((obj, attr, value))

        class Class(object):
            __storm_table__ = "table"
            prop = Property(primary=True,
                            variable_kwargs=dict(validator=validator))

        instance = Class()
        variable = get_obj_info(instance).variables[Class.prop]
        variable.set(123)

        self.assertEquals(calls, [(instance, "prop", 123)])

    def test_primary_vars(self):
        """primary_vars is a tuple matching the primary key columns."""
        primary_vars = self.obj_info.primary_vars
        primary_key = self.cls_info.primary_key

        self.assertTrue(isinstance(primary_vars, tuple))

        for column, variable in zip(primary_key, primary_vars):
            self.assertEquals(self.obj_info.variables.get(column), variable)

        self.assertEquals(len(primary_vars), len(primary_key))

    def test_checkpoint(self):
        """checkpoint() resets has_changed() without touching the value."""
        def check(value, changed):
            self.assertEquals(self.obj.prop1, value)
            self.assertEquals(self.variable1.has_changed(), changed)

        self.obj.prop1 = 10
        self.obj_info.checkpoint()
        check(10, False)

        self.obj.prop1 = 20
        check(20, True)

        self.obj_info.checkpoint()
        check(20, False)

        # Assigning the checkpointed value again is not a change.
        self.obj.prop1 = 20
        check(20, False)

    # -- change notifications ----------------------------------------------

    def test_add_change_notification(self):
        self._check_add()

    def test_add_change_notification_with_arg(self):
        self._check_add(object())

    def test_remove_change_notification(self):
        self._check_remove()

    def test_remove_change_notification_with_arg(self):
        self._check_remove(object())

    def test_auto_remove_change_notification(self):
        self._check_auto_remove()

    def test_auto_remove_change_notification_with_arg(self):
        self._check_auto_remove(object())

    # -- object references -------------------------------------------------

    def test_get_obj(self):
        self.assertTrue(self.obj_info.get_obj() is self.obj)

    def test_get_obj_reference(self):
        """
        get_obj() used to be assigned manually, which broke stored
        references to the method (IOW, what this test does).

        That was a bit faster, but at the cost of risking subtle bugs
        which are super hard to debug.
        """
        stored_get_obj = self.obj_info.get_obj
        self.assertTrue(stored_get_obj() is self.obj)

        replacement = self.Class()
        self.obj_info.set_obj(replacement)
        self.assertTrue(stored_get_obj() is replacement)

    def test_set_obj(self):
        replacement = self.Class()
        self.obj_info.set_obj(replacement)
        self.assertTrue(self.obj_info.get_obj() is replacement)

    def test_weak_reference(self):
        """The info must not keep its object alive."""
        instance = self.Class()
        info = get_obj_info(instance)
        del instance
        self.assertEquals(info.get_obj(), None)

    def test_object_deleted_notification(self):
        """Dropping the object fires "object-deleted" with its info."""
        instance = self.Class()
        info = get_obj_info(instance)
        info["tainted"] = True

        deleted = []

        def object_deleted(dead_info):
            deleted.append(dead_info)

        info.event.hook("object-deleted", object_deleted)

        del info
        del instance

        self.assertEquals(len(deleted), 1)
        self.assertTrue("tainted" in deleted[0])

    def test_object_deleted_notification_after_set_obj(self):
        """The notification also fires for an object given to set_obj()."""
        instance = self.Class()
        info = get_obj_info(instance)
        info["tainted"] = True

        # Rebind the same name on purpose: this drops the only local
        # reference to the first instance before set_obj() is called.
        instance = self.Class()
        info.set_obj(instance)

        deleted = []

        def object_deleted(dead_info):
            deleted.append(dead_info)

        info.event.hook("object-deleted", object_deleted)

        del info
        del instance

        self.assertEquals(len(deleted), 1)
        self.assertTrue("tainted" in deleted[0])


class ClassAliasTest(TestHelper):
    """Tests for ClassAlias: class info, compilation and alias caching."""

    def setUp(self):
        TestHelper.setUp(self)
        # Fixture: a single-column class and a named alias of it.
        class Class(object):
            __storm_table__ = "table"
            prop1 = Property("column1", primary=True)
        self.Class = Class
        self.ClassAlias = ClassAlias(self.Class, "alias")

    def test_cls_info_cls(self):
        """The alias info keeps the class but uses the alias as table."""
        cls_info = get_cls_info(self.ClassAlias)
        self.assertEquals(cls_info.cls, self.Class)
        self.assertEquals(cls_info.table, "alias")
        self.assertEquals(self.ClassAlias.prop1.name, "column1")
        self.assertEquals(self.ClassAlias.prop1.table, self.ClassAlias)

    def test_compile(self):
        """An alias compiles to its alias name."""
        statement = compile(self.ClassAlias)
        self.assertEquals(statement, "alias")

    def test_compile_with_reserved_keyword(self):
        """An alias named after a reserved keyword is quoted."""
        Alias = ClassAlias(self.Class, "select")
        statement = compile(Alias)
        self.assertEquals(statement, '"select"')

    def test_compile_in_select(self):
        """In a SELECT, the table is rendered as C{"table" AS alias}."""
        expr = Select(self.ClassAlias.prop1, self.ClassAlias.prop1 == 1,
                      self.ClassAlias)
        statement = compile(expr)
        self.assertEquals(statement,
                          'SELECT alias.column1 FROM "table" AS alias '
                          'WHERE alias.column1 = ?')

    def test_compile_in_select_with_reserved_keyword(self):
        """A reserved-keyword alias is quoted everywhere in a SELECT."""
        Alias = ClassAlias(self.Class, "select")
        expr = Select(Alias.prop1, Alias.prop1 == 1, Alias)
        statement = compile(expr)
        self.assertEquals(statement,
                          'SELECT "select".column1 FROM "table" AS "select" '
                          'WHERE "select".column1 = ?')

    def test_crazy_metaclass(self):
        """We don't want metaclasses playing around when we build an alias."""
        # No explicit TestHelper.setUp() call here: the test framework
        # has already run setUp() before invoking this method.
        class MetaClass(type):
            def __new__(meta_cls, name, bases, dict):
                cls = type.__new__(meta_cls, name, bases, dict)
                # Overwrite the table on every class this metaclass builds.
                cls.__storm_table__ = "HAH! GOTCH YA!"
                return cls
        class Class(object):
            __metaclass__ = MetaClass
            __storm_table__ = "table"
            prop1 = Property("column1", primary=True)
        Alias = ClassAlias(Class, "USE_THIS")
        self.assertEquals(Alias.__storm_table__, "USE_THIS")

    def test_cached_aliases(self):
        """
        Class aliases are cached such that multiple invocations of
        C{ClassAlias} return the same object.
        """
        alias1 = ClassAlias(self.Class, "something_unlikely")
        alias2 = ClassAlias(self.Class, "something_unlikely")
        self.assertIdentical(alias1, alias2)
        alias3 = ClassAlias(self.Class, "something_unlikely2")
        self.assertNotIdentical(alias1, alias3)
        alias4 = ClassAlias(self.Class, "something_unlikely2")
        self.assertIdentical(alias3, alias4)

    def test_unnamed_aliases_not_cached(self):
        """Aliases created without a name are distinct objects."""
        alias1 = ClassAlias(self.Class)
        alias2 = ClassAlias(self.Class)
        self.assertNotIdentical(alias1, alias2)

    def test_alias_cache_is_per_class(self):
        """
        The alias cache is keyed per class: the same alias name used for
        a subclass yields a different alias object.
        """
        class LocalClass(self.Class):
            pass
        alias = ClassAlias(self.Class, "something_unlikely")
        alias2 = ClassAlias(LocalClass, "something_unlikely")
        self.assertNotIdentical(alias, alias2)

    def test_aliases_only_last_as_long_as_class(self):
        """
        The cached ClassAliases only last for as long as the class is alive.
        """
        class LocalClass(self.Class):
            pass
        alias = ClassAlias(LocalClass, "something_unlikely3")
        alias_ref = ref(alias)
        class_ref = ref(LocalClass)
        del alias
        del LocalClass

        # Collect three times, as the original test did, before checking
        # that neither the class nor its alias survived.
        for _ in range(3):
            gc.collect()

        self.assertIdentical(class_ref(), None)
        self.assertIdentical(alias_ref(), None)


class TypeCompilerTest(TestHelper):
    """Compilation of a class whose table is another mapped class."""

    def test_nested_classes(self):
        """Convoluted case checking that the model is right."""

        class Class1(object):
            __storm_table__ = "class1"
            id = Property(primary=True)

        # The table of Class2 is a class rather than a string.
        class Class2(object):
            __storm_table__ = Class1
            id = Property(primary=True)

        self.assertEquals(compile(Class2), "class1")

        aliased = ClassAlias(Class2, "alias")
        select = Select(aliased.id)
        self.assertEquals(compile(select),
                          "SELECT alias.id FROM class1 AS alias")