2
0
mirror of https://gitlab.isc.org/isc-projects/kea synced 2025-08-30 21:45:37 +00:00

[324] added tests for different schema versions for sqlite3_ds.

this is mostly retired, but we are still using it for the current loadzone
script.  so I re-enabled the test, removed old test stuff, and added this
new test case.
This commit is contained in:
JINMEI Tatuya
2012-03-29 23:58:31 -07:00
parent 5b0ea09b5f
commit 8ef09a5063
5 changed files with 12 additions and 116 deletions

View File

@@ -1,7 +1,7 @@
PYCOVERAGE_RUN = @PYCOVERAGE_RUN@
# old tests, TODO remove or change to use new API?
#PYTESTS = master_test.py sqlite3_ds_test.py
PYTESTS = datasrc_test.py
#PYTESTS = master_test.py
PYTESTS = datasrc_test.py sqlite3_ds_test.py
EXTRA_DIST = $(PYTESTS)
EXTRA_DIST += testdata/brokendb.sqlite3

View File

@@ -22,122 +22,18 @@ import sqlite3
TESTDATA_PATH = os.environ['TESTDATA_PATH'] + os.sep
TESTDATA_WRITE_PATH = os.environ['TESTDATA_WRITE_PATH'] + os.sep
READ_ZONE_DB_FILE = TESTDATA_PATH + "example.com.sqlite3"
BROKEN_DB_FILE = TESTDATA_PATH + "brokendb.sqlite3"
WRITE_ZONE_DB_FILE = TESTDATA_WRITE_PATH + "example.com.out.sqlite3"
NEW_DB_FILE = TESTDATA_WRITE_PATH + "new_db.sqlite3"
def example_reader():
my_zone = [
("example.com.", "3600", "IN", "SOA", "ns.example.com. admin.example.com. 1234 3600 1800 2419200 7200"),
("example.com.", "3600", "IN", "NS", "ns.example.com."),
("ns.example.com.", "3600", "IN", "A", "192.0.2.1")
]
for rr in my_zone:
yield rr
def example_reader_nested():
# this iterator is used in the 'locked' test; it will cause
# the load() method to try and write to the same database
sqlite3_ds.load(WRITE_ZONE_DB_FILE,
".",
example_reader)
return example_reader()
class TestSqlite3_ds(unittest.TestCase):
def test_zone_exist(self):
# The following file must be non existent and must be non
# "creatable"; the sqlite3 library will try to create a new
# DB file if it doesn't exist, so to test a failure case the
# create operation should also fail. The "nodir", a non
# existent directory, is inserted for this purpose.
nodir = "/nodir/notexist"
self.assertRaises(sqlite3_ds.Sqlite3DSError,
sqlite3_ds.zone_exist, "example.com", nodir)
# Open a broken database file
self.assertRaises(sqlite3_ds.Sqlite3DSError,
sqlite3_ds.zone_exist, "example.com",
BROKEN_DB_FILE)
self.assertTrue(sqlite3_ds.zone_exist("example.com.",
READ_ZONE_DB_FILE))
self.assertFalse(sqlite3_ds.zone_exist("example.org.",
READ_ZONE_DB_FILE))
def test_load_db(self):
sqlite3_ds.load(WRITE_ZONE_DB_FILE, ".", example_reader)
def test_locked_db(self):
# load it first to make sure it exists
sqlite3_ds.load(WRITE_ZONE_DB_FILE, ".", example_reader)
# and manually create a writing session as well
con = sqlite3.connect(WRITE_ZONE_DB_FILE);
cur = con.cursor()
cur.execute("delete from records")
self.assertRaises(sqlite3_ds.Sqlite3DSError,
sqlite3_ds.load, WRITE_ZONE_DB_FILE, ".",
example_reader)
con.rollback()
# and make sure lock does not stay
sqlite3_ds.load(WRITE_ZONE_DB_FILE, ".", example_reader)
# force locked db by nested loads
self.assertRaises(sqlite3_ds.Sqlite3DSError,
sqlite3_ds.load, WRITE_ZONE_DB_FILE, ".",
example_reader_nested)
# and make sure lock does not stay
sqlite3_ds.load(WRITE_ZONE_DB_FILE, ".", example_reader)
DBFILE_NEWSCHEMA = TESTDATA_PATH + "/newschema.sqlite3";
DBFILE_OLDSCHEMA = TESTDATA_PATH + "/oldschema.sqlite3";
DBFILE_NEW_MINOR_SCHEMA = TESTDATA_PATH + "/new_minor_schema.sqlite3";
class NewDBFile(unittest.TestCase):
def tearDown(self):
# remove the created database after every test
if (os.path.exists(NEW_DB_FILE)):
os.remove(NEW_DB_FILE)
def setUp(self):
# remove the created database before every test too, just
# in case a test got aborted half-way, and cleanup didn't occur
if (os.path.exists(NEW_DB_FILE)):
os.remove(NEW_DB_FILE)
def test_new_db(self):
self.assertFalse(os.path.exists(NEW_DB_FILE))
sqlite3_ds.open(NEW_DB_FILE)
self.assertTrue(os.path.exists(NEW_DB_FILE))
def test_new_db_locked(self):
self.assertFalse(os.path.exists(NEW_DB_FILE))
con = sqlite3.connect(NEW_DB_FILE);
con.isolation_level = None
cur = con.cursor()
cur.execute("BEGIN IMMEDIATE TRANSACTION")
# load should now fail, since the database is locked,
# and the open() call needs an exclusive lock
self.assertRaises(sqlite3.OperationalError,
sqlite3_ds.open, NEW_DB_FILE, 0.1)
con.rollback()
cur.close()
con.close()
self.assertTrue(os.path.exists(NEW_DB_FILE))
# now that we closed our connection, load should work again
sqlite3_ds.open(NEW_DB_FILE)
# the database should now have been created, and a new load should
# not require an exclusive lock anymore, so we lock it again
con = sqlite3.connect(NEW_DB_FILE);
cur = con.cursor()
cur.execute("BEGIN IMMEDIATE TRANSACTION")
sqlite3_ds.open(NEW_DB_FILE, 0.1)
con.rollback()
cur.close()
con.close()
def test_different_version(self):
self.assertTrue(os.path.exists(DBFILE_NEWSCHEMA))
self.assertRaises(sqlite3_ds.Sqlite3DSError, sqlite3_ds.open,
DBFILE_NEWSCHEMA)
self.assertRaises(sqlite3_ds.Sqlite3DSError, sqlite3_ds.open,
DBFILE_OLDSCHEMA)
self.assertNotEqual(None, sqlite3_ds.open(DBFILE_NEW_MINOR_SCHEMA)[0])
if __name__ == '__main__':
unittest.main()

Binary file not shown.

Binary file not shown.

Binary file not shown.