mirror of
https://gitlab.isc.org/isc-projects/kea
synced 2025-09-01 22:45:18 +00:00
[2855] Add a basic builder thread that understands the shutdown command
This commit is contained in:
@@ -19,6 +19,8 @@ import copy
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import signal
|
import signal
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
|
||||||
sys.path.append('@@PYTHONPATH@@')
|
sys.path.append('@@PYTHONPATH@@')
|
||||||
import isc.log
|
import isc.log
|
||||||
@@ -28,6 +30,7 @@ from isc.server_common.bind10_server import BIND10Server, BIND10ServerFatal
|
|||||||
from isc.server_common.datasrc_clients_mgr \
|
from isc.server_common.datasrc_clients_mgr \
|
||||||
import DataSrcClientsMgr, ConfigError
|
import DataSrcClientsMgr, ConfigError
|
||||||
from isc.memmgr.datasrc_info import DataSrcInfo
|
from isc.memmgr.datasrc_info import DataSrcInfo
|
||||||
|
from isc.memmgr.builder import MemorySegmentBuilder
|
||||||
import isc.util.process
|
import isc.util.process
|
||||||
|
|
||||||
MODULE_NAME = 'memmgr'
|
MODULE_NAME = 'memmgr'
|
||||||
@@ -58,6 +61,10 @@ class Memmgr(BIND10Server):
|
|||||||
# active configuration generations. Allow tests to inspec it.
|
# active configuration generations. Allow tests to inspec it.
|
||||||
self._datasrc_info_list = []
|
self._datasrc_info_list = []
|
||||||
|
|
||||||
|
self._builder_setup = False
|
||||||
|
self._builder_command_queue = []
|
||||||
|
self._builder_response_queue = []
|
||||||
|
|
||||||
def _config_handler(self, new_config):
|
def _config_handler(self, new_config):
|
||||||
"""Configuration handler, called via BIND10Server.
|
"""Configuration handler, called via BIND10Server.
|
||||||
|
|
||||||
@@ -117,6 +124,46 @@ class Memmgr(BIND10Server):
|
|||||||
# All copy, switch to the new configuration.
|
# All copy, switch to the new configuration.
|
||||||
self._config_params = new_config_params
|
self._config_params = new_config_params
|
||||||
|
|
||||||
|
def __notify_from_builder(self):
|
||||||
|
# Nothing is implemented here for now. This method should have
|
||||||
|
# code to handle responses from the builder in
|
||||||
|
# self._builder_response_queue[]. Access must be synchronized
|
||||||
|
# using self._builder_lock.
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __create_builder_thread(self):
|
||||||
|
(self._master_sock, self._builder_sock) = \
|
||||||
|
socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
self.watch_fileno(self._master_sock, rcallback=self.__notify_from_builder)
|
||||||
|
|
||||||
|
self._builder_cv = threading.Condition()
|
||||||
|
self._builder_lock = threading.Lock()
|
||||||
|
|
||||||
|
self._builder = MemorySegmentBuilder(self._builder_sock,
|
||||||
|
self._builder_cv,
|
||||||
|
self._builder_lock,
|
||||||
|
self._builder_command_queue,
|
||||||
|
self._builder_response_queue)
|
||||||
|
self._builder_thread = threading.Thread(target=self._builder.run)
|
||||||
|
self._builder_thread.start()
|
||||||
|
|
||||||
|
self._builder_setup = True
|
||||||
|
|
||||||
|
def __shutdown_builder_thread(self):
|
||||||
|
if not self._builder_setup:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._builder_setup = False
|
||||||
|
|
||||||
|
with self._builder_cv:
|
||||||
|
with self._builder_lock:
|
||||||
|
self._builder_command_queue.append('shutdown')
|
||||||
|
self._builder_cv.notify_all()
|
||||||
|
|
||||||
|
self._builder_thread.join()
|
||||||
|
self._master_sock.close()
|
||||||
|
self._builder_sock.close()
|
||||||
|
|
||||||
def _setup_module(self):
|
def _setup_module(self):
|
||||||
"""Module specific initialization for BIND10Server."""
|
"""Module specific initialization for BIND10Server."""
|
||||||
try:
|
try:
|
||||||
@@ -130,6 +177,12 @@ class Memmgr(BIND10Server):
|
|||||||
logger.error(MEMMGR_NO_DATASRC_CONF, ex)
|
logger.error(MEMMGR_NO_DATASRC_CONF, ex)
|
||||||
raise BIND10ServerFatal('failed to setup memmgr module')
|
raise BIND10ServerFatal('failed to setup memmgr module')
|
||||||
|
|
||||||
|
self.__create_builder_thread()
|
||||||
|
|
||||||
|
def _shutdown_module(self):
|
||||||
|
"""Module specific finalization."""
|
||||||
|
self.__shutdown_builder_thread()
|
||||||
|
|
||||||
def _datasrc_config_handler(self, new_config, config_data):
|
def _datasrc_config_handler(self, new_config, config_data):
|
||||||
"""Callback of data_sources configuration update.
|
"""Callback of data_sources configuration update.
|
||||||
|
|
||||||
|
@@ -74,6 +74,10 @@ class TestMemmgr(unittest.TestCase):
|
|||||||
self.__orig_isdir = os.path.isdir
|
self.__orig_isdir = os.path.isdir
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
self.__mgr._shutdown_module()
|
||||||
|
|
||||||
|
self.assertEqual(len(self.__mgr._builder_command_queue), 0)
|
||||||
|
|
||||||
# Restore faked values
|
# Restore faked values
|
||||||
os.access = self.__orig_os_access
|
os.access = self.__orig_os_access
|
||||||
os.path.isdir = self.__orig_isdir
|
os.path.isdir = self.__orig_isdir
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
SUBDIRS = . tests
|
SUBDIRS = . tests
|
||||||
|
|
||||||
python_PYTHON = __init__.py datasrc_info.py
|
python_PYTHON = __init__.py builder.py datasrc_info.py
|
||||||
|
|
||||||
pythondir = $(pyexecdir)/isc/memmgr
|
pythondir = $(pyexecdir)/isc/memmgr
|
||||||
|
|
||||||
|
48
src/lib/python/isc/memmgr/builder.py
Normal file
48
src/lib/python/isc/memmgr/builder.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
# Copyright (C) 2013 Internet Systems Consortium.
|
||||||
|
#
|
||||||
|
# Permission to use, copy, modify, and distribute this software for any
|
||||||
|
# purpose with or without fee is hereby granted, provided that the above
|
||||||
|
# copyright notice and this permission notice appear in all copies.
|
||||||
|
#
|
||||||
|
# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
|
||||||
|
# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
|
||||||
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
|
||||||
|
# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
|
||||||
|
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
|
||||||
|
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
||||||
|
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
|
||||||
|
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
|
||||||
|
class MemorySegmentBuilder:
|
||||||
|
"""The builder runs in a different thread in the memory manager. It
|
||||||
|
waits for commands from the memory manager, and then executes them
|
||||||
|
in the given order sequentially.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, sock, cv, lock, command_queue, response_queue):
|
||||||
|
self._sock = sock
|
||||||
|
self._cv = cv
|
||||||
|
self._lock = lock
|
||||||
|
self._command_queue = command_queue
|
||||||
|
self._response_queue = response_queue
|
||||||
|
self._shutdown = False
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
with self._cv:
|
||||||
|
while not self._shutdown:
|
||||||
|
while len(self._command_queue) == 0:
|
||||||
|
self._cv.wait()
|
||||||
|
# move the queue content to a local queue
|
||||||
|
with self._lock:
|
||||||
|
local_command_queue = self._command_queue.copy()
|
||||||
|
self._command_queue.clear()
|
||||||
|
|
||||||
|
# run commands in the queue in the given order. For
|
||||||
|
# now, it only supports the "shutdown" command, which
|
||||||
|
# just exits the thread.
|
||||||
|
for command in local_command_queue:
|
||||||
|
if command == 'shutdown':
|
||||||
|
self._shutdown = True
|
||||||
|
break
|
||||||
|
raise Exception('Unknown command passed to ' +
|
||||||
|
'MemorySegmentBuilder: ' + command)
|
Reference in New Issue
Block a user