mirror of
https://github.com/openvswitch/ovs
synced 2025-08-31 22:35:15 +00:00
python: idl: Add monitor_cond_since support.
Add support for monitor_cond_since / update3 to python-ovs to allow more efficient reconnections when connecting to clustered OVSDB servers. Signed-off-by: Terry Wilson <twilson@redhat.com> Acked-by: Dumitru Ceara <dceara@redhat.com> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
This commit is contained in:
committed by
Ilya Maximets
parent
0d1ffb7756
commit
46d44cf3be
4
NEWS
4
NEWS
@@ -17,6 +17,10 @@ Post-v2.16.0
|
||||
- Python:
|
||||
* For SSL support, the use of the pyOpenSSL library has been replaced
|
||||
with the native 'ssl' module.
|
||||
- OVSDB:
|
||||
* Python library for OVSDB clients now also supports faster
|
||||
resynchronization with a clustered database after a brief disconnection,
|
||||
i.e. 'monitor_cond_since' monitoring method.
|
||||
- ovs-dpctl and 'ovs-appctl dpctl/':
|
||||
* New commands 'cache-get-size' and 'cache-set-size' that allows to
|
||||
get or configure linux kernel datapath cache sizes.
|
||||
|
@@ -13,6 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import enum
|
||||
import functools
|
||||
import uuid
|
||||
|
||||
@@ -36,6 +37,7 @@ ROW_DELETE = "delete"
|
||||
|
||||
OVSDB_UPDATE = 0
|
||||
OVSDB_UPDATE2 = 1
|
||||
OVSDB_UPDATE3 = 2
|
||||
|
||||
CLUSTERED = "clustered"
|
||||
RELAY = "relay"
|
||||
@@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
|
||||
return item in self.keys()
|
||||
|
||||
|
||||
class Monitor(enum.IntEnum):
|
||||
monitor = OVSDB_UPDATE
|
||||
monitor_cond = OVSDB_UPDATE2
|
||||
monitor_cond_since = OVSDB_UPDATE3
|
||||
|
||||
|
||||
class ConditionState(object):
|
||||
def __init__(self):
|
||||
self._ack_cond = None
|
||||
self._req_cond = None
|
||||
self._new_cond = [True]
|
||||
|
||||
def __iter__(self):
|
||||
return iter([self._new_cond, self._req_cond, self._ack_cond])
|
||||
|
||||
@property
|
||||
def new(self):
|
||||
"""The latest freshly initialized condition change"""
|
||||
return self._new_cond
|
||||
|
||||
@property
|
||||
def acked(self):
|
||||
"""The last condition change that has been accepted by the server"""
|
||||
return self._ack_cond
|
||||
|
||||
@property
|
||||
def requested(self):
|
||||
"""A condition that's been requested, but not acked by the server"""
|
||||
return self._req_cond
|
||||
|
||||
@property
|
||||
def latest(self):
|
||||
"""The most recent condition change"""
|
||||
return next(cond for cond in self if cond is not None)
|
||||
|
||||
@staticmethod
|
||||
def is_true(condition):
|
||||
return condition == [True]
|
||||
|
||||
def init(self, cond):
|
||||
"""Signal that a condition change is being initiated"""
|
||||
self._new_cond = cond
|
||||
|
||||
def ack(self):
|
||||
"""Signal that a condition change has been acked"""
|
||||
if self._req_cond is not None:
|
||||
self._ack_cond, self._req_cond = (self._req_cond, None)
|
||||
|
||||
def request(self):
|
||||
"""Signal that a condition change has been requested"""
|
||||
if self._new_cond is not None:
|
||||
self._req_cond, self._new_cond = (self._new_cond, None)
|
||||
|
||||
def reset(self):
|
||||
"""Reset a requested condition change back to new"""
|
||||
if self._req_cond is not None and self._new_cond is None:
|
||||
self._new_cond, self._req_cond = (self._req_cond, None)
|
||||
|
||||
|
||||
class Idl(object):
|
||||
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
|
||||
|
||||
@@ -132,7 +193,13 @@ class Idl(object):
|
||||
IDL_S_SERVER_MONITOR_REQUESTED = 2
|
||||
IDL_S_DATA_MONITOR_REQUESTED = 3
|
||||
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
|
||||
IDL_S_MONITORING = 5
|
||||
IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
|
||||
IDL_S_MONITORING = 6
|
||||
|
||||
monitor_map = {
|
||||
Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
|
||||
Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
|
||||
Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
|
||||
|
||||
def __init__(self, remote, schema_helper, probe_interval=None,
|
||||
leader_only=True):
|
||||
@@ -176,10 +243,12 @@ class Idl(object):
|
||||
remotes = self._parse_remotes(remote)
|
||||
self._session = ovs.jsonrpc.Session.open_multiple(remotes,
|
||||
probe_interval=probe_interval)
|
||||
self._request_id = None
|
||||
self._monitor_request_id = None
|
||||
self._last_seqno = None
|
||||
self.change_seqno = 0
|
||||
self.uuid = uuid.uuid1()
|
||||
self.last_id = str(uuid.UUID(int=0))
|
||||
|
||||
# Server monitor.
|
||||
self._server_schema_request_id = None
|
||||
@@ -206,6 +275,9 @@ class Idl(object):
|
||||
self.txn = None
|
||||
self._outstanding_txns = {}
|
||||
|
||||
self.cond_changed = False
|
||||
self.cond_seqno = 0
|
||||
|
||||
for table in schema.tables.values():
|
||||
for column in table.columns.values():
|
||||
if not hasattr(column, 'alert'):
|
||||
@@ -213,8 +285,7 @@ class Idl(object):
|
||||
table.need_table = False
|
||||
table.rows = custom_index.IndexedRows(table)
|
||||
table.idl = self
|
||||
table.condition = [True]
|
||||
table.cond_changed = False
|
||||
table.condition = ConditionState()
|
||||
|
||||
def _parse_remotes(self, remote):
|
||||
# If remote is -
|
||||
@@ -252,6 +323,38 @@ class Idl(object):
|
||||
update."""
|
||||
self._session.close()
|
||||
|
||||
def ack_conditions(self):
|
||||
"""Mark all requested table conditions as acked"""
|
||||
for table in self.tables.values():
|
||||
table.condition.ack()
|
||||
|
||||
def sync_conditions(self):
|
||||
"""Synchronize condition state when the FSM is restarted
|
||||
|
||||
If a non-zero last_id is available for the DB, then upon reconnect
|
||||
the IDL should first request acked conditions to avoid missing updates
|
||||
about records that were added before the transaction with
|
||||
txn-id == last_id. If there were requested condition changes in flight
|
||||
and the IDL client didn't set new conditions, then reset the requested
|
||||
conditions to new to trigger a follow-up monitor_cond_change request.
|
||||
"""
|
||||
ack_all = self.last_id == str(uuid.UUID(int=0))
|
||||
for table in self.tables.values():
|
||||
if ack_all:
|
||||
table.condition.request()
|
||||
table.condition.ack()
|
||||
else:
|
||||
table.condition.reset()
|
||||
self.cond_changed = True
|
||||
|
||||
def restart_fsm(self):
|
||||
# Resync data DB table conditions to avoid missing updated due to
|
||||
# conditions that were in flight or changed locally while the
|
||||
# connection was down.
|
||||
self.sync_conditions()
|
||||
self.__send_server_schema_request()
|
||||
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
|
||||
|
||||
def run(self):
|
||||
"""Processes a batch of messages from the database server. Returns
|
||||
True if the database as seen through the IDL changed, False if it did
|
||||
@@ -286,7 +389,7 @@ class Idl(object):
|
||||
if seqno != self._last_seqno:
|
||||
self._last_seqno = seqno
|
||||
self.__txn_abort_all()
|
||||
self.__send_server_schema_request()
|
||||
self.restart_fsm()
|
||||
if self.lock_name:
|
||||
self.__send_lock_request()
|
||||
break
|
||||
@@ -294,8 +397,20 @@ class Idl(object):
|
||||
msg = self._session.recv()
|
||||
if msg is None:
|
||||
break
|
||||
is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
|
||||
ovs.jsonrpc.Message.T_ERROR)
|
||||
|
||||
if is_response and self._request_id and self._request_id == msg.id:
|
||||
self._request_id = None
|
||||
# process_response follows
|
||||
|
||||
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
||||
and msg.method == "update3"
|
||||
and len(msg.params) == 3):
|
||||
# Database contents changed.
|
||||
self.__parse_update(msg.params[2], OVSDB_UPDATE3)
|
||||
self.last_id = msg.params[1]
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
||||
and msg.method == "update2"
|
||||
and len(msg.params) == 2):
|
||||
# Database contents changed.
|
||||
@@ -320,11 +435,18 @@ class Idl(object):
|
||||
try:
|
||||
self.change_seqno += 1
|
||||
self._monitor_request_id = None
|
||||
self.__clear()
|
||||
if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
|
||||
if (self.state ==
|
||||
self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
|
||||
# If 'found' is false, clear table rows for new dump
|
||||
if not msg.result[0]:
|
||||
self.__clear()
|
||||
self.__parse_update(msg.result[2], OVSDB_UPDATE3)
|
||||
elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
|
||||
self.__clear()
|
||||
self.__parse_update(msg.result, OVSDB_UPDATE2)
|
||||
else:
|
||||
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
|
||||
self.__clear()
|
||||
self.__parse_update(msg.result, OVSDB_UPDATE)
|
||||
self.state = self.IDL_S_MONITORING
|
||||
|
||||
@@ -398,11 +520,17 @@ class Idl(object):
|
||||
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
|
||||
# Reply to our echo request. Ignore it.
|
||||
pass
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||
self.state == (
|
||||
self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
|
||||
self._monitor_request_id == msg.id):
|
||||
if msg.error == "unknown method":
|
||||
self.__send_monitor_request(Monitor.monitor_cond)
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
|
||||
self._monitor_request_id == msg.id):
|
||||
if msg.error == "unknown method":
|
||||
self.__send_monitor_request()
|
||||
self.__send_monitor_request(Monitor.monitor)
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||
self._server_schema_request_id is not None and
|
||||
self._server_schema_request_id == msg.id):
|
||||
@@ -418,6 +546,13 @@ class Idl(object):
|
||||
and self.__txn_process_reply(msg)):
|
||||
# __txn_process_reply() did everything needed.
|
||||
pass
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
|
||||
self.state == self.IDL_S_MONITORING):
|
||||
# Mark the last requested conditions as acked and if further
|
||||
# condition changes were pending, send them now.
|
||||
self.ack_conditions()
|
||||
self.send_cond_change()
|
||||
self.cond_seqno += 1
|
||||
else:
|
||||
# This can happen if a transaction is destroyed before we
|
||||
# receive the reply, so keep the log level low.
|
||||
@@ -427,14 +562,36 @@ class Idl(object):
|
||||
|
||||
return initial_change_seqno != self.change_seqno
|
||||
|
||||
def send_cond_change(self):
|
||||
if not self._session.is_connected():
|
||||
def compose_cond_change(self):
|
||||
if not self.cond_changed:
|
||||
return
|
||||
|
||||
change_requests = {}
|
||||
for table in self.tables.values():
|
||||
if table.cond_changed:
|
||||
self.__send_cond_change(table, table.condition)
|
||||
table.cond_changed = False
|
||||
# Always use the most recent conditions set by the IDL client when
|
||||
# requesting monitor_cond_change
|
||||
if table.condition.new is not None:
|
||||
change_requests[table.name] = [
|
||||
{"where": table.condition.new}]
|
||||
table.condition.request()
|
||||
|
||||
if not change_requests:
|
||||
return
|
||||
|
||||
self.cond_changed = False
|
||||
old_uuid = str(self.uuid)
|
||||
self.uuid = uuid.uuid1()
|
||||
params = [old_uuid, str(self.uuid), change_requests]
|
||||
return ovs.jsonrpc.Message.create_request(
|
||||
"monitor_cond_change", params)
|
||||
|
||||
def send_cond_change(self):
|
||||
if not self._session.is_connected() or self._request_id is not None:
|
||||
return
|
||||
|
||||
msg = self.compose_cond_change()
|
||||
if msg:
|
||||
self.send_request(msg)
|
||||
|
||||
def cond_change(self, table_name, cond):
|
||||
"""Sets the condition for 'table_name' to 'cond', which should be a
|
||||
@@ -450,13 +607,28 @@ class Idl(object):
|
||||
|
||||
if cond == []:
|
||||
cond = [False]
|
||||
if table.condition != cond:
|
||||
table.condition = cond
|
||||
table.cond_changed = True
|
||||
|
||||
# Compare the new condition to the last known condition
|
||||
if table.condition.latest != cond:
|
||||
table.condition.init(cond)
|
||||
self.cond_changed = True
|
||||
|
||||
# New condition will be sent out after all already requested ones
|
||||
# are acked.
|
||||
if table.condition.new:
|
||||
any_reqs = any(t.condition.request for t in self.tables.values())
|
||||
return self.cond_seqno + int(any_reqs) + 1
|
||||
|
||||
# Already requested conditions should be up to date at
|
||||
# self.cond_seqno + 1 while acked conditions are already up to date
|
||||
return self.cond_seqno + int(bool(table.condition.requested))
|
||||
|
||||
def wait(self, poller):
|
||||
"""Arranges for poller.block() to wake up when self.run() has something
|
||||
to do or when activity occurs on a transaction on 'self'."""
|
||||
if self.cond_changed:
|
||||
poller.immediate_wake()
|
||||
return
|
||||
self._session.wait(poller)
|
||||
self._session.recv_wait(poller)
|
||||
|
||||
@@ -531,14 +703,6 @@ class Idl(object):
|
||||
to doing nothing to avoid overhead where it is not needed.
|
||||
"""
|
||||
|
||||
def __send_cond_change(self, table, cond):
|
||||
monitor_cond_change = {table.name: [{"where": cond}]}
|
||||
old_uuid = str(self.uuid)
|
||||
self.uuid = uuid.uuid1()
|
||||
params = [old_uuid, str(self.uuid), monitor_cond_change]
|
||||
msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
|
||||
self._session.send(msg)
|
||||
|
||||
def __clear(self):
|
||||
changed = False
|
||||
|
||||
@@ -547,6 +711,8 @@ class Idl(object):
|
||||
changed = True
|
||||
table.rows = custom_index.IndexedRows(table)
|
||||
|
||||
self.cond_seqno = 0
|
||||
|
||||
if changed:
|
||||
self.change_seqno += 1
|
||||
|
||||
@@ -601,11 +767,18 @@ class Idl(object):
|
||||
self._db_change_aware_request_id = msg.id
|
||||
self._session.send(msg)
|
||||
|
||||
def __send_monitor_request(self):
|
||||
if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
|
||||
self.IDL_S_INITIAL]):
|
||||
def send_request(self, request):
|
||||
self._request_id = request.id
|
||||
if self._session.is_connected():
|
||||
return self._session.send(request)
|
||||
|
||||
def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
|
||||
if self.state == self.IDL_S_INITIAL:
|
||||
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
|
||||
method = "monitor_cond"
|
||||
elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED:
|
||||
self.state = self.monitor_map[Monitor(max_version)]
|
||||
method = Monitor(max_version).name
|
||||
else:
|
||||
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
|
||||
method = "monitor"
|
||||
@@ -619,22 +792,24 @@ class Idl(object):
|
||||
(column not in self.readonly[table.name])):
|
||||
columns.append(column)
|
||||
monitor_request = {"columns": columns}
|
||||
if method == "monitor_cond" and table.condition != [True]:
|
||||
monitor_request["where"] = table.condition
|
||||
table.cond_change = False
|
||||
if method in ("monitor_cond", "monitor_cond_since") and (
|
||||
not ConditionState.is_true(table.condition.acked)):
|
||||
monitor_request["where"] = table.condition.acked
|
||||
monitor_requests[table.name] = [monitor_request]
|
||||
|
||||
msg = ovs.jsonrpc.Message.create_request(
|
||||
method, [self._db.name, str(self.uuid), monitor_requests])
|
||||
args = [self._db.name, str(self.uuid), monitor_requests]
|
||||
if method == "monitor_cond_since":
|
||||
args.append(str(self.last_id))
|
||||
msg = ovs.jsonrpc.Message.create_request(method, args)
|
||||
self._monitor_request_id = msg.id
|
||||
self._session.send(msg)
|
||||
self.send_request(msg)
|
||||
|
||||
def __send_server_schema_request(self):
|
||||
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
|
||||
msg = ovs.jsonrpc.Message.create_request(
|
||||
"get_schema", [self._server_db_name, str(self.uuid)])
|
||||
self._server_schema_request_id = msg.id
|
||||
self._session.send(msg)
|
||||
self.send_request(msg)
|
||||
|
||||
def __send_server_monitor_request(self):
|
||||
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
|
||||
@@ -654,7 +829,7 @@ class Idl(object):
|
||||
str(self.server_monitor_uuid),
|
||||
monitor_requests])
|
||||
self._server_monitor_request_id = msg.id
|
||||
self._session.send(msg)
|
||||
self.send_request(msg)
|
||||
|
||||
def __parse_update(self, update, version, tables=None):
|
||||
try:
|
||||
@@ -698,7 +873,7 @@ class Idl(object):
|
||||
|
||||
self.cooperative_yield()
|
||||
|
||||
if version == OVSDB_UPDATE2:
|
||||
if version in (OVSDB_UPDATE2, OVSDB_UPDATE3):
|
||||
changes = self.__process_update2(table, uuid, row_update)
|
||||
if changes:
|
||||
notices.append(changes)
|
||||
|
@@ -2319,7 +2319,7 @@ m4_define([OVSDB_CHECK_CLUSTER_IDL],
|
||||
|
||||
# Checks that monitor_cond_since works fine when disconnects happen
|
||||
# with cond_change requests in flight (i.e., IDL is properly updated).
|
||||
OVSDB_CHECK_CLUSTER_IDL_C([simple idl, monitor_cond_since, cluster disconnect],
|
||||
OVSDB_CHECK_CLUSTER_IDL([simple idl, monitor_cond_since, cluster disconnect],
|
||||
3,
|
||||
[['["idltest",
|
||||
{"op": "insert",
|
||||
|
Reference in New Issue
Block a user