2
0
mirror of https://github.com/openvswitch/ovs synced 2025-09-04 00:05:15 +00:00

python: idl: Add monitor_cond_since support.

Add support for monitor_cond_since / update3 to python-ovs to
allow more efficient reconnections when connecting to clustered
OVSDB servers.

Signed-off-by: Terry Wilson <twilson@redhat.com>
Acked-by: Dumitru Ceara <dceara@redhat.com>
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
This commit is contained in:
Terry Wilson
2021-12-01 11:51:20 -06:00
committed by Ilya Maximets
parent 0d1ffb7756
commit 46d44cf3be
3 changed files with 215 additions and 36 deletions

4
NEWS
View File

@@ -17,6 +17,10 @@ Post-v2.16.0
- Python: - Python:
* For SSL support, the use of the pyOpenSSL library has been replaced * For SSL support, the use of the pyOpenSSL library has been replaced
with the native 'ssl' module. with the native 'ssl' module.
- OVSDB:
* Python library for OVSDB clients now also supports faster
resynchronization with a clustered database after a brief disconnection,
i.e. 'monitor_cond_since' monitoring method.
- ovs-dpctl and 'ovs-appctl dpctl/': - ovs-dpctl and 'ovs-appctl dpctl/':
* New commands 'cache-get-size' and 'cache-set-size' that allows to * New commands 'cache-get-size' and 'cache-set-size' that allows to
get or configure linux kernel datapath cache sizes. get or configure linux kernel datapath cache sizes.

View File

