Files
libreoffice/connectivity/workben/postgresql/sdbcx.py
2011-11-17 21:15:14 +01:00

307 lines
12 KiB
Python

#*************************************************************************
#
# $RCSfile: sdbcx.py,v $
#
# $Revision: 1.1.2.6 $
#
# last change: $Author: jbu $ $Date: 2007/01/07 13:50:38 $
#
# The Contents of this file are made available subject to the terms of
# either of the following licenses
#
# - GNU Lesser General Public License Version 2.1
# - Sun Industry Standards Source License Version 1.1
#
# Sun Microsystems Inc., October, 2000
#
# GNU Lesser General Public License Version 2.1
# =============================================
# Copyright 2000 by Sun Microsystems, Inc.
# 901 San Antonio Road, Palo Alto, CA 94303, USA
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1, as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
#
# Sun Industry Standards Source License Version 1.1
# =================================================
# The contents of this file are subject to the Sun Industry Standards
# Source License Version 1.1 (the "License"); You may not use this file
# except in compliance with the License. You may obtain a copy of the
# License at http://www.openoffice.org/license.html.
#
# Software provided under this License is provided on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
# See the License for the specific provisions governing your rights and
# obligations concerning the Software.
#
# The Initial Developer of the Original Code is: Joerg Budischewski
#
# Copyright: 2000 by Sun Microsystems, Inc.
#
# All Rights Reserved.
#
# Contributor(s): Joerg Budischewski
#
#
#
#*************************************************************************
import unittest
import ddl
import unohelper
import sys
from com.sun.star.sdbc import SQLException
from com.sun.star.sdbc.DataType import VARCHAR, CHAR, DECIMAL, DOUBLE, BIGINT, NUMERIC
from com.sun.star.sdbc.ColumnValue import NO_NULLS, NULLABLE
from com.sun.star.sdbcx.KeyType import PRIMARY, FOREIGN, UNIQUE
from com.sun.star.sdbc.KeyRule import RESTRICT, CASCADE, NO_ACTION
def suite(ctx,dburl):
suite = unittest.TestSuite()
suite.addTest(TestCase("testTables",ctx,dburl))
suite.addTest(TestCase("testViews",ctx,dburl))
suite.addTest(TestCase("testKeys",ctx,dburl))
suite.addTest(TestCase("testUsers",ctx,dburl))
suite.addTest(TestCase("testIndexes",ctx,dburl))
return suite
def nullable2Str( v ):
if v == NO_NULLS:
return "NOT NULL"
return ""
def autoIncremtent2Str( v ):
if v:
return "auto increment"
return ""
def dumpColumns( columns ):
n = columns.getCount()
print "Name\t type\t prec\t scale\t"
for i in range( 0, n ):
col = columns.getByIndex( i )
print col.Name + "\t "+col.TypeName + "\t " + str(col.Precision) + "\t " + str(col.Scale) + "\t "+\
str( col.DefaultValue ) + "\t " + str( col.Description ) + "\t " +\
autoIncremtent2Str( col.IsAutoIncrement ) + "\t " + \
nullable2Str( col.IsNullable )
class TestCase(unittest.TestCase):
def __init__(self,method,ctx,dburl):
unittest.TestCase.__init__(self,method)
self.ctx = ctx
self.dburl = dburl
def setUp( self ):
self.driver = self.ctx.ServiceManager.createInstanceWithContext(
'org.openoffice.comp.connectivity.pq.Driver' , self.ctx )
self.connection = self.driver.connect( self.dburl, () )
ddl.executeDDLs( self.connection )
def tearDown( self ):
self.connection.close()
def checkDescriptor( self, descriptor, name, typeName, type, prec, scale, defaultValue, desc ):
self.failUnless( descriptor.Name == name )
self.failUnless( descriptor.TypeName == typeName )
self.failUnless( descriptor.Type == type )
self.failUnless( descriptor.Precision == prec )
self.failUnless( descriptor.Scale == scale )
# print descriptor.DefaultValue + " == " + defaultValue
# self.failUnless( descriptor.DefaultValue == defaultValue )
self.failUnless( descriptor.Description == desc )
def testKeys( self ):
dd = self.driver.getDataDefinitionByConnection( self.connection )
tables = dd.getTables()
t = tables.getByName( "public.ordertab" )
keys = t.getKeys()
key = keys.getByName( "cust" )
self.failUnless( key.Name == "cust" )
self.failUnless( key.Type == FOREIGN )
self.failUnless( key.ReferencedTable == "public.customer" )
self.failUnless( key.UpdateRule == RESTRICT )
self.failUnless( key.DeleteRule == CASCADE )
keycolumns = keys.getByName( "ordertab_pkey" ).getColumns()
self.failUnless( keycolumns.getElementNames() == (u"id",) )
key = keys.getByName( "ordertab_pkey" )
self.failUnless( key.Name == "ordertab_pkey" )
self.failUnless( key.Type == PRIMARY )
self.failUnless( key.UpdateRule == NO_ACTION )
self.failUnless( key.DeleteRule == NO_ACTION )
keys = tables.getByName( "public.customer" ).getKeys()
key = keys.getByName( "customer_dummyserial_key" )
self.failUnless( key.Name == "customer_dummyserial_key" )
self.failUnless( key.Type == UNIQUE )
self.failUnless( key.UpdateRule == NO_ACTION )
self.failUnless( key.DeleteRule == NO_ACTION )
keys = tables.getByName( "public.orderpos" ).getKeys()
keyEnum = keys.createEnumeration()
while keyEnum.hasMoreElements():
key = keyEnum.nextElement()
cols = key.getColumns()
colEnum = cols.createEnumeration()
while colEnum.hasMoreElements():
col = colEnum.nextElement()
def testViews( self ):
dd = self.driver.getDataDefinitionByConnection( self.connection )
views = dd.getViews()
v = views.getByName( "public.customer2" )
self.failUnless( v.Name == "customer2" )
self.failUnless( v.SchemaName == "public" )
self.failUnless( v.Command != "" )
def testIndexes( self ):
dd = self.driver.getDataDefinitionByConnection( self.connection )
tables = dd.getTables()
t = tables.getByName( "public.ordertab" )
indexes = t.getIndexes()
index = indexes.getByName( "ordertab_pkey" )
self.failUnless( index.Name == "ordertab_pkey" )
self.failUnless( index.IsPrimaryKeyIndex )
self.failUnless( index.IsUnique )
self.failUnless( not index.IsClustered )
columns = index.getColumns()
self.failUnless( columns.hasByName( "id" ) )
self.failUnless( columns.getByIndex(0).Name == "id" )
def checkRenameTable( self, t , tables):
t.rename( "foo" )
self.failUnless( tables.hasByName( "public.foo" ) )
t.rename( "public.foo2" )
self.failUnless( tables.hasByName( "public.foo2" ) )
try:
t.rename( "pqsdbc_test.foo2" )
self.failUnless( tables.hasByName( "pqsdbc_test.foo2" ) )
print "looks like a server 8.1 or later (changing a schema succeeded)"
t.rename( "pqsdbc_test.foo" )
self.failUnless( tables.hasByName( "pqsdbc_test.foo" ) )
t.rename( "public.foo2" )
self.failUnless( tables.hasByName( "public.foo2" ) )
except SQLException,e:
if e.Message.find( "support changing" ) >= 0:
print "looks like a server prior to 8.1 (changing schema failed with Message [" + e.Message.replace("\n", " ") + "])"
else:
raise e
tables.dropByName( "public.foo2" )
def testTables( self ):
dd = self.driver.getDataDefinitionByConnection( self.connection )
tables = dd.getTables()
t = tables.getByName( "public.customer" )
self.failUnless( t.Name == "customer" )
self.failUnless( t.SchemaName == "public" )
self.failUnless( t.Type == "TABLE" )
cols = t.getColumns()
self.failUnless( cols.hasByName( 'name' ) )
self.failUnless( cols.hasByName( 'id' ) )
col = cols.getByName( "dummyserial" )
# dumpColumns( cols )
self.checkDescriptor( cols.getByName( "id" ), "id", "bpchar", CHAR, 8, 0, "", "unique id" )
self.checkDescriptor( cols.getByName( "name" ), "name", "text", VARCHAR, 0, 0, "", "" )
dd = cols.createDataDescriptor()
dd.Name = "foo"
dd.TypeName = "CHAR"
dd.Type = CHAR
dd.Precision = 25
dd.IsNullable = NULLABLE
cols.appendByDescriptor( dd )
dd.Name = "foo2"
dd.TypeName = "DECIMAL"
dd.Type = DECIMAL
dd.Precision = 12
dd.Scale = 5
dd.DefaultValue = "2.3423"
dd.Description = "foo2 description"
cols.appendByDescriptor( dd )
dd.Name = "cash"
dd.TypeName = "MONEY"
dd.Type = DOUBLE
# dd.IsNullable = NO_NULLS
dd.DefaultValue = "'2.42'"
cols.appendByDescriptor( dd )
cols.refresh()
self.checkDescriptor( cols.getByName( "foo"), "foo", "bpchar", CHAR, 25,0,"","")
self.checkDescriptor(
cols.getByName( "foo2"), "foo2", "numeric", NUMERIC, 12,5,"2.3423","foo2 description")
# dumpColumns( cols )
datadesc = tables.createDataDescriptor()
datadesc.SchemaName = "public"
datadesc.Name = "blub"
datadesc.Description = "This describes blub"
tables.appendByDescriptor( datadesc )
# make the appended descriptors known
tables.refresh()
t = tables.getByName( "public.blub" )
self.failUnless( t.Name == "blub" )
self.failUnless( t.SchemaName == "public" )
self.failUnless( t.Description == "This describes blub" )
cols = t.getColumns()
dd = cols.createDataDescriptor()
dd.Name = "mytext"
dd.TypeName = "text"
dd.Type = VARCHAR
dd.IsNullable = NO_NULLS
cols.appendByDescriptor( dd )
cols.refresh()
dd.DefaultValue = "'myDefault'"
dd.Name = "mytext2"
dd.IsNullable = NULLABLE
dd.Description = "mytext-Description"
t.alterColumnByName( "mytext" , dd )
cols.refresh()
self.checkDescriptor( cols.getByName( "mytext2" ), "mytext2", "text", VARCHAR, 0,0,"'myDefault'","mytext-Description" )
t = tables.getByName( "public.customer2" )
self.checkRenameTable( t,tables )
t = tables.getByName( "public.blub" )
self.checkRenameTable( t,tables )
def testUsers( self ):
dd = self.driver.getDataDefinitionByConnection( self.connection )
users = dd.getUsers()
self.failUnless( "pqsdbc_joe" in users.getElementNames() )