Sophie

Sophie

distrib > Mandriva > 2010.0 > i586 > media > contrib-release > by-pkgid > f27a9743f854b1c1ae6a97c15111e3ca > files > 31

python-psycopg-1.1.21-10mdv2010.0.i586.rpm

# threads.py -- example of multiple threads using psycopg
#
# Copyright (C) 2001 Federico Di Gregorio  <fog@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# -*- Mode: pyhton -*-

## put in DSN your DSN string

DSN = 'dbname=test user=test'

## some others parameters
INSERT_THREADS = ('A', 'B', 'C')
SELECT_THREADS = ('1', '2')

ROWS = 1000

COMMIT_STEP = 20
SELECT_SIZE = 25001
SELECT_STEP = 500
SELECT_DIV  = 250
SERIALIZE   = 1

## don't modify anything below tis line (except for experimenting)

import sys, psycopg, threading

if len(sys.argv) > 1:
    DSN = sys.argv[1]
if len(sys.argv) > 2:
    SERIALIZE = int(sys.argv[2])
    
print "Opening connection using dns:", DSN
conn = psycopg.connect(DSN, serialize=SERIALIZE)
curs = conn.cursor()

try:
    curs.execute("""CREATE TABLE test_threads (
                        name text, value1 int4, value2 float)""")
except:
    conn.rollback()
    curs.execute("DROP TABLE test_threads")
    curs.execute("""CREATE TABLE test_threads (
                        name text, value1 int4, value2 float)""")
conn.commit()


## this function inserts a big number of rows and creates and destroys
## a large number of cursors

def insert_func(conn, rows):
    name = threading.currentThread().getName()
    
    if SERIALIZE: cmt = conn
    else: cmt = conn.cursor()
    
    for i in range(rows):
        if divmod(i, COMMIT_STEP)[1] == 0:
            cmt.commit()
            s = name + ": COMMIT STEP " + str(i)
            print s
            c = conn.cursor()
            if not SERIALIZE: cmt = c
        try:
            c.execute("INSERT INTO test_threads VALUES (%s, %d, %f)",
                      (str(i), i, float(i)))
        except psycopg.ProgrammingError, err:
            print name, ": an error occurred; skipping this insert"
            print err
    cmt.commit()

## a nice select function that prints the current number of rows in the
## database (and transefer them, putting some pressure on the network)
    
def select_func(conn, z):
    name = threading.currentThread().getName()

    c = conn.cursor()
    if SERIALIZE: cmt = conn
    else:
        cmt = None
        c.autocommit()
    
    for i in range(SELECT_SIZE):
        if divmod(i, SELECT_STEP)[1] == 0:
            try:
                c.execute("SELECT * FROM test_threads WHERE value2 < %d",
                          ((i/z),))
                l = c.fetchall()
                s = name + ": number of rows fetched: " + str(len(l))
                print s
            except psycopg.ProgrammingError, err:
                print name, ": an error occurred; skipping this select"
                print err

            # update the cursor if not in autocommit
            if cmt:
                cmt.commit()
            else:
                c = conn.cursor()
                c.autocommit()

## create the threads
threads = []

print "Creating INSERT threads:"
for name in INSERT_THREADS:
    t = threading.Thread(None, insert_func, 'Thread-'+name, (conn, ROWS))
    t.setDaemon(0)
    threads.append(t)

print "Creating SELECT threads:"
for name in SELECT_THREADS:
    t = threading.Thread(None, select_func, 'Thread-'+name, (conn, SELECT_DIV))
    t.setDaemon(0)
    threads.append(t)

## really start the threads now
for t in threads:
    t.start()

# and wait for them to finish
for t in threads:
    t.join()
    print t.getName(), "exited OK"


curs.execute("SELECT count(name) FROM test_threads")
print "Inserted", curs.fetchone()[0], "rows."

curs.execute("DROP TABLE test_threads")