@@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
import collections import collections
import enum
import functools import functools
import uuid import uuid
@@ -36,6 +37,7 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0 OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1 OVSDB_UPDATE2 = 1
OVSDB_UPDATE3 = 2
CLUSTERED = "clustered" CLUSTERED = "clustered"
RELAY = "relay" RELAY = "relay"
@@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
return item in self.keys() return item in self.keys()
class Monitor(enum.IntEnum):
monitor = OVSDB_UPDATE
monitor_cond = OVSDB_UPDATE2
monitor_cond_since = OVSDB_UPDATE3
class ConditionState(object):
def __init__(self):
self._ack_cond = None
self._req_cond = None
self._new_cond = [True]
def __iter__(self):
return iter([self._new_cond, self._req_cond, self._ack_cond])
@property
def new(self):
"""The latest freshly initialized condition change"""
return self._new_cond
@property
def acked(self):
"""The last condition change that has been accepted by the server"""
return self._ack_cond
@property
def requested(self):
"""A condition that's been requested, but not acked by the server"""
return self._req_cond
@property
def latest(self):
"""The most recent condition change"""
return next(cond for cond in self if cond is not None)
@staticmethod
def is_true(condition):
return condition == [True]
def init(self, cond):
"""Signal that a condition change is being initiated"""
self._new_cond = cond
def ack(self):
"""Signal that a condition change has been acked"""
if self._req_cond is not None:
self._ack_cond, self._req_cond = (self._req_cond, None)
def request(self):
"""Signal that a condition change has been requested"""
if self._new_cond is not None:
self._req_cond, self._new_cond = (self._new_cond, None)
def reset(self):
"""Reset a requested condition change back to new"""
if self._req_cond is not None and self._new_cond is None:
self._new_cond, self._req_cond = (self._req_cond, None)
class Idl(object): class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL). """Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -132,7 +193,13 @@ class Idl(object):
IDL_S_SERVER_MONITOR_REQUESTED = 2 IDL_S_SERVER_MONITOR_REQUESTED = 2
IDL_S_DATA_MONITOR_REQUESTED = 3 IDL_S_DATA_MONITOR_REQUESTED = 3
IDL_S_DATA_MONITOR_COND_REQUESTED = 4 IDL_S_DATA_MONITOR_COND_REQUESTED = 4
IDL_S_MONITORING = 5 IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
IDL_S_MONITORING = 6
monitor_map = {
Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
def __init__(self, remote, schema_helper, probe_interval=None, def __init__(self, remote, schema_helper, probe_interval=None,
leader_only=True): leader_only=True):
@@ -176,10 +243,12 @@ class Idl(object):
remotes = self._parse_remotes(remote) remotes = self._parse_remotes(remote)
self._session = ovs.jsonrpc.Session.open_multiple(remotes, self._session = ovs.jsonrpc.Session.open_multiple(remotes,
probe_interval=probe_interval) probe_interval=probe_interval)
self._request_id = None
self._monitor_request_id = None self._monitor_request_id = None
self._last_seqno = None self._last_seqno = None
self.change_seqno = 0 self.change_seqno = 0
self.uuid = uuid.uuid1() self.uuid = uuid.uuid1()
self.last_id = str(uuid.UUID(int=0))
# Server monitor. # Server monitor.
self._server_schema_request_id = None self._server_schema_request_id = None
@@ -206,6 +275,9 @@ class Idl(object):
self.txn = None self.txn = None
self._outstanding_txns = {} self._outstanding_txns = {}
self.cond_changed = False
self.cond_seqno = 0
for table in schema.tables.values(): for table in schema.tables.values():
for column in table.columns.values(): for column in table.columns.values():
if not hasattr(column, 'alert'): if not hasattr(column, 'alert'):
@@ -213,8 +285,7 @@ class Idl(object):
table.need_table = False table.need_table = False
table.rows = custom_index.IndexedRows(table) table.rows = custom_index.IndexedRows(table)
table.idl = self table.idl = self
table.condition = [True] table.condition = ConditionState()
table.cond_changed = False
def _parse_remotes(self, remote): def _parse_remotes(self, remote):
# If remote is - # If remote is -
@@ -252,6 +323,38 @@ class Idl(object):
update.""" update."""
self._session.close() self._session.close()
def ack_conditions(self):
"""Mark all requested table conditions as acked"""
for table in self.tables.values():
table.condition.ack()
def sync_conditions(self):
"""Synchronize condition state when the FSM is restarted
If a non-zero last_id is available for the DB, then upon reconnect
the IDL should first request acked conditions to avoid missing updates
about records that were added before the transaction with
txn-id == last_id. If there were requested condition changes in flight
and the IDL client didn't set new conditions, then reset the requested
conditions to new to trigger a follow-up monitor_cond_change request.
"""
ack_all = self.last_id == str(uuid.UUID(int=0))
for table in self.tables.values():
if ack_all:
table.condition.request()
table.condition.ack()
else:
table.condition.reset()
self.cond_changed = True
def restart_fsm(self):
# Resync data DB table conditions to avoid missing updated due to
# conditions that were in flight or changed locally while the
# connection was down.
self.sync_conditions()
self.__send_server_schema_request()
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
def run(self): def run(self):
"""Processes a batch of messages from the database server. Returns """Processes a batch of messages from the database server. Returns
True if the database as seen through the IDL changed, False if it did True if the database as seen through the IDL changed, False if it did
@@ -286,7 +389,7 @@ class Idl(object):
if seqno != self._last_seqno: if seqno != self._last_seqno:
self._last_seqno = seqno self._last_seqno = seqno
self.__txn_abort_all() self.__txn_abort_all()
self.__send_server_schema_request() self.restart_fsm()
if self.lock_name: if self.lock_name:
self.__send_lock_request() self.__send_lock_request()
break break
@@ -294,8 +397,20 @@ class Idl(object):
msg = self._session.recv() msg = self._session.recv()
if msg is None: if msg is None:
break break
is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
ovs.jsonrpc.Message.T_ERROR)
if is_response and self._request_id and self._request_id == msg.id:
self._request_id = None
# process_response follows
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update3"
and len(msg.params) == 3):
# Database contents changed.
self.__parse_update(msg.params[2], OVSDB_UPDATE3)
self.last_id = msg.params[1]
elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2" and msg.method == "update2"
and len(msg.params) == 2): and len(msg.params) == 2):
# Database contents changed. # Database contents changed.
@@ -320,11 +435,18 @@ class Idl(object):
try: try:
self.change_seqno += 1 self.change_seqno += 1
self._monitor_request_id = None self._monitor_request_id = None
self.__clear() if (self.state ==
if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED: self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
# If 'found' is false, clear table rows for new dump
if not msg.result[0]:
self.__clear()
self.__parse_update(msg.result[2], OVSDB_UPDATE3)
elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
self.__clear()
self.__parse_update(msg.result, OVSDB_UPDATE2) self.__parse_update(msg.result, OVSDB_UPDATE2)
else: else:
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
self.__clear()
self.__parse_update(msg.result, OVSDB_UPDATE) self.__parse_update(msg.result, OVSDB_UPDATE)
self.state = self.IDL_S_MONITORING self.state = self.IDL_S_MONITORING
@@ -398,11 +520,17 @@ class Idl(object):
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo": elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
# Reply to our echo request. Ignore it. # Reply to our echo request. Ignore it.
pass pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == (
self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
self.__send_monitor_request(Monitor.monitor_cond)
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id): self._monitor_request_id == msg.id):
if msg.error == "unknown method": if msg.error == "unknown method":
self.__send_monitor_request() self.__send_monitor_request(Monitor.monitor)
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self._server_schema_request_id is not None and self._server_schema_request_id is not None and
self._server_schema_request_id == msg.id): self._server_schema_request_id == msg.id):
@@ -418,6 +546,13 @@ class Idl(object):
and self.__txn_process_reply(msg)): and self.__txn_process_reply(msg)):
# __txn_process_reply() did everything needed. # __txn_process_reply() did everything needed.
pass pass
elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
self.state == self.IDL_S_MONITORING):
# Mark the last requested conditions as acked and if further
# condition changes were pending, send them now.
self.ack_conditions()
self.send_cond_change()
self.cond_seqno += 1
else: else:
# This can happen if a transaction is destroyed before we # This can happen if a transaction is destroyed before we
# receive the reply, so keep the log level low. # receive the reply, so keep the log level low.
@@ -427,14 +562,36 @@ class Idl(object):
return initial_change_seqno != self.change_seqno return initial_change_seqno != self.change_seqno
def send_cond_change(self): def compose_cond_change(self):
if not self._session.is_connected(): if not self.cond_changed:
return return
change_requests = {}
for table in self.tables.values(): for table in self.tables.values():
if table.cond_changed: # Always use the most recent conditions set by the IDL client when
self.__send_cond_change(table, table.condition) # requesting monitor_cond_change
table.cond_changed = False if table.condition.new is not None:
change_requests[table.name] = [
{"where": table.condition.new}]
table.condition.request()
if not change_requests:
return
self.cond_changed = False
old_uuid = str(self.uuid)
self.uuid = uuid.uuid1()
params = [old_uuid, str(self.uuid), change_requests]
return ovs.jsonrpc.Message.create_request(
"monitor_cond_change", params)
def send_cond_change(self):
if not self._session.is_connected() or self._request_id is not None:
return
msg = self.compose_cond_change()
if msg:
self.send_request(msg)
def cond_change(self, table_name, cond): def cond_change(self, table_name, cond):
"""Sets the condition for 'table_name' to 'cond', which should be a """Sets the condition for 'table_name' to 'cond', which should be a
@@ -450,13 +607,28 @@ class Idl(object):
if cond == []: if cond == []:
cond = [False] cond = [False]
if table.condition != cond:
table.condition = cond # Compare the new condition to the last known condition
table.cond_changed = True if table.condition.latest != cond:
table.condition.init(cond)
self.cond_changed = True
# New condition will be sent out after all already requested ones
# are acked.
if table.condition.new:
any_reqs = any(t.condition.request for t in self.tables.values())
return self.cond_seqno + int(any_reqs) + 1
# Already requested conditions should be up to date at
# self.cond_seqno + 1 while acked conditions are already up to date
return self.cond_seqno + int(bool(table.condition.requested))
def wait(self, poller): def wait(self, poller):
"""Arranges for poller.block() to wake up when self.run() has something """Arranges for poller.block() to wake up when self.run() has something
to do or when activity occurs on a transaction on 'self'.""" to do or when activity occurs on a transaction on 'self'."""
if self.cond_changed:
poller.immediate_wake()
return
self._session.wait(poller) self._session.wait(poller)
self._session.recv_wait(poller) self._session.recv_wait(poller)
@@ -531,14 +703,6 @@ class Idl(object):
to doing nothing to avoid overhead where it is not needed. to doing nothing to avoid overhead where it is not needed.
""" """
def __send_cond_change(self, table, cond):
monitor_cond_change = {table.name: [{"where": cond}]}
old_uuid = str(self.uuid)
self.uuid = uuid.uuid1()
params = [old_uuid, str(self.uuid), monitor_cond_change]
msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
self._session.send(msg)
def __clear(self): def __clear(self):
changed = False changed = False
@@ -547,6 +711,8 @@ class Idl(object):
changed = True changed = True
table.rows = custom_index.IndexedRows(table) table.rows = custom_index.IndexedRows(table)
self.cond_seqno = 0
if changed: if changed:
self.change_seqno += 1 self.change_seqno += 1
@@ -601,11 +767,18 @@ class Idl(object):
self._db_change_aware_request_id = msg.id self._db_change_aware_request_id = msg.id
self._session.send(msg) self._session.send(msg)
def __send_monitor_request(self): def send_request(self, request):
if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED, self._request_id = request.id
self.IDL_S_INITIAL]): if self._session.is_connected():
return self._session.send(request)
def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
if self.state == self.IDL_S_INITIAL:
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond" method = "monitor_cond"
elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED:
self.state = self.monitor_map[Monitor(max_version)]
method = Monitor(max_version).name
else: else:
self.state = self.IDL_S_DATA_MONITOR_REQUESTED self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor" method = "monitor"
@@ -619,22 +792,24 @@ class Idl(object):
(column not in self.readonly[table.name])): (column not in self.readonly[table.name])):
columns.append(column) columns.append(column)
monitor_request = {"columns": columns} monitor_request = {"columns": columns}
if method == "monitor_cond" and table.condition != [True]: if method in ("monitor_cond", "monitor_cond_since") and (
monitor_request["where"] = table.condition not ConditionState.is_true(table.condition.acked)):
table.cond_change = False monitor_request["where"] = table.condition.acked
monitor_requests[table.name] = [monitor_request] monitor_requests[table.name] = [monitor_request]
msg = ovs.jsonrpc.Message.create_request( args = [self._db.name, str(self.uuid), monitor_requests]
method, [self._db.name, str(self.uuid), monitor_requests]) if method == "monitor_cond_since":
args.append(str(self.last_id))
msg = ovs.jsonrpc.Message.create_request(method, args)
self._monitor_request_id = msg.id self._monitor_request_id = msg.id
self._session.send(msg) self.send_request(msg)
def __send_server_schema_request(self): def __send_server_schema_request(self):
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
msg = ovs.jsonrpc.Message.create_request( msg = ovs.jsonrpc.Message.create_request(
"get_schema", [self._server_db_name, str(self.uuid)]) "get_schema", [self._server_db_name, str(self.uuid)])
self._server_schema_request_id = msg.id self._server_schema_request_id = msg.id
self._session.send(msg) self.send_request(msg)
def __send_server_monitor_request(self): def __send_server_monitor_request(self):
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
@@ -654,7 +829,7 @@ class Idl(object):
str(self.server_monitor_uuid), str(self.server_monitor_uuid),
monitor_requests]) monitor_requests])
self._server_monitor_request_id = msg.id self._server_monitor_request_id = msg.id
self._session.send(msg) self.send_request(msg)
def __parse_update(self, update, version, tables=None): def __parse_update(self, update, version, tables=None):
try: try:
@@ -698,7 +873,7 @@ class Idl(object):
self.cooperative_yield() self.cooperative_yield()
if version == OVSDB_UPDATE2: if version in (OVSDB_UPDATE2, OVSDB_UPDATE3):
changes = self.__process_update2(table, uuid, row_update) changes = self.__process_update2(table, uuid, row_update)
if changes: if changes:
notices.append(changes) notices.append(changes)

View File

@@ -2319,7 +2319,7 @@ m4_define([OVSDB_CHECK_CLUSTER_IDL],
# Checks that monitor_cond_since works fine when disconnects happen # Checks that monitor_cond_since works fine when disconnects happen
# with cond_change requests in flight (i.e., IDL is properly updated). # with cond_change requests in flight (i.e., IDL is properly updated).
OVSDB_CHECK_CLUSTER_IDL_C([simple idl, monitor_cond_since, cluster disconnect], OVSDB_CHECK_CLUSTER_IDL([simple idl, monitor_cond_since, cluster disconnect],
3, 3,
[['["idltest", [['["idltest",
{"op": "insert", {"op": "insert",