Sophie

Sophie

distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > 429d9064b451fbf785f6785aa01bfcf8 > files > 56

python-storm-0.13-3mdv2010.0.noarch.rpm

#
# Copyright (c) 2006, 2007 Canonical
#
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
import datetime
import operator

from storm.database import create_database
from storm.exceptions import NoneError
from storm.sqlobject import *
from storm.store import Store
from storm.expr import Asc, Like
from storm.tz import tzutc

from tests.helper import TestHelper


class SQLObjectTest(TestHelper):

    def setUp(self):
        TestHelper.setUp(self)

        # Allow classes with the same name in different tests to resolve
        # property path strings properly.
        SQLObjectBase._storm_property_registry.clear()

        self.store = Store(create_database("sqlite:"))
        class SQLObject(SQLObjectBase):
            @staticmethod
            def _get_store():
                return self.store

        self.SQLObject = SQLObject

        self.store.execute("CREATE TABLE person "
                           "(id INTEGER PRIMARY KEY, name TEXT, age INTEGER,"
                           " ts TIMESTAMP, delta INTERVAL,"
                           " address_id INTEGER)")
        self.store.execute("INSERT INTO person VALUES "
                           "(1, 'John Joe', 20, '2007-02-05 19:53:15',"
                           " '1 day, 12:34:56', 1)")
        self.store.execute("INSERT INTO person VALUES "
                           "(2, 'John Doe', 20, '2007-02-05 20:53:15',"
                           " '42 days 12:34:56.78', 2)")

        self.store.execute("CREATE TABLE address "
                           "(id INTEGER PRIMARY KEY, city TEXT)")
        self.store.execute("INSERT INTO address VALUES (1, 'Curitiba')")
        self.store.execute("INSERT INTO address VALUES (2, 'Sao Carlos')")

        self.store.execute("CREATE TABLE phone "
                           "(id INTEGER PRIMARY KEY, person_id INTEGER,"
                           "number TEXT)")
        self.store.execute("INSERT INTO phone VALUES (1, 2, '1234-5678')")
        self.store.execute("INSERT INTO phone VALUES (2, 1, '8765-4321')")
        self.store.execute("INSERT INTO phone VALUES (3, 2, '8765-5678')")

        self.store.execute("CREATE TABLE person_phone "
                           "(id INTEGER PRIMARY KEY, person_id INTEGER, "
                           "phone_id INTEGER)")
        self.store.execute("INSERT INTO person_phone VALUES (1, 2, 1)")
        self.store.execute("INSERT INTO person_phone VALUES (2, 2, 2)")
        self.store.execute("INSERT INTO person_phone VALUES (3, 1, 1)")

        class Person(self.SQLObject):
            _defaultOrder = "-Person.name"
            name = StringCol()
            age = IntCol()
            ts = UtcDateTimeCol()

        self.Person = Person

    def test_get(self):
        person = self.Person.get(2)
        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

    def test_get_not_found(self):
        self.assertRaises(SQLObjectNotFound, self.Person.get, 1000)

    def test_get_typecast(self):
        person = self.Person.get("2")
        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

    def test_destroySelf(self):
        person = self.Person.get(2)
        person.destroySelf()
        self.assertRaises(SQLObjectNotFound, self.Person.get, 2)

    def test_delete(self):
        self.Person.delete(2)
        self.assertRaises(SQLObjectNotFound, self.Person.get, 2)

    def test_custom_table_name(self):
        class MyPerson(self.Person):
            _table = "person"

        person = MyPerson.get(2)

        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

    def test_custom_id_name(self):
        class MyPerson(self.SQLObject):
            _defaultOrder = "-Person.name"
            _table = "person"
            _idName = "name"
            _idType = unicode
            age = IntCol()
            ts = UtcDateTimeCol()

        person = MyPerson.get("John Doe")

        self.assertTrue(person)
        self.assertEquals(person.id, "John Doe")

    def test_create(self):
        person = self.Person(name="John Joe")

        self.assertTrue(Store.of(person) is self.store)
        self.assertEquals(type(person.id), int)
        self.assertEquals(person.name, "John Joe")

    def test_SO_creating(self):
        test = self
        class Person(self.Person):
            def set(self, **args):
                test.assertEquals(self._SO_creating, True)
                test.assertEquals(args, {"name": "John Joe"})

        person = Person(name="John Joe")
        self.assertEquals(person._SO_creating, False)

    def test_object_not_added_if__create_fails(self):
        objects = []
        class Person(self.Person):
            def _create(self, id, **kwargs):
                objects.append(self)
                raise RuntimeError
        self.assertRaises(RuntimeError, Person, name="John Joe")
        self.assertEquals(len(objects), 1)
        person = objects[0]
        self.assertEquals(Store.of(person), None)

    def test_init_hook(self):
        called = []
        class Person(self.Person):
            def _init(self, *args, **kwargs):
                called.append(True)

        person = Person(name="John Joe")
        self.assertEquals(called, [True])
        Person.get(2)
        self.assertEquals(called, [True, True])

    def test_alternateID(self):
        class Person(self.SQLObject):
            name = StringCol(alternateID=True)
        person = Person.byName("John Doe")
        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

    def test_alternateMethodName(self):
        class Person(self.SQLObject):
            name = StringCol(alternateMethodName="byFoo")

        person = Person.byFoo("John Doe")
        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

        self.assertRaises(SQLObjectNotFound, Person.byFoo, "John None")

    def test_select(self):
        result = self.Person.select("name = 'John Joe'")
        self.assertEquals(result[0].name, "John Joe")

    def test_select_sqlbuilder(self):
        result = self.Person.select(self.Person.q.name == "John Joe")
        self.assertEqual(result[0].name, "John Joe")

    def test_select_orderBy(self):
        result = self.Person.select("name LIKE 'John%'", orderBy=("name","id"))
        self.assertEquals(result[0].name, "John Doe")

    def test_select_orderBy_expr(self):
        result = self.Person.select("name LIKE 'John%'",
                                    orderBy=self.Person.name)
        self.assertEquals(result[0].name, "John Doe")

    def test_select_all(self):
        result = self.Person.select()
        self.assertEquals(result[0].name, "John Joe")

    def test_select_empty_string(self):
        result = self.Person.select('')
        self.assertEquals(result[0].name, "John Joe")

    def test_select_limit(self):
        result = self.Person.select(limit=1)
        self.assertEquals(len(list(result)), 1)

    def test_select_negative_offset(self):
        result = self.Person.select(orderBy="name")
        self.assertEquals(result[-1].name, "John Joe")

    def test_select_slice_negative_offset(self):
        result = self.Person.select(orderBy="name")[-1:]
        self.assertEquals(result[0].name, "John Joe")

    def test_select_distinct(self):
        result = self.Person.select("person.name = 'John Joe'",
                                    clauseTables=["phone"], distinct=True)
        self.assertEquals(len(list(result)), 1)

    def test_select_selectAlso(self):
        # Since John Doe has two phone numbers, this would return him
        # twice without the distinct=True bit.
        result = self.Person.select(
            "person.id = phone.person_id",
            clauseTables=["phone"],
            selectAlso="LOWER(name) AS lower_name",
            orderBy="lower_name",
            distinct=True)
        people = list(result)
        self.assertEquals(len(people), 2)
        self.assertEquals(people[0].name, "John Doe")
        self.assertEquals(people[1].name, "John Joe")

    def test_select_selectAlso_with_prejoin(self):
        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id",
                                 notNull=True)

        class Address(self.SQLObject):
            city = StringCol()

        result = Person.select(
            prejoins=["address"],
            selectAlso="LOWER(person.name) AS lower_name",
            orderBy="lower_name")
        people = list(result)
        self.assertEquals(len(people), 2)
        self.assertEquals([(person.name, person.address.city)
                           for person in people],
                          [("John Doe", "Sao Carlos"),
                           ("John Joe", "Curitiba")])

    def test_select_clauseTables_simple(self):
        result = self.Person.select("name = 'John Joe'", ["person"])
        self.assertEquals(result[0].name, "John Joe")

    def test_select_clauseTables_implicit_join(self):
        result = self.Person.select("person.name = 'John Joe' and "
                                    "phone.person_id = person.id",
                                    ["person", "phone"])
        self.assertEquals(result[0].name, "John Joe")

    def test_select_clauseTables_no_cls_table(self):
        result = self.Person.select("person.name = 'John Joe' and "
                                    "phone.person_id = person.id",
                                    ["phone"])
        self.assertEquals(result[0].name, "John Joe")

    def test_selectBy(self):
        result = self.Person.selectBy(name="John Joe")
        self.assertEquals(result[0].name, "John Joe")

    def test_selectBy_orderBy(self):
        result = self.Person.selectBy(age=20, orderBy="name")
        self.assertEquals(result[0].name, "John Doe")

        result = self.Person.selectBy(age=20, orderBy="-name")
        self.assertEquals(result[0].name, "John Joe")

    def test_selectOne(self):
        person = self.Person.selectOne("name = 'John Joe'")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Joe")

        nobody = self.Person.selectOne("name = 'John None'")

        self.assertEquals(nobody, None)

        # SQLBuilder style expression:
        person = self.Person.selectOne(self.Person.q.name == "John Joe")

        self.assertNotEqual(person, None)
        self.assertEqual(person.name, "John Joe")

    def test_selectOne_multiple_results(self):
        self.assertRaises(SQLObjectMoreThanOneResultError,
                          self.Person.selectOne)

    def test_selectOne_clauseTables(self):
        person = self.Person.selectOne("person.name = 'John Joe' and "
                                       "phone.person_id = person.id",
                                       ["phone"])
        self.assertEquals(person.name, "John Joe")

    def test_selectOneBy(self):
        person = self.Person.selectOneBy(name="John Joe")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Joe")

        nobody = self.Person.selectOneBy(name="John None")

        self.assertEquals(nobody, None)

    def test_selectOneBy_multiple_results(self):
        self.assertRaises(SQLObjectMoreThanOneResultError,
                          self.Person.selectOneBy)

    def test_selectFirst(self):
        person = self.Person.selectFirst("name LIKE 'John%'", orderBy="name")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

        person = self.Person.selectFirst("name LIKE 'John%'", orderBy="-name")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Joe")

        nobody = self.Person.selectFirst("name = 'John None'", orderBy="name")

        self.assertEquals(nobody, None)

        # SQLBuilder style expression:
        person = self.Person.selectFirst(LIKE(self.Person.q.name, "John%"),
                                         orderBy="name")
        self.assertNotEqual(person, None)
        self.assertEqual(person.name, "John Doe")

    def test_selectFirst_default_order(self):
        person = self.Person.selectFirst("name LIKE 'John%'")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Joe")

    def test_selectFirst_default_order_list(self):
        class Person(self.Person):
            _defaultOrder = ["name"]

        person = Person.selectFirst("name LIKE 'John%'")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

    def test_selectFirst_default_order_expr(self):
        class Person(self.Person):
            _defaultOrder = [SQLConstant("name")]

        person = Person.selectFirst("name LIKE 'John%'")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

    def test_selectFirst_default_order_fully_qualified(self):
        class Person(self.Person):
            _defaultOrder = ["person.name"]

        person = Person.selectFirst("name LIKE 'John%'")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

    def test_selectFirstBy(self):
        person = self.Person.selectFirstBy(age=20, orderBy="name")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Doe")

        person = self.Person.selectFirstBy(age=20, orderBy="-name")

        self.assertTrue(person)
        self.assertEquals(person.name, "John Joe")

        nobody = self.Person.selectFirstBy(age=1000, orderBy="name")

        self.assertEquals(nobody, None)

    def test_selectFirstBy_default_order(self):
        person = self.Person.selectFirstBy(age=20)

        self.assertTrue(person)
        self.assertEquals(person.name, "John Joe")

    def test_syncUpdate(self):
        """syncUpdate() flushes pending changes to the database."""
        person = self.Person.get(id=1)
        person.name = "John Smith"
        person.syncUpdate()
        name = self.store.execute(
            "SELECT name FROM person WHERE id = 1").get_one()[0]
        self.assertEquals(name, "John Smith")

    def test_sync(self):
        """sync() flushes pending changes and invalidates the cache."""
        person = self.Person.get(id=1)
        person.name = "John Smith"
        person.sync()
        name = self.store.execute(
            "SELECT name FROM person WHERE id = 1").get_one()[0]
        self.assertEquals(name, "John Smith")
        # Now make a change behind Storm's back and show that sync()
        # makes the new value from the database visible.
        self.store.execute("UPDATE person SET name = 'Jane Smith' "
                           "WHERE id = 1", noresult=True)
        person.sync()
        self.assertEquals(person.name, "Jane Smith")

    def test_col_name(self):
        class Person(self.SQLObject):
            foo = StringCol(dbName="name")
        person = Person.get(2)
        self.assertEquals(person.foo, "John Doe")

        class Person(self.SQLObject):
            foo = StringCol("name")
        person = Person.get(2)
        self.assertEquals(person.foo, "John Doe")

    def test_col_default(self):
        class Person(self.SQLObject):
            name = StringCol(default="Johny")
        person = Person()
        self.assertEquals(person.name, "Johny")

    def test_col_default_factory(self):
        class Person(self.SQLObject):
            name = StringCol(default=lambda: "Johny")
        person = Person()
        self.assertEquals(person.name, "Johny")

    def test_col_not_null(self):
        class Person(self.SQLObject):
            name = StringCol(notNull=True)
        person = Person.get(2)
        self.assertRaises(NoneError, setattr, person, "name", None)

    def test_col_storm_validator(self):
        calls = []
        def validator(obj, attr, value):
            calls.append((obj, attr, value))
            return value
        class Person(self.SQLObject):
            name = StringCol(storm_validator=validator)
        person = Person.get(2)
        person.name = u'foo'
        self.assertEquals(calls, [(person, 'name', u'foo')])

    def test_string_col(self):
        class Person(self.SQLObject):
            name = StringCol()
        person = Person.get(2)
        self.assertEquals(person.name, "John Doe")

    def test_int_col(self):
        class Person(self.SQLObject):
            age = IntCol()
        person = Person.get(2)
        self.assertEquals(person.age, 20)

    def test_bool_col(self):
        class Person(self.SQLObject):
            age = BoolCol()
        person = Person.get(2)
        self.assertEquals(person.age, True)

    def test_float_col(self):
        class Person(self.SQLObject):
            age = FloatCol()
        person = Person.get(2)
        self.assertTrue(abs(person.age - 20.0) < 1e-6)

    def test_utcdatetime_col(self):
        class Person(self.SQLObject):
            ts = UtcDateTimeCol()
        person = Person.get(2)
        self.assertEquals(person.ts,
                          datetime.datetime(2007, 2, 5, 20, 53, 15,
                                            tzinfo=tzutc()))
    def test_date_col(self):
        class Person(self.SQLObject):
            ts = DateCol()
        person = Person.get(2)
        self.assertEquals(person.ts, datetime.date(2007, 2, 5))

    def test_interval_col(self):
        class Person(self.SQLObject):
            delta = IntervalCol()
        person = Person.get(2)
        self.assertEquals(person.delta, datetime.timedelta(42, 45296, 780000))

    def test_foreign_key(self):
        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id",
                                 notNull=True)

        class Address(self.SQLObject):
            city = StringCol()

        person = Person.get(2)

        self.assertEquals(person.addressID, 2)
        self.assertEquals(person.address.city, "Sao Carlos")

    def test_foreign_key_no_dbname(self):
        self.store.execute("CREATE TABLE another_person "
                           "(id INTEGER PRIMARY KEY, name TEXT, age INTEGER,"
                           " ts TIMESTAMP, address INTEGER)")
        self.store.execute("INSERT INTO another_person VALUES "
                           "(2, 'John Doe', 20, '2007-02-05 20:53:15', 2)")

        class AnotherPerson(self.Person):
            address = ForeignKey(foreignKey="Address", notNull=True)

        class Address(self.SQLObject):
            city = StringCol()

        person = AnotherPerson.get(2)

        self.assertEquals(person.addressID, 2)
        self.assertEquals(person.address.city, "Sao Carlos")

    def test_foreign_key_orderBy(self):
        class Person(self.Person):
            _defaultOrder = "address"
            address = ForeignKey(foreignKey="Address", dbName="address_id",
                                 notNull=True)

        class Address(self.SQLObject):
            city = StringCol()

        person = Person.selectFirst()
        self.assertEquals(person.addressID, 1)

    def test_foreign_key_storm_validator(self):
        calls = []
        def validator(obj, attr, value):
            calls.append((obj, attr, value))
            return value

        class Person(self.SQLObject):
            address = ForeignKey(foreignKey="Address", dbName="address_id",
                                 storm_validator=validator)

        class Address(self.SQLObject):
            city = StringCol()

        person = Person.get(2)
        address = Address.get(1)
        person.address = address
        self.assertEquals(calls, [(person, 'addressID', 1)])

    def test_multiple_join(self):
        class AnotherPerson(self.Person):
            _table = "person"
            phones = SQLMultipleJoin("Phone", joinColumn="person")

        class Phone(self.SQLObject):
            person = ForeignKey("AnotherPerson", dbName="person_id")
            number = StringCol()

        person = AnotherPerson.get(2)

        # Make sure that the result is wrapped.
        result = person.phones.orderBy("-number")

        self.assertEquals([phone.number for phone in result],
                          ["8765-5678", "1234-5678"])

        # Test add/remove methods.
        number = Phone.selectOneBy(number="1234-5678")
        person.removePhone(number)
        self.assertEquals(sorted(phone.number for phone in person.phones),
                          ["8765-5678"])
        person.addPhone(number)
        self.assertEquals(sorted(phone.number for phone in person.phones),
                          ["1234-5678", "8765-5678"])

    def test_multiple_join_prejoins(self):
        self.store.execute("ALTER TABLE phone ADD COLUMN address_id INT")
        self.store.execute("UPDATE phone SET address_id = 1")
        self.store.execute("UPDATE phone SET address_id = 2 WHERE id = 3")

        class AnotherPerson(self.Person):
            _table = "person"
            phones = SQLMultipleJoin("Phone", joinColumn="person",
                                     orderBy="number", prejoins=["address"])

        class Phone(self.SQLObject):
            person = ForeignKey("AnotherPerson", dbName="person_id")
            address = ForeignKey("Address", dbName="address_id")
            number = StringCol()

        class Address(self.SQLObject):
            city = StringCol()

        person = AnotherPerson.get(2)
        [phone1, phone2] = person.phones

        # Delete addresses behind Storm's back to show that the
        # addresses have been loaded.
        self.store.execute("DELETE FROM address")
        self.assertEquals(phone1.number, "1234-5678")
        self.assertEquals(phone1.address.city, "Curitiba")
        self.assertEquals(phone2.number, "8765-5678")
        self.assertEquals(phone2.address.city, "Sao Carlos")

    def test_related_join(self):
        class AnotherPerson(self.Person):
            _table = "person"
            phones = SQLRelatedJoin("Phone", otherColumn="phone_id",
                                    intermediateTable="PersonPhone",
                                    joinColumn="person_id", orderBy="id")

        class PersonPhone(self.Person):
            person_id = IntCol()
            phone_id = IntCol()

        class Phone(self.SQLObject):
            number = StringCol()

        person = AnotherPerson.get(2)

        self.assertEquals([phone.number for phone in person.phones],
                          ["1234-5678", "8765-4321"])

        # Make sure that the result is wrapped.
        result = person.phones.orderBy("-number")

        self.assertEquals([phone.number for phone in result],
                          ["8765-4321", "1234-5678"])

        # Test add/remove methods.
        number = Phone.selectOneBy(number="1234-5678")
        person.removePhone(number)
        self.assertEquals(sorted(phone.number for phone in person.phones),
                          ["8765-4321"])
        person.addPhone(number)
        self.assertEquals(sorted(phone.number for phone in person.phones),
                          ["1234-5678", "8765-4321"])

    def test_related_join_prejoins(self):
        self.store.execute("ALTER TABLE phone ADD COLUMN address_id INT")
        self.store.execute("UPDATE phone SET address_id = 1")
        self.store.execute("UPDATE phone SET address_id = 2 WHERE id = 2")

        class AnotherPerson(self.Person):
            _table = "person"
            phones = SQLRelatedJoin("Phone", otherColumn="phone_id",
                                    intermediateTable="PersonPhone",
                                    joinColumn="person_id", orderBy="id",
                                    prejoins=["address"])

        class PersonPhone(self.Person):
            person_id = IntCol()
            phone_id = IntCol()

        class Phone(self.SQLObject):
            number = StringCol()
            address = ForeignKey("Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        person = AnotherPerson.get(2)
        [phone1, phone2] = person.phones

        # Delete addresses behind Storm's back to show that the
        # addresses have been loaded.
        self.store.execute("DELETE FROM address")
        self.assertEquals(phone1.number, "1234-5678")
        self.assertEquals(phone1.address.city, "Curitiba")
        self.assertEquals(phone2.number, "8765-4321")
        self.assertEquals(phone2.address.city, "Sao Carlos")

    def test_single_join(self):
        self.store.execute("CREATE TABLE office "
                           "(id INTEGER PRIMARY KEY, phone_id INTEGER,"
                           "name TEXT)")
        self.store.execute("INSERT INTO office VALUES (1, 1, 'An office')")

        class Phone(self.SQLObject):
            office = SingleJoin("Office", joinColumn="phoneID")

        class Office(self.SQLObject):
            phone = ForeignKey(foreignKey="Phone", dbName="phone_id",
                               notNull=True)
            name = StringCol()

        office = Office.get(1)
        self.assertEqual(office.name, "An office")

        phone = Phone.get(1)
        self.assertEqual(phone.office, office)

        # The single join returns None for a phone with no office
        phone = Phone.get(2)
        self.assertEqual(phone.office, None)

    def test_result_set_orderBy(self):
        result = self.Person.select()

        result = result.orderBy("-name")
        self.assertEquals([person.name for person in result],
                          ["John Joe", "John Doe"])

        result = result.orderBy("name")
        self.assertEquals([person.name for person in result],
                          ["John Doe", "John Joe"])

    def test_result_set_orderBy_fully_qualified(self):
        result = self.Person.select()

        result = result.orderBy("-person.name")
        self.assertEquals([person.name for person in result],
                          ["John Joe", "John Doe"])

        result = result.orderBy("person.name")
        self.assertEquals([person.name for person in result],
                          ["John Doe", "John Joe"])

    def test_result_set_count(self):
        result = self.Person.select()
        self.assertEquals(result.count(), 2)

    def test_result_set_count_distinct(self):
        result = self.Person.select(
            "person.id = phone.person_id",
            clauseTables=["phone"],
            distinct=True)
        self.assertEquals(result.count(), 2)

    def test_result_set_count_union_distinct(self):
        result1 = self.Person.select("person.id = 1", distinct=True)
        result2 = self.Person.select("person.id = 2", distinct=True)
        self.assertEquals(result1.union(result2).count(), 2)

    def test_result_set_count_with_joins(self):
        result = self.Person.select(
            "person.address_id = address.id",
            clauseTables=["address"])
        self.assertEquals(result.count(), 2)

    def test_result_set__getitem__(self):
        result = self.Person.select()
        self.assertEquals(result[0].name, "John Joe")

    def test_result_set__iter__(self):
        result = self.Person.select()
        self.assertEquals(list(result.__iter__())[0].name, "John Joe")

    def test_result_set__nonzero__(self):
        result = self.Person.select()
        self.assertEquals(result.__nonzero__(), True)
        result = self.Person.select(self.Person.q.name == "No Person")
        self.assertEquals(result.__nonzero__(), False)

    def test_result_set_distinct(self):
        result = self.Person.select("person.name = 'John Joe'",
                                    clauseTables=["phone"])
        self.assertEquals(len(list(result.distinct())), 1)

    def test_result_set_limit(self):
        result = self.Person.select()
        self.assertEquals(len(list(result.limit(1))), 1)

    def test_result_set_union(self):
        result1 = self.Person.selectBy(id=1)
        result2 = self.Person.selectBy(id=2)
        result3 = result1.union(result2, orderBy="name")
        self.assertEquals([person.name for person in result3],
                          ["John Doe", "John Joe"])

    def test_result_set_union_all(self):
        result1 = self.Person.selectBy(id=1)
        result2 = result1.union(result1, unionAll=True)
        self.assertEquals([person.name for person in result2],
                          ["John Joe", "John Joe"])

    def test_result_set_except_(self):
        person = self.Person(id=3, name="John Moe")
        result1 = self.Person.select()
        result2 = self.Person.selectBy(id=2)
        result3 = result1.except_(result2, orderBy="name")
        self.assertEquals([person.name for person in result3],
                          ["John Joe", "John Moe"])

    def test_result_set_intersect(self):
        person = self.Person(id=3, name="John Moe")
        result1 = self.Person.select()
        result2 = self.Person.select(self.Person.id.is_in((2, 3)))
        result3 = result1.intersect(result2, orderBy="name")
        self.assertEquals([person.name for person in result3],
                          ["John Doe", "John Moe"])

    def test_result_set_prejoin(self):
        self.store.execute("ALTER TABLE person ADD COLUMN phone_id INTEGER")
        self.store.execute("UPDATE person SET phone_id=1 WHERE name='John Doe'")

        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")
            phone = ForeignKey(foreignKey="Phone", dbName="phone_id")

        class Address(self.SQLObject):
            city = StringCol()

        class Phone(self.SQLObject):
            number = StringCol()

        result = Person.select("person.name = 'John Doe'")
        result = result.prejoin(["address", "phone"])

        people = list(result)

        # Remove rows behind its back.
        self.store.execute("DELETE FROM address")
        self.store.execute("DELETE FROM phone")

        # They were prefetched, so it should work even then.
        self.assertEquals([person.address.city for person in people],
                          ["Sao Carlos"])
        self.assertEquals([person.phone.number for person in people],
                          ["1234-5678"])

    def test_result_set_prejoin_getitem(self):
        """Ensure that detuplelizing is used on getitem."""

        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        result = Person.select("person.name = 'John Doe'", prejoins=["address"])
        person = result[0]

        # Remove the row behind its back.
        self.store.execute("DELETE FROM address")

        # They were prefetched, so it should work even then.
        self.assertEquals(person.address.city, "Sao Carlos")

    def test_result_set_prejoin_one(self):
        """Ensure that detuplelizing is used on selectOne()."""

        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        person = Person.selectOne("person.name = 'John Doe'",
                                  prejoins=["address"])

        # Remove the row behind its back.
        self.store.execute("DELETE FROM address")

        # They were prefetched, so it should work even then.
        self.assertEquals(person.address.city, "Sao Carlos")

    def test_result_set_prejoin_first(self):
        """Ensure that detuplelizing is used on selectFirst()."""

        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        person = Person.selectFirst("person.name = 'John Doe'",
                                    prejoins=["address"], orderBy="name")

        # Remove the row behind Storm's back.
        self.store.execute("DELETE FROM address")

        # They were prefetched, so it should work even then.
        self.assertEquals(person.address.city, "Sao Carlos")

    def test_result_set_prejoin_by(self):
        """Ensure that prejoins work with selectBy() queries."""

        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        result = Person.selectBy(name="John Doe").prejoin(["address"])
        person = result[0]

        # Remove the row behind Storm's back.
        self.store.execute("DELETE FROM address")

        # They were prefetched, so it should work even then.
        self.assertEquals(person.address.city, "Sao Carlos")

    def test_result_set_prejoin_related(self):
        """Dotted prejoins are used to prejoin through another table."""
        class Phone(self.SQLObject):
            person = ForeignKey(foreignKey="AnotherPerson", dbName="person_id")
            number = StringCol()

        class AnotherPerson(self.Person):
            _table = "person"
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        phone = Phone.selectOne("phone.number = '1234-5678'",
                                prejoins=["person.address"])

        # Remove the rows behind Storm's back.
        self.store.execute("DELETE FROM address")
        self.store.execute("DELETE FROM person")

        # They were prefetched, so it should work even then.
        self.assertEquals(phone.person.name, "John Doe")
        self.assertEquals(phone.person.address.city, "Sao Carlos")

    def test_result_set_prejoin_table_twice(self):
        """A single table can be prejoined multiple times."""
        self.store.execute("CREATE TABLE lease "
                           "(id INTEGER PRIMARY KEY,"
                           " landlord_id INTEGER, tenant_id INTEGER)")
        self.store.execute("INSERT INTO lease VALUES (1, 1, 2)")

        class Address(self.SQLObject):
            city = StringCol()

        class AnotherPerson(self.Person):
            _table = "person"
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Lease(self.SQLObject):
            landlord = ForeignKey(foreignKey="AnotherPerson",
                                  dbName="landlord_id")
            tenant = ForeignKey(foreignKey="AnotherPerson",
                                dbName="tenant_id")

        lease = Lease.select(prejoins=["landlord", "landlord.address",
                                       "tenant", "tenant.address"])[0]

        # Remove the person rows behind Storm's back.
        self.store.execute("DELETE FROM address")
        self.store.execute("DELETE FROM person")

        self.assertEquals(lease.landlord.name, "John Joe")
        self.assertEquals(lease.landlord.address.city, "Curitiba")
        self.assertEquals(lease.tenant.name, "John Doe")
        self.assertEquals(lease.tenant.address.city, "Sao Carlos")

    def test_result_set_prejoin_count(self):
        """Prejoins do not affect the result of aggregates like COUNT()."""
        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        result = Person.select("name = 'John Doe'", prejoins=["address"])
        self.assertEquals(result.count(), 1)

    def test_result_set_prejoin_mismatch_union(self):
        """Prejoins do not cause UNION incompatibilities. """
        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        # The prejoin should not prevent the union from working.  At
        # the moment this is done by unconditionally stripping the
        # prejoins (which is what our SQLObject patch did), but could
        # be smarter.
        result1 = Person.select("name = 'John Doe'", prejoins=["address"])
        result2 = Person.select("name = 'John Joe'")
        result = result1.union(result2)
        names = sorted(person.name for person in result)
        self.assertEquals(names, ["John Doe", "John Joe"])

    def test_result_set_prejoin_mismatch_except(self):
        """Prejoins do not cause EXCEPT incompatibilities. """
        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        # The prejoin should not prevent the union from working.  At
        # the moment this is done by unconditionally stripping the
        # prejoins (which is what our SQLObject patch did), but could
        # be smarter.
        result1 = Person.select("name = 'John Doe'", prejoins=["address"])
        result2 = Person.select("name = 'John Joe'")
        result = result1.except_(result2)
        names = sorted(person.name for person in result)
        self.assertEquals(names, ["John Doe"])

    def test_result_set_prejoin_mismatch_intersect(self):
        """Prejoins do not cause INTERSECT incompatibilities. """
        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        # The prejoin should not prevent the union from working.  At
        # the moment this is done by unconditionally stripping the
        # prejoins (which is what our SQLObject patch did), but could
        # be smarter.
        result1 = Person.select("name = 'John Doe'", prejoins=["address"])
        result2 = Person.select("name = 'John Doe'")
        result = result1.intersect(result2)
        names = sorted(person.name for person in result)
        self.assertEquals(names, ["John Doe"])

    def test_result_set_prejoinClauseTables(self):
        self.store.execute("ALTER TABLE person ADD COLUMN phone_id INTEGER")
        self.store.execute("UPDATE person SET phone_id=1 WHERE name='John Doe'")

        class Person(self.Person):
            address = ForeignKey(foreignKey="AddressClass", dbName="address_id")
            phone = ForeignKey(foreignKey="PhoneClass", dbName="phone_id")

        # Name the class so that it doesn't match the table name, to ensure
        # that the prejoin is actually using table names, rather than class
        # names.
        class AddressClass(self.SQLObject):
            _table = "address"
            city = StringCol()

        class PhoneClass(self.SQLObject):
            _table = "phone"
            number = StringCol()

        result = Person.select("person.name = 'John Doe' and "
                               "person.phone_id = phone.id and "
                               "person.address_id = address.id",
                               clauseTables=["address", "phone"])
        result = result.prejoinClauseTables(["address", "phone"])

        people = list(result)

        # Remove rows behind its back.
        self.store.execute("DELETE FROM address")
        self.store.execute("DELETE FROM phone")

        # They were prefetched, so it should work even then.
        self.assertEquals([person.address.city for person in people],
                          ["Sao Carlos"])
        self.assertEquals([person.phone.number for person in people],
                          ["1234-5678"])

    def test_result_set_sum_string(self):
        result = self.Person.select()
        self.assertEquals(result.sum('age'), 40)

    def test_result_set_sum_expr(self):
        result = self.Person.select()
        self.assertEquals(result.sum(self.Person.q.age), 40)

    def test_result_set_contains(self):
        john = self.Person.selectOneBy(name="John Doe")
        self.assertTrue(john in self.Person.select())
        self.assertFalse(john in self.Person.selectBy(name="John Joe"))
        self.assertFalse(john in self.Person.select(
                "Person.name = 'John Joe'"))

    def test_result_set_contains_does_not_use_iter(self):
        """Calling 'item in result_set' does not iterate over the set. """
        def no_iter(self):
            raise RuntimeError
        real_iter = SQLObjectResultSet.__iter__
        SQLObjectResultSet.__iter__ = no_iter
        try:
            john = self.Person.selectOneBy(name="John Doe")
            self.assertTrue(john in self.Person.select())
        finally:
            SQLObjectResultSet.__iter__ = real_iter

    def test_result_set_contains_wrong_type(self):
        class Address(self.SQLObject):
            city = StringCol()

        address = Address.get(1)
        result_set = self.Person.select()
        self.assertRaises(TypeError, operator.contains, result_set, address)

    def test_result_set_contains_with_prejoins(self):
        class Person(self.Person):
            address = ForeignKey(foreignKey="Address", dbName="address_id")

        class Address(self.SQLObject):
            city = StringCol()

        john = Person.selectOneBy(name="John Doe")
        result_set = Person.select("name = 'John Doe'", prejoins=["address"])
        self.assertTrue(john in result_set)

    def test_table_dot_q(self):
        # Table.q.fieldname is a syntax used in SQLObject for
        # sqlbuilder expressions.  Storm can use the main properties
        # for this, so the Table.q syntax just returns those
        # properties:
        class Person(self.SQLObject):
            _idName = "name"
            _idType = unicode
            address = ForeignKey(foreignKey="Phone", dbName="address_id",
                                 notNull=True)

        self.assertEquals(id(Person.q.id), id(Person.id))
        self.assertEquals(id(Person.q.address), id(Person.address))
        self.assertEquals(id(Person.q.addressID), id(Person.addressID))

        person = Person.get("John Joe")

        self.assertEquals(id(person.q.id), id(Person.id))
        self.assertEquals(id(person.q.address), id(Person.address))
        self.assertEquals(id(person.q.addressID), id(Person.addressID))

    def test_set(self):
        class Person(self.Person):
            def set(self, **kw):
                kw["id"] += 1
                super(Person, self).set(**kw)
        person = Person(id=3, name="John Moe")
        self.assertEquals(person.id, 4)
        self.assertEquals(person.name, "John Moe")

    def test_CONTAINSSTRING(self):
        expr = CONTAINSSTRING(self.Person.q.name, "Do")
        result = self.Person.select(expr)
        self.assertEquals([person.name for person in result],
                          ["John Doe"])

        person.name = "Funny !%_ Name"

        expr = NOT(CONTAINSSTRING(self.Person.q.name, "!%_"))
        result = self.Person.select(expr)
        self.assertEquals([person.name for person in result],
                          ["John Joe"])