Sophie

Sophie

distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > 429d9064b451fbf785f6785aa01bfcf8 > files > 49

python-storm-0.13-3mdv2010.0.noarch.rpm

#
# Copyright (c) 2006, 2007 Canonical
#
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
from datetime import timedelta
import time
import os

from storm.exceptions import OperationalError
from storm.databases.sqlite import SQLite
from storm.database import create_database
from storm.uri import URI

from tests.databases.base import DatabaseTest, UnsupportedDatabaseTest
from tests.helper import TestHelper, MakePath


class SQLiteMemoryTest(DatabaseTest, TestHelper):

    helpers = [MakePath]
    
    def get_path(self):
        return ""

    def create_database(self):
        self.database = SQLite(URI("sqlite:%s?synchronous=OFF&timeout=0" %
                                   self.get_path()))

    def create_tables(self):
        self.connection.execute("CREATE TABLE number "
                                "(one INTEGER, two INTEGER, three INTEGER)")
        self.connection.execute("CREATE TABLE test "
                                "(id INTEGER PRIMARY KEY, title VARCHAR)")
        self.connection.execute("CREATE TABLE datetime_test "
                                "(id INTEGER PRIMARY KEY,"
                                " dt TIMESTAMP, d DATE, t TIME, td INTERVAL)")
        self.connection.execute("CREATE TABLE bin_test "
                                "(id INTEGER PRIMARY KEY, b BLOB)")

    def drop_tables(self):
        pass

    def test_wb_create_database(self):
        database = create_database("sqlite:")
        self.assertTrue(isinstance(database, SQLite))
        self.assertEquals(database._filename, ":memory:")

    def test_concurrent_behavior(self):
        pass # We can't connect to the in-memory database twice, so we can't
             # exercise the concurrency behavior (nor it makes sense).

    def test_synchronous(self):
        synchronous_values = {"OFF": 0, "NORMAL": 1, "FULL": 2}
        for value in synchronous_values:
            database = SQLite(URI("sqlite:%s?synchronous=%s" %
                                  (self.get_path(), value)))
            connection = database.connect()
            result = connection.execute("PRAGMA synchronous")
            self.assertEquals(result.get_one()[0],
                              synchronous_values[value])


class SQLiteFileTest(SQLiteMemoryTest):

    def get_path(self):
        return self.make_path()

    def test_wb_create_database(self):
        filename = self.make_path()
        database = create_database("sqlite:%s" % filename)
        self.assertTrue(isinstance(database, SQLite))
        self.assertEquals(database._filename, filename)

    def test_timeout(self):
        database = create_database("sqlite:%s?timeout=0.3" % self.get_path())
        connection1 = database.connect()
        connection2 = database.connect()
        connection1.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
        connection1.commit()
        connection1.execute("INSERT INTO test VALUES (1)")
        started = time.time()
        try:
            connection2.execute("INSERT INTO test VALUES (2)")
        except OperationalError, exception:
            self.assertEquals(str(exception), "database is locked")
            self.assertTrue(time.time()-started >= 0.3)
        else:
            self.fail("OperationalError not raised")

    def test_commit_timeout(self):
        """Regression test for commit observing the timeout.
        
        In 0.10, the timeout wasn't observed for connection.commit().

        """
        # Create a database with a table.
        database = create_database("sqlite:%s?timeout=0.3" % self.get_path())
        connection1 = database.connect()
        connection1.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
        connection1.commit()

        # Put some data in, but also make a second connection to the database,
        # which will prevent a commit until it is closed.
        connection1.execute("INSERT INTO test VALUES (1)")
        connection2 = database.connect()
        connection2.execute("SELECT id FROM test")

        started = time.time()
        try:
            connection1.commit()
        except OperationalError, exception:
            self.assertEquals(str(exception), "database is locked")
            # In 0.10, the next assertion failed because the timeout wasn't
            # enforced for the "COMMIT" statement.
            self.assertTrue(time.time()-started >= 0.3)
        else:
            self.fail("OperationalError not raised")

    def test_recover_after_timeout(self):
        """Regression test for recovering from database locked exception.
        
        In 0.10, connection.commit() would forget that a transaction was in
        progress if an exception was raised, such as an OperationalError due to
        another connection being open.  As a result, a subsequent modification
        to the database would cause BEGIN to be issued to the database, which
        would complain that a transaction was already in progress.

        """
        # Create a database with a table.
        database = create_database("sqlite:%s?timeout=0.3" % self.get_path())
        connection1 = database.connect()
        connection1.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
        connection1.commit()

        # Put some data in, but also make a second connection to the database,
        # which will prevent a commit until it is closed.
        connection1.execute("INSERT INTO test VALUES (1)")
        connection2 = database.connect()
        connection2.execute("SELECT id FROM test")
        self.assertRaises(OperationalError, connection1.commit)

        # Close the second connection - it should now be possible to commit.
        connection2.close()

        # In 0.10, the next statement raised OperationalError: cannot start a
        # transaction within a transaction
        connection1.execute("INSERT INTO test VALUES (2)")
        connection1.commit()

        # Check that the correct data is present
        self.assertEquals(connection1.execute("SELECT id FROM test").get_all(),
                          [(1,), (2,)])

class SQLiteUnsupportedTest(UnsupportedDatabaseTest, TestHelper):
 
    dbapi_module_names = ["pysqlite2", "sqlite3"]
    db_module_name = "sqlite"