mirror of
https://github.com/openvswitch/ovs
synced 2025-09-04 00:05:15 +00:00
python: idl: Add monitor_cond_since support.
Add support for monitor_cond_since / update3 to python-ovs to allow more efficient reconnections when connecting to clustered OVSDB servers. Signed-off-by: Terry Wilson <twilson@redhat.com> Acked-by: Dumitru Ceara <dceara@redhat.com> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
This commit is contained in:
committed by
Ilya Maximets
parent
0d1ffb7756
commit
46d44cf3be
4
NEWS
4
NEWS
@@ -17,6 +17,10 @@ Post-v2.16.0
|
|||||||
- Python:
|
- Python:
|
||||||
* For SSL support, the use of the pyOpenSSL library has been replaced
|
* For SSL support, the use of the pyOpenSSL library has been replaced
|
||||||
with the native 'ssl' module.
|
with the native 'ssl' module.
|
||||||
|
- OVSDB:
|
||||||
|
* Python library for OVSDB clients now also supports faster
|
||||||
|
resynchronization with a clustered database after a brief disconnection,
|
||||||
|
i.e. 'monitor_cond_since' monitoring method.
|
||||||
- ovs-dpctl and 'ovs-appctl dpctl/':
|
- ovs-dpctl and 'ovs-appctl dpctl/':
|
||||||
* New commands 'cache-get-size' and 'cache-set-size' that allows to
|
* New commands 'cache-get-size' and 'cache-set-size' that allows to
|
||||||
get or configure linux kernel datapath cache sizes.
|
get or configure linux kernel datapath cache sizes.
|
||||||
|
@@ -13,6 +13,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
|
import enum
|
||||||
import functools
|
import functools
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
@@ -36,6 +37,7 @@ ROW_DELETE = "delete"
|
|||||||
|
|
||||||
OVSDB_UPDATE = 0
|
OVSDB_UPDATE = 0
|
||||||
OVSDB_UPDATE2 = 1
|
OVSDB_UPDATE2 = 1
|
||||||
|
OVSDB_UPDATE3 = 2
|
||||||
|
|
||||||
CLUSTERED = "clustered"
|
CLUSTERED = "clustered"
|
||||||
RELAY = "relay"
|
RELAY = "relay"
|
||||||
@@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
|
|||||||
return item in self.keys()
|
return item in self.keys()
|
||||||
|
|
||||||
|
|
||||||
|
class Monitor(enum.IntEnum):
|
||||||
|
monitor = OVSDB_UPDATE
|
||||||
|
monitor_cond = OVSDB_UPDATE2
|
||||||
|
monitor_cond_since = OVSDB_UPDATE3
|
||||||
|
|
||||||
|
|
||||||
|
class ConditionState(object):
|
||||||
|
def __init__(self):
|
||||||
|
self._ack_cond = None
|
||||||
|
self._req_cond = None
|
||||||
|
self._new_cond = [True]
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter([self._new_cond, self._req_cond, self._ack_cond])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def new(self):
|
||||||
|
"""The latest freshly initialized condition change"""
|
||||||
|
return self._new_cond
|
||||||
|
|
||||||
|
@property
|
||||||
|
def acked(self):
|
||||||
|
"""The last condition change that has been accepted by the server"""
|
||||||
|
return self._ack_cond
|
||||||
|
|
||||||
|
@property
|
||||||
|
def requested(self):
|
||||||
|
"""A condition that's been requested, but not acked by the server"""
|
||||||
|
return self._req_cond
|
||||||
|
|
||||||
|
@property
|
||||||
|
def latest(self):
|
||||||
|
"""The most recent condition change"""
|
||||||
|
return next(cond for cond in self if cond is not None)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def is_true(condition):
|
||||||
|
return condition == [True]
|
||||||
|
|
||||||
|
def init(self, cond):
|
||||||
|
"""Signal that a condition change is being initiated"""
|
||||||
|
self._new_cond = cond
|
||||||
|
|
||||||
|
def ack(self):
|
||||||
|
"""Signal that a condition change has been acked"""
|
||||||
|
if self._req_cond is not None:
|
||||||
|
self._ack_cond, self._req_cond = (self._req_cond, None)
|
||||||
|
|
||||||
|
def request(self):
|
||||||
|
"""Signal that a condition change has been requested"""
|
||||||
|
if self._new_cond is not None:
|
||||||
|
self._req_cond, self._new_cond = (self._new_cond, None)
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""Reset a requested condition change back to new"""
|
||||||
|
if self._req_cond is not None and self._new_cond is None:
|
||||||
|
self._new_cond, self._req_cond = (self._req_cond, None)
|
||||||
|
|
||||||
|
|
||||||
class Idl(object):
|
class Idl(object):
|
||||||
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
|
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
|
||||||
|
|
||||||
@@ -132,7 +193,13 @@ class Idl(object):
|
|||||||
IDL_S_SERVER_MONITOR_REQUESTED = 2
|
IDL_S_SERVER_MONITOR_REQUESTED = 2
|
||||||
IDL_S_DATA_MONITOR_REQUESTED = 3
|
IDL_S_DATA_MONITOR_REQUESTED = 3
|
||||||
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
|
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
|
||||||
IDL_S_MONITORING = 5
|
IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
|
||||||
|
IDL_S_MONITORING = 6
|
||||||
|
|
||||||
|
monitor_map = {
|
||||||
|
Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
|
||||||
|
Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
|
||||||
|
Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
|
||||||
|
|
||||||
def __init__(self, remote, schema_helper, probe_interval=None,
|
def __init__(self, remote, schema_helper, probe_interval=None,
|
||||||
leader_only=True):
|
leader_only=True):
|
||||||
@@ -176,10 +243,12 @@ class Idl(object):
|
|||||||
remotes = self._parse_remotes(remote)
|
remotes = self._parse_remotes(remote)
|
||||||
self._session = ovs.jsonrpc.Session.open_multiple(remotes,
|
self._session = ovs.jsonrpc.Session.open_multiple(remotes,
|
||||||
probe_interval=probe_interval)
|
probe_interval=probe_interval)
|
||||||
|
self._request_id = None
|
||||||
self._monitor_request_id = None
|
self._monitor_request_id = None
|
||||||
self._last_seqno = None
|
self._last_seqno = None
|
||||||
self.change_seqno = 0
|
self.change_seqno = 0
|
||||||
self.uuid = uuid.uuid1()
|
self.uuid = uuid.uuid1()
|
||||||
|
self.last_id = str(uuid.UUID(int=0))
|
||||||
|
|
||||||
# Server monitor.
|
# Server monitor.
|
||||||
self._server_schema_request_id = None
|
self._server_schema_request_id = None
|
||||||
@@ -206,6 +275,9 @@ class Idl(object):
|
|||||||
self.txn = None
|
self.txn = None
|
||||||
self._outstanding_txns = {}
|
self._outstanding_txns = {}
|
||||||
|
|
||||||
|
self.cond_changed = False
|
||||||
|
self.cond_seqno = 0
|
||||||
|
|
||||||
for table in schema.tables.values():
|
for table in schema.tables.values():
|
||||||
for column in table.columns.values():
|
for column in table.columns.values():
|
||||||
if not hasattr(column, 'alert'):
|
if not hasattr(column, 'alert'):
|
||||||
@@ -213,8 +285,7 @@ class Idl(object):
|
|||||||
table.need_table = False
|
table.need_table = False
|
||||||
table.rows = custom_index.IndexedRows(table)
|
table.rows = custom_index.IndexedRows(table)
|
||||||
table.idl = self
|
table.idl = self
|
||||||
table.condition = [True]
|
table.condition = ConditionState()
|
||||||
table.cond_changed = False
|
|
||||||
|
|
||||||
def _parse_remotes(self, remote):
|
def _parse_remotes(self, remote):
|
||||||
# If remote is -
|
# If remote is -
|
||||||
@@ -252,6 +323,38 @@ class Idl(object):
|
|||||||
update."""
|
update."""
|
||||||
self._session.close()
|
self._session.close()
|
||||||
|
|
||||||
|
def ack_conditions(self):
|
||||||
|
"""Mark all requested table conditions as acked"""
|
||||||
|
for table in self.tables.values():
|
||||||
|
table.condition.ack()
|
||||||
|
|
||||||
|
def sync_conditions(self):
|
||||||
|
"""Synchronize condition state when the FSM is restarted
|
||||||
|
|
||||||
|
If a non-zero last_id is available for the DB, then upon reconnect
|
||||||
|
the IDL should first request acked conditions to avoid missing updates
|
||||||
|
about records that were added before the transaction with
|
||||||
|
txn-id == last_id. If there were requested condition changes in flight
|
||||||
|
and the IDL client didn't set new conditions, then reset the requested
|
||||||
|
conditions to new to trigger a follow-up monitor_cond_change request.
|
||||||
|
"""
|
||||||
|
ack_all = self.last_id == str(uuid.UUID(int=0))
|
||||||
|
for table in self.tables.values():
|
||||||
|
if ack_all:
|
||||||
|
table.condition.request()
|
||||||
|
table.condition.ack()
|
||||||
|
else:
|
||||||
|
table.condition.reset()
|
||||||
|
self.cond_changed = True
|
||||||
|
|
||||||
|
def restart_fsm(self):
|
||||||
|
# Resync data DB table conditions to avoid missing updated due to
|
||||||
|
# conditions that were in flight or changed locally while the
|
||||||
|
# connection was down.
|
||||||
|
self.sync_conditions()
|
||||||
|
self.__send_server_schema_request()
|
||||||
|
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Processes a batch of messages from the database server. Returns
|
"""Processes a batch of messages from the database server. Returns
|
||||||
True if the database as seen through the IDL changed, False if it did
|
True if the database as seen through the IDL changed, False if it did
|
||||||
@@ -286,7 +389,7 @@ class Idl(object):
|
|||||||
if seqno != self._last_seqno:
|
if seqno != self._last_seqno:
|
||||||
self._last_seqno = seqno
|
self._last_seqno = seqno
|
||||||
self.__txn_abort_all()
|
self.__txn_abort_all()
|
||||||
self.__send_server_schema_request()
|
self.restart_fsm()
|
||||||
if self.lock_name:
|
if self.lock_name:
|
||||||
self.__send_lock_request()
|
self.__send_lock_request()
|
||||||
break
|
break
|
||||||
@@ -294,8 +397,20 @@ class Idl(object):
|
|||||||
msg = self._session.recv()
|
msg = self._session.recv()
|
||||||
if msg is None:
|
if msg is None:
|
||||||
break
|
break
|
||||||
|
is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
|
||||||
|
ovs.jsonrpc.Message.T_ERROR)
|
||||||
|
|
||||||
|
if is_response and self._request_id and self._request_id == msg.id:
|
||||||
|
self._request_id = None
|
||||||
|
# process_response follows
|
||||||
|
|
||||||
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
||||||
|
and msg.method == "update3"
|
||||||
|
and len(msg.params) == 3):
|
||||||
|
# Database contents changed.
|
||||||
|
self.__parse_update(msg.params[2], OVSDB_UPDATE3)
|
||||||
|
self.last_id = msg.params[1]
|
||||||
|
elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
||||||
and msg.method == "update2"
|
and msg.method == "update2"
|
||||||
and len(msg.params) == 2):
|
and len(msg.params) == 2):
|
||||||
# Database contents changed.
|
# Database contents changed.
|
||||||
@@ -320,11 +435,18 @@ class Idl(object):
|
|||||||
try:
|
try:
|
||||||
self.change_seqno += 1
|
self.change_seqno += 1
|
||||||
self._monitor_request_id = None
|
self._monitor_request_id = None
|
||||||
self.__clear()
|
if (self.state ==
|
||||||
if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
|
self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
|
||||||
|
# If 'found' is false, clear table rows for new dump
|
||||||
|
if not msg.result[0]:
|
||||||
|
self.__clear()
|
||||||
|
self.__parse_update(msg.result[2], OVSDB_UPDATE3)
|
||||||
|
elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
|
||||||
|
self.__clear()
|
||||||
self.__parse_update(msg.result, OVSDB_UPDATE2)
|
self.__parse_update(msg.result, OVSDB_UPDATE2)
|
||||||
else:
|
else:
|
||||||
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
|
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
|
||||||
|
self.__clear()
|
||||||
self.__parse_update(msg.result, OVSDB_UPDATE)
|
self.__parse_update(msg.result, OVSDB_UPDATE)
|
||||||
self.state = self.IDL_S_MONITORING
|
self.state = self.IDL_S_MONITORING
|
||||||
|
|
||||||
@@ -398,11 +520,17 @@ class Idl(object):
|
|||||||
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
|
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
|
||||||
# Reply to our echo request. Ignore it.
|
# Reply to our echo request. Ignore it.
|
||||||
pass
|
pass
|
||||||
|
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||||
|
self.state == (
|
||||||
|
self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
|
||||||
|
self._monitor_request_id == msg.id):
|
||||||
|
if msg.error == "unknown method":
|
||||||
|
self.__send_monitor_request(Monitor.monitor_cond)
|
||||||
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||||
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
|
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
|
||||||
self._monitor_request_id == msg.id):
|
self._monitor_request_id == msg.id):
|
||||||
if msg.error == "unknown method":
|
if msg.error == "unknown method":
|
||||||
self.__send_monitor_request()
|
self.__send_monitor_request(Monitor.monitor)
|
||||||
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||||
self._server_schema_request_id is not None and
|
self._server_schema_request_id is not None and
|
||||||
self._server_schema_request_id == msg.id):
|
self._server_schema_request_id == msg.id):
|
||||||
@@ -418,6 +546,13 @@ class Idl(object):
|
|||||||
and self.__txn_process_reply(msg)):
|
and self.__txn_process_reply(msg)):
|
||||||
# __txn_process_reply() did everything needed.
|
# __txn_process_reply() did everything needed.
|
||||||
pass
|
pass
|
||||||
|
elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
|
||||||
|
self.state == self.IDL_S_MONITORING):
|
||||||
|
# Mark the last requested conditions as acked and if further
|
||||||
|
# condition changes were pending, send them now.
|
||||||
|
self.ack_conditions()
|
||||||
|
self.send_cond_change()
|
||||||
|
self.cond_seqno += 1
|
||||||
else:
|
else:
|
||||||
# This can happen if a transaction is destroyed before we
|
# This can happen if a transaction is destroyed before we
|
||||||
# receive the reply, so keep the log level low.
|
# receive the reply, so keep the log level low.
|
||||||
@@ -427,14 +562,36 @@ class Idl(object):
|
|||||||
|
|
||||||
return initial_change_seqno != self.change_seqno
|
return initial_change_seqno != self.change_seqno
|
||||||
|
|
||||||
def send_cond_change(self):
|
def compose_cond_change(self):
|
||||||
if not self._session.is_connected():
|
if not self.cond_changed:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
change_requests = {}
|
||||||
for table in self.tables.values():
|
for table in self.tables.values():
|
||||||
if table.cond_changed:
|
# Always use the most recent conditions set by the IDL client when
|
||||||
self.__send_cond_change(table, table.condition)
|
# requesting monitor_cond_change
|
||||||
table.cond_changed = False
|
if table.condition.new is not None:
|
||||||
|
change_requests[table.name] = [
|
||||||
|
{"where": table.condition.new}]
|
||||||
|
table.condition.request()
|
||||||
|
|
||||||
|
if not change_requests:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.cond_changed = False
|
||||||
|
old_uuid = str(self.uuid)
|
||||||
|
self.uuid = uuid.uuid1()
|
||||||
|
params = [old_uuid, str(self.uuid), change_requests]
|
||||||
|
return ovs.jsonrpc.Message.create_request(
|
||||||
|
"monitor_cond_change", params)
|
||||||
|
|
||||||
|
def send_cond_change(self):
|
||||||
|
if not self._session.is_connected() or self._request_id is not None:
|
||||||
|
return
|
||||||
|
|
||||||
|
msg = self.compose_cond_change()
|
||||||
|
if msg:
|
||||||
|
self.send_request(msg)
|
||||||
|
|
||||||
def cond_change(self, table_name, cond):
|
def cond_change(self, table_name, cond):
|
||||||
"""Sets the condition for 'table_name' to 'cond', which should be a
|
"""Sets the condition for 'table_name' to 'cond', which should be a
|
||||||
@@ -450,13 +607,28 @@ class Idl(object):
|
|||||||
|
|
||||||
if cond == []:
|
if cond == []:
|
||||||
cond = [False]
|
cond = [False]
|
||||||
if table.condition != cond:
|
|
||||||
table.condition = cond
|
# Compare the new condition to the last known condition
|
||||||
table.cond_changed = True
|
if table.condition.latest != cond:
|
||||||
|
table.condition.init(cond)
|
||||||
|
self.cond_changed = True
|
||||||
|
|
||||||
|
# New condition will be sent out after all already requested ones
|
||||||
|
# are acked.
|
||||||
|
if table.condition.new:
|
||||||
|
any_reqs = any(t.condition.request for t in self.tables.values())
|
||||||
|
return self.cond_seqno + int(any_reqs) + 1
|
||||||
|
|
||||||
|
# Already requested conditions should be up to date at
|
||||||
|
# self.cond_seqno + 1 while acked conditions are already up to date
|
||||||
|
return self.cond_seqno + int(bool(table.condition.requested))
|
||||||
|
|
||||||
def wait(self, poller):
|
def wait(self, poller):
|
||||||
"""Arranges for poller.block() to wake up when self.run() has something
|
"""Arranges for poller.block() to wake up when self.run() has something
|
||||||
to do or when activity occurs on a transaction on 'self'."""
|
to do or when activity occurs on a transaction on 'self'."""
|
||||||
|
if self.cond_changed:
|
||||||
|
poller.immediate_wake()
|
||||||
|
return
|
||||||
self._session.wait(poller)
|
self._session.wait(poller)
|
||||||
self._session.recv_wait(poller)
|
self._session.recv_wait(poller)
|
||||||
|
|
||||||
@@ -531,14 +703,6 @@ class Idl(object):
|
|||||||
to doing nothing to avoid overhead where it is not needed.
|
to doing nothing to avoid overhead where it is not needed.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __send_cond_change(self, table, cond):
|
|
||||||
monitor_cond_change = {table.name: [{"where": cond}]}
|
|
||||||
old_uuid = str(self.uuid)
|
|
||||||
self.uuid = uuid.uuid1()
|
|
||||||
params = [old_uuid, str(self.uuid), monitor_cond_change]
|
|
||||||
msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
|
|
||||||
self._session.send(msg)
|
|
||||||
|
|
||||||
def __clear(self):
|
def __clear(self):
|
||||||
changed = False
|
changed = False
|
||||||
|
|
||||||
@@ -547,6 +711,8 @@ class Idl(object):
|
|||||||
changed = True
|
changed = True
|
||||||
table.rows = custom_index.IndexedRows(table)
|
table.rows = custom_index.IndexedRows(table)
|
||||||
|
|
||||||
|
self.cond_seqno = 0
|
||||||
|
|
||||||
if changed:
|
if changed:
|
||||||
self.change_seqno += 1
|
self.change_seqno += 1
|
||||||
|
|
||||||
@@ -601,11 +767,18 @@ class Idl(object):
|
|||||||
self._db_change_aware_request_id = msg.id
|
self._db_change_aware_request_id = msg.id
|
||||||
self._session.send(msg)
|
self._session.send(msg)
|
||||||
|
|
||||||
def __send_monitor_request(self):
|
def send_request(self, request):
|
||||||
if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
|
self._request_id = request.id
|
||||||
self.IDL_S_INITIAL]):
|
if self._session.is_connected():
|
||||||
|
return self._session.send(request)
|
||||||
|
|
||||||
|
def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
|
||||||
|
if self.state == self.IDL_S_INITIAL:
|
||||||
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
|
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
|
||||||
method = "monitor_cond"
|
method = "monitor_cond"
|
||||||
|
elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED:
|
||||||
|
self.state = self.monitor_map[Monitor(max_version)]
|
||||||
|
method = Monitor(max_version).name
|
||||||
else:
|
else:
|
||||||
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
|
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
|
||||||
method = "monitor"
|
method = "monitor"
|
||||||
@@ -619,22 +792,24 @@ class Idl(object):
|
|||||||
(column not in self.readonly[table.name])):
|
(column not in self.readonly[table.name])):
|
||||||
columns.append(column)
|
columns.append(column)
|
||||||
monitor_request = {"columns": columns}
|
monitor_request = {"columns": columns}
|
||||||
if method == "monitor_cond" and table.condition != [True]:
|
if method in ("monitor_cond", "monitor_cond_since") and (
|
||||||
monitor_request["where"] = table.condition
|
not ConditionState.is_true(table.condition.acked)):
|
||||||
table.cond_change = False
|
monitor_request["where"] = table.condition.acked
|
||||||
monitor_requests[table.name] = [monitor_request]
|
monitor_requests[table.name] = [monitor_request]
|
||||||
|
|
||||||
msg = ovs.jsonrpc.Message.create_request(
|
args = [self._db.name, str(self.uuid), monitor_requests]
|
||||||
method, [self._db.name, str(self.uuid), monitor_requests])
|
if method == "monitor_cond_since":
|
||||||
|
args.append(str(self.last_id))
|
||||||
|
msg = ovs.jsonrpc.Message.create_request(method, args)
|
||||||
self._monitor_request_id = msg.id
|
self._monitor_request_id = msg.id
|
||||||
self._session.send(msg)
|
self.send_request(msg)
|
||||||
|
|
||||||
def __send_server_schema_request(self):
|
def __send_server_schema_request(self):
|
||||||
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
|
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
|
||||||
msg = ovs.jsonrpc.Message.create_request(
|
msg = ovs.jsonrpc.Message.create_request(
|
||||||
"get_schema", [self._server_db_name, str(self.uuid)])
|
"get_schema", [self._server_db_name, str(self.uuid)])
|
||||||
self._server_schema_request_id = msg.id
|
self._server_schema_request_id = msg.id
|
||||||
self._session.send(msg)
|
self.send_request(msg)
|
||||||
|
|
||||||
def __send_server_monitor_request(self):
|
def __send_server_monitor_request(self):
|
||||||
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
|
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
|
||||||
@@ -654,7 +829,7 @@ class Idl(object):
|
|||||||
str(self.server_monitor_uuid),
|
str(self.server_monitor_uuid),
|
||||||
monitor_requests])
|
monitor_requests])
|
||||||
self._server_monitor_request_id = msg.id
|
self._server_monitor_request_id = msg.id
|
||||||
self._session.send(msg)
|
self.send_request(msg)
|
||||||
|
|
||||||
def __parse_update(self, update, version, tables=None):
|
def __parse_update(self, update, version, tables=None):
|
||||||
try:
|
try:
|
||||||
@@ -698,7 +873,7 @@ class Idl(object):
|
|||||||
|
|
||||||
self.cooperative_yield()
|
self.cooperative_yield()
|
||||||
|
|
||||||
if version == OVSDB_UPDATE2:
|
if version in (OVSDB_UPDATE2, OVSDB_UPDATE3):
|
||||||
changes = self.__process_update2(table, uuid, row_update)
|
changes = self.__process_update2(table, uuid, row_update)
|
||||||
if changes:
|
if changes:
|
||||||
notices.append(changes)
|
notices.append(changes)
|
||||||
|
@@ -2319,7 +2319,7 @@ m4_define([OVSDB_CHECK_CLUSTER_IDL],
|
|||||||
|
|
||||||
# Checks that monitor_cond_since works fine when disconnects happen
|
# Checks that monitor_cond_since works fine when disconnects happen
|
||||||
# with cond_change requests in flight (i.e., IDL is properly updated).
|
# with cond_change requests in flight (i.e., IDL is properly updated).
|
||||||
OVSDB_CHECK_CLUSTER_IDL_C([simple idl, monitor_cond_since, cluster disconnect],
|
OVSDB_CHECK_CLUSTER_IDL([simple idl, monitor_cond_since, cluster disconnect],
|
||||||
3,
|
3,
|
||||||
[['["idltest",
|
[['["idltest",
|
||||||
{"op": "insert",
|
{"op": "insert",
|
||||||
|
Reference in New Issue
Block a user