2
0
mirror of https://github.com/openvswitch/ovs synced 2025-08-31 22:35:15 +00:00

python: idl: Add monitor_cond_since support.

Add support for monitor_cond_since / update3 to python-ovs to
allow more efficient reconnections when connecting to clustered
OVSDB servers.

Signed-off-by: Terry Wilson <twilson@redhat.com>
Acked-by: Dumitru Ceara <dceara@redhat.com>
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
This commit is contained in:
Terry Wilson
2021-12-01 11:51:20 -06:00
committed by Ilya Maximets
parent 0d1ffb7756
commit 46d44cf3be
3 changed files with 215 additions and 36 deletions

4
NEWS
View File

@@ -17,6 +17,10 @@ Post-v2.16.0
- Python:
* For SSL support, the use of the pyOpenSSL library has been replaced
with the native 'ssl' module.
- OVSDB:
* Python library for OVSDB clients now also supports faster
resynchronization with a clustered database after a brief disconnection,
i.e. 'monitor_cond_since' monitoring method.
- ovs-dpctl and 'ovs-appctl dpctl/':
* New commands 'cache-get-size' and 'cache-set-size' that allows to
get or configure linux kernel datapath cache sizes.

View File

@@ -13,6 +13,7 @@
# limitations under the License.
import collections
import enum
import functools
import uuid
@@ -36,6 +37,7 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
OVSDB_UPDATE3 = 2
CLUSTERED = "clustered"
RELAY = "relay"
@@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
return item in self.keys()
class Monitor(enum.IntEnum):
monitor = OVSDB_UPDATE
monitor_cond = OVSDB_UPDATE2
monitor_cond_since = OVSDB_UPDATE3
class ConditionState(object):
def __init__(self):
self._ack_cond = None
self._req_cond = None
self._new_cond = [True]
def __iter__(self):
return iter([self._new_cond, self._req_cond, self._ack_cond])
@property
def new(self):
"""The latest freshly initialized condition change"""
return self._new_cond
@property
def acked(self):
"""The last condition change that has been accepted by the server"""
return self._ack_cond
@property
def requested(self):
"""A condition that's been requested, but not acked by the server"""
return self._req_cond
@property
def latest(self):
"""The most recent condition change"""
return next(cond for cond in self if cond is not None)
@staticmethod
def is_true(condition):
return condition == [True]
def init(self, cond):
"""Signal that a condition change is being initiated"""
self._new_cond = cond
def ack(self):
"""Signal that a condition change has been acked"""
if self._req_cond is not None:
self._ack_cond, self._req_cond = (self._req_cond, None)
def request(self):
"""Signal that a condition change has been requested"""
if self._new_cond is not None:
self._req_cond, self._new_cond = (self._new_cond, None)
def reset(self):
"""Reset a requested condition change back to new"""
if self._req_cond is not None and self._new_cond is None:
self._new_cond, self._req_cond = (self._req_cond, None)
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -132,7 +193,13 @@ class Idl(object):
IDL_S_SERVER_MONITOR_REQUESTED = 2
IDL_S_DATA_MONITOR_REQUESTED = 3
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
IDL_S_MONITORING = 5
IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
IDL_S_MONITORING = 6
monitor_map = {
Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
def __init__(self, remote, schema_helper, probe_interval=None,
leader_only=True):
@@ -176,10 +243,12 @@ class Idl(object):
remotes = self._parse_remotes(remote)
self._session = ovs.jsonrpc.Session.open_multiple(remotes,
probe_interval=probe_interval)
self._request_id = None
self._monitor_request_id = None
self._last_seqno = None
self.change_seqno = 0
self.uuid = uuid.uuid1()
self.last_id = str(uuid.UUID(int=0))
# Server monitor.
self._server_schema_request_id = None
@@ -206,6 +275,9 @@ class Idl(object):
self.txn = None
self._outstanding_txns = {}
self.cond_changed = False
self.cond_seqno = 0
for table in schema.tables.values():
for column in table.columns.values():
if not hasattr(column, 'alert'):
@@ -213,8 +285,7 @@ class Idl(object):
table.need_table = False
table.rows = custom_index.IndexedRows(table)
table.idl = self
table.condition = [True]
table.cond_changed = False
table.condition = ConditionState()
def _parse_remotes(self, remote):
# If remote is -
@@ -252,6 +323,38 @@ class Idl(object):
update."""
self._session.close()
def ack_conditions(self):
"""Mark all requested table conditions as acked"""
for table in self.tables.values():
table.condition.ack()
def sync_conditions(self):
"""Synchronize condition state when the FSM is restarted
If a non-zero last_id is available for the DB, then upon reconnect
the IDL should first request acked conditions to avoid missing updates
about records that were added before the transaction with
txn-id == last_id. If there were requested condition changes in flight
and the IDL client didn't set new conditions, then reset the requested
conditions to new to trigger a follow-up monitor_cond_change request.
"""
ack_all = self.last_id == str(uuid.UUID(int=0))
for table in self.tables.values():
if ack_all:
table.condition.request()
table.condition.ack()
else:
table.condition.reset()
self.cond_changed = True
def restart_fsm(self):
# Resync data DB table conditions to avoid missing updated due to
# conditions that were in flight or changed locally while the
# connection was down.
self.sync_conditions()
self.__send_server_schema_request()
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
def run(self):
"""Processes a batch of messages from the database server. Returns
True if the database as seen through the IDL changed, False if it did
@@ -286,7 +389,7 @@ class Idl(object):
if seqno != self._last_seqno:
self._last_seqno = seqno
self.__txn_abort_all()
self.__send_server_schema_request()
self.restart_fsm()
if self.lock_name:
self.__send_lock_request()
break
@@ -294,8 +397,20 @@ class Idl(object):
msg = self._session.recv()
if msg is None:
break
is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
ovs.jsonrpc.Message.T_ERROR)
if is_response and self._request_id and self._request_id == msg.id:
self._request_id = None
# process_response follows
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update3"
and len(msg.params) == 3):
# Database contents changed.
self.__parse_update(msg.params[2], OVSDB_UPDATE3)
self.last_id = msg.params[1]
elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2"
and len(msg.params) == 2):
# Database contents changed.
@@ -320,11 +435,18 @@ class Idl(object):
try:
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
if (self.state ==
self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
# If 'found' is false, clear table rows for new dump
if not msg.result[0]:
self.__clear()
self.__parse_update(msg.result[2], OVSDB_UPDATE3)
elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
self.__clear()
self.__parse_update(msg.result, OVSDB_UPDATE2)
else:
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
self.__clear()
self.__parse_update(msg.result, OVSDB_UPDATE)
self.state = self.IDL_S_MONITORING
@@ -398,11 +520,17 @@ class Idl(object):
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
# Reply to our echo request. Ignore it.
pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == (
self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
self.__send_monitor_request(Monitor.monitor_cond)
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
self.__send_monitor_request()
self.__send_monitor_request(Monitor.monitor)
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self._server_schema_request_id is not None and
self._server_schema_request_id == msg.id):
@@ -418,6 +546,13 @@ class Idl(object):
and self.__txn_process_reply(msg)):
# __txn_process_reply() did everything needed.
pass
elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
self.state == self.IDL_S_MONITORING):
# Mark the last requested conditions as acked and if further
# condition changes were pending, send them now.
self.ack_conditions()
self.send_cond_change()
self.cond_seqno += 1
else:
# This can happen if a transaction is destroyed before we
# receive the reply, so keep the log level low.
@@ -427,14 +562,36 @@ class Idl(object):
return initial_change_seqno != self.change_seqno
def send_cond_change(self):
if not self._session.is_connected():
def compose_cond_change(self):
if not self.cond_changed:
return
change_requests = {}
for table in self.tables.values():
if table.cond_changed:
self.__send_cond_change(table, table.condition)
table.cond_changed = False
# Always use the most recent conditions set by the IDL client when
# requesting monitor_cond_change
if table.condition.new is not None:
change_requests[table.name] = [
{"where": table.condition.new}]
table.condition.request()
if not change_requests:
return
self.cond_changed = False
old_uuid = str(self.uuid)
self.uuid = uuid.uuid1()
params = [old_uuid, str(self.uuid), change_requests]
return ovs.jsonrpc.Message.create_request(
"monitor_cond_change", params)
def send_cond_change(self):
if not self._session.is_connected() or self._request_id is not None:
return
msg = self.compose_cond_change()
if msg:
self.send_request(msg)
def cond_change(self, table_name, cond):
"""Sets the condition for 'table_name' to 'cond', which should be a
@@ -450,13 +607,28 @@ class Idl(object):
if cond == []:
cond = [False]
if table.condition != cond:
table.condition = cond
table.cond_changed = True
# Compare the new condition to the last known condition
if table.condition.latest != cond:
table.condition.init(cond)
self.cond_changed = True
# New condition will be sent out after all already requested ones
# are acked.
if table.condition.new:
any_reqs = any(t.condition.request for t in self.tables.values())
return self.cond_seqno + int(any_reqs) + 1
# Already requested conditions should be up to date at
# self.cond_seqno + 1 while acked conditions are already up to date
return self.cond_seqno + int(bool(table.condition.requested))
def wait(self, poller):
"""Arranges for poller.block() to wake up when self.run() has something
to do or when activity occurs on a transaction on 'self'."""
if self.cond_changed:
poller.immediate_wake()
return
self._session.wait(poller)
self._session.recv_wait(poller)
@@ -531,14 +703,6 @@ class Idl(object):
to doing nothing to avoid overhead where it is not needed.
"""
def __send_cond_change(self, table, cond):
monitor_cond_change = {table.name: [{"where": cond}]}
old_uuid = str(self.uuid)
self.uuid = uuid.uuid1()
params = [old_uuid, str(self.uuid), monitor_cond_change]
msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
self._session.send(msg)
def __clear(self):
changed = False
@@ -547,6 +711,8 @@ class Idl(object):
changed = True
table.rows = custom_index.IndexedRows(table)
self.cond_seqno = 0
if changed:
self.change_seqno += 1
@@ -601,11 +767,18 @@ class Idl(object):
self._db_change_aware_request_id = msg.id
self._session.send(msg)
def __send_monitor_request(self):
if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
self.IDL_S_INITIAL]):
def send_request(self, request):
self._request_id = request.id
if self._session.is_connected():
return self._session.send(request)
def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
if self.state == self.IDL_S_INITIAL:
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond"
elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED:
self.state = self.monitor_map[Monitor(max_version)]
method = Monitor(max_version).name
else:
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor"
@@ -619,22 +792,24 @@ class Idl(object):
(column not in self.readonly[table.name])):
columns.append(column)
monitor_request = {"columns": columns}
if method == "monitor_cond" and table.condition != [True]:
monitor_request["where"] = table.condition
table.cond_change = False
if method in ("monitor_cond", "monitor_cond_since") and (
not ConditionState.is_true(table.condition.acked)):
monitor_request["where"] = table.condition.acked
monitor_requests[table.name] = [monitor_request]
msg = ovs.jsonrpc.Message.create_request(
method, [self._db.name, str(self.uuid), monitor_requests])
args = [self._db.name, str(self.uuid), monitor_requests]
if method == "monitor_cond_since":
args.append(str(self.last_id))
msg = ovs.jsonrpc.Message.create_request(method, args)
self._monitor_request_id = msg.id
self._session.send(msg)
self.send_request(msg)
def __send_server_schema_request(self):
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
msg = ovs.jsonrpc.Message.create_request(
"get_schema", [self._server_db_name, str(self.uuid)])
self._server_schema_request_id = msg.id
self._session.send(msg)
self.send_request(msg)
def __send_server_monitor_request(self):
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
@@ -654,7 +829,7 @@ class Idl(object):
str(self.server_monitor_uuid),
monitor_requests])
self._server_monitor_request_id = msg.id
self._session.send(msg)
self.send_request(msg)
def __parse_update(self, update, version, tables=None):
try:
@@ -698,7 +873,7 @@ class Idl(object):
self.cooperative_yield()
if version == OVSDB_UPDATE2:
if version in (OVSDB_UPDATE2, OVSDB_UPDATE3):
changes = self.__process_update2(table, uuid, row_update)
if changes:
notices.append(changes)

View File

@@ -2319,7 +2319,7 @@ m4_define([OVSDB_CHECK_CLUSTER_IDL],
# Checks that monitor_cond_since works fine when disconnects happen
# with cond_change requests in flight (i.e., IDL is properly updated).
OVSDB_CHECK_CLUSTER_IDL_C([simple idl, monitor_cond_since, cluster disconnect],
OVSDB_CHECK_CLUSTER_IDL([simple idl, monitor_cond_since, cluster disconnect],
3,
[['["idltest",
{"op": "insert",