File: //usr/lib/python3/dist-packages/twisted/test/test_dirdbm.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Test cases for dirdbm module.
"""
import shutil
from base64 import b64decode
from twisted.trial import unittest
from twisted.persisted import dirdbm
from twisted.python.compat import _PY3
from twisted.python.filepath import FilePath
class DirDbmTests(unittest.TestCase):
def setUp(self):
self.path = FilePath(self.mktemp())
self.dbm = dirdbm.open(self.path.path)
self.items = ((b'abc', b'foo'), (b'/lalal', b'\000\001'), (b'\000\012', b'baz'))
def testAll(self):
k = b64decode("//==")
self.dbm[k] = b"a"
self.dbm[k] = b"a"
self.assertEqual(self.dbm[k], b"a")
def testRebuildInteraction(self):
from twisted.persisted import dirdbm
from twisted.python import rebuild
s = dirdbm.Shelf('dirdbm.rebuild.test')
s[b'key'] = b'value'
rebuild.rebuild(dirdbm)
# print s['key']
if _PY3:
testRebuildInteraction.skip=(
"Does not work on Python 3 (https://tm.tl/8887)")
def testDbm(self):
d = self.dbm
# Insert keys
keys = []
values = set()
for k, v in self.items:
d[k] = v
keys.append(k)
values.add(v)
keys.sort()
# Check they exist
for k, v in self.items:
self.assertIn(k, d)
self.assertEqual(d[k], v)
# Check non existent key
try:
d[b"XXX"]
except KeyError:
pass
else:
assert 0, "didn't raise KeyError on non-existent key"
# Check keys(), values() and items()
dbkeys = d.keys()
dbvalues = set(d.values())
dbitems = set(d.items())
dbkeys.sort()
items = set(self.items)
self.assertEqual(keys, dbkeys,
".keys() output didn't match: %s != %s" %
(repr(keys), repr(dbkeys)))
self.assertEqual(values, dbvalues,
".values() output didn't match: %s != %s" %
(repr(values), repr(dbvalues)))
self.assertEqual(items, dbitems,
"items() didn't match: %s != %s" %
(repr(items), repr(dbitems)))
copyPath = self.mktemp()
d2 = d.copyTo(copyPath)
copykeys = d.keys()
copyvalues = set(d.values())
copyitems = set(d.items())
copykeys.sort()
self.assertEqual(dbkeys, copykeys,
".copyTo().keys() didn't match: %s != %s" %
(repr(dbkeys), repr(copykeys)))
self.assertEqual(dbvalues, copyvalues,
".copyTo().values() didn't match: %s != %s" %
(repr(dbvalues), repr(copyvalues)))
self.assertEqual(dbitems, copyitems,
".copyTo().items() didn't match: %s != %s" %
(repr(dbkeys), repr(copyitems)))
d2.clear()
self.assertTrue(len(d2.keys()) == len(d2.values()) ==
len(d2.items()) == len(d2) == 0, ".clear() failed")
self.assertNotEqual(len(d), len(d2))
shutil.rmtree(copyPath)
# Delete items
for k, v in self.items:
del d[k]
self.assertNotIn(k, d, "key is still in database, even though we deleted it")
self.assertEqual(len(d.keys()), 0, "database has keys")
self.assertEqual(len(d.values()), 0, "database has values")
self.assertEqual(len(d.items()), 0, "database has items")
self.assertEqual(len(d), 0, "database has items")
def testModificationTime(self):
import time
# The mtime value for files comes from a different place than the
# gettimeofday() system call. On linux, gettimeofday() can be
# slightly ahead (due to clock drift which gettimeofday() takes into
# account but which open()/write()/close() do not), and if we are
# close to the edge of the next second, time.time() can give a value
# which is larger than the mtime which results from a subsequent
# write(). I consider this a kernel bug, but it is beyond the scope
# of this test. Thus we keep the range of acceptability to 3 seconds time.
# -warner
self.dbm[b"k"] = b"v"
self.assertTrue(abs(time.time() - self.dbm.getModificationTime(b"k")) <= 3)
self.assertRaises(KeyError, self.dbm.getModificationTime, b"nokey")
def testRecovery(self):
"""
DirDBM: test recovery from directory after a faked crash
"""
k = self.dbm._encode(b"key1")
with self.path.child(k + b".rpl").open(mode="wb") as f:
f.write(b"value")
k2 = self.dbm._encode(b"key2")
with self.path.child(k2).open(mode="wb") as f:
f.write(b"correct")
with self.path.child(k2 + b".rpl").open(mode="wb") as f:
f.write(b"wrong")
with self.path.child("aa.new").open(mode="wb") as f:
f.write(b"deleted")
dbm = dirdbm.DirDBM(self.path.path)
self.assertEqual(dbm[b"key1"], b"value")
self.assertEqual(dbm[b"key2"], b"correct")
self.assertFalse(self.path.globChildren("*.new"))
self.assertFalse(self.path.globChildren("*.rpl"))
def test_nonStringKeys(self):
"""
L{dirdbm.DirDBM} operations only support string keys: other types
should raise a L{TypeError}.
"""
self.assertRaises(TypeError, self.dbm.__setitem__, 2, "3")
try:
self.assertRaises(TypeError, self.dbm.__setitem__, "2", 3)
except unittest.FailTest:
# dirdbm.Shelf.__setitem__ supports non-string values
self.assertIsInstance(self.dbm, dirdbm.Shelf)
self.assertRaises(TypeError, self.dbm.__getitem__, 2)
self.assertRaises(TypeError, self.dbm.__delitem__, 2)
self.assertRaises(TypeError, self.dbm.has_key, 2)
self.assertRaises(TypeError, self.dbm.__contains__, 2)
self.assertRaises(TypeError, self.dbm.getModificationTime, 2)
def test_failSet(self):
"""
Failure path when setting an item.
"""
def _writeFail(path, data):
path.setContent(data)
raise IOError("fail to write")
self.dbm[b"failkey"] = b"test"
self.patch(self.dbm, "_writeFile", _writeFail)
self.assertRaises(IOError, self.dbm.__setitem__, b"failkey", b"test2")
class ShelfTests(DirDbmTests):
def setUp(self):
self.path = FilePath(self.mktemp())
self.dbm = dirdbm.Shelf(self.path.path)
self.items = ((b'abc', b'foo'), (b'/lalal', b'\000\001'), (b'\000\012', b'baz'),
(b'int', 12), (b'float', 12.0), (b'tuple', (None, 12)))
testCases = [DirDbmTests, ShelfTests]