mirror of
https://gitlab.isc.org/isc-projects/kea
synced 2025-08-29 21:18:02 +00:00
[2353] Check that BoB.start_msgq() calls isc.cc.Session() and group_subscribe()
This commit is contained in:
parent
de66fdd73b
commit
e6ebe1de8e
@ -1613,8 +1613,6 @@ class TestBossComponents(unittest.TestCase):
|
||||
bob.c_channel_env = {}
|
||||
# keep the timeout small for the test to complete quickly
|
||||
bob.msgq_timeout = 1
|
||||
# specifically set this to False so that the connect is
|
||||
# attempted
|
||||
bob.run_under_unittests = False
|
||||
|
||||
# use the MockProcessInfo creator
|
||||
@ -1637,6 +1635,17 @@ class TestBossComponents(unittest.TestCase):
|
||||
time.time = _my_time
|
||||
time.sleep = _my_sleep
|
||||
|
||||
global cc_sub
|
||||
cc_sub = None
|
||||
class DummySessionAlwaysFails():
|
||||
def __init__(self, socket_file):
|
||||
raise isc.cc.session.SessionError('Connection fails')
|
||||
def group_subscribe(self, s):
|
||||
global cc_sub
|
||||
cc_sub = s
|
||||
|
||||
isc.cc.Session = DummySessionAlwaysFails
|
||||
|
||||
with self.assertRaises(bind10_src.CChannelConnectError):
|
||||
# An exception will be thrown here when it eventually times
|
||||
# out.
|
||||
@ -1647,7 +1656,36 @@ class TestBossComponents(unittest.TestCase):
|
||||
# to happen inside start_msgq().
|
||||
self.assertEqual(attempts, 13)
|
||||
|
||||
# time.time() and time.sleep() are restored during tearDown().
|
||||
# group_subscribe() should not have been called here.
|
||||
self.assertIsNone(cc_sub)
|
||||
|
||||
global cc_socket_file
|
||||
cc_socket_file = None
|
||||
cc_sub = None
|
||||
class DummySession():
|
||||
def __init__(self, socket_file):
|
||||
global cc_socket_file
|
||||
cc_socket_file = socket_file
|
||||
def group_subscribe(self, s):
|
||||
global cc_sub
|
||||
cc_sub = s
|
||||
|
||||
isc.cc.Session = DummySession
|
||||
|
||||
# reset values
|
||||
attempts = 0
|
||||
tsec = 0
|
||||
|
||||
pi = bob.start_msgq()
|
||||
|
||||
# just one attempt, but 2 calls to time.time()
|
||||
self.assertEqual(attempts, 2)
|
||||
|
||||
self.assertEqual(cc_socket_file, bob.msgq_socket_file)
|
||||
self.assertEqual(cc_sub, 'Boss')
|
||||
|
||||
# isc.cc.Session, time.time() and time.sleep() are restored
|
||||
# during tearDown().
|
||||
|
||||
def test_start_cfgmgr(self):
|
||||
'''Test that b10-cfgmgr is started.'''
|
||||
|
Loading…
x
Reference in New Issue
Block a user