mirror of
https://github.com/openvswitch/ovs
synced 2025-09-04 00:05:15 +00:00
python: move Python idl to work with monitor_cond
Python idl works now with "monitor_cond" method. Add test for backward compatibility with old "monitor" method. Signed-off-by: Liran Schour <lirans@il.ibm.com> Signed-off-by: Ben Pfaff <blp@ovn.org>
This commit is contained in:
@@ -162,7 +162,7 @@ class Atom(object):
|
||||
% (self.to_string(), base.enum.to_string()))
|
||||
elif base.type in [ovs.db.types.IntegerType, ovs.db.types.RealType]:
|
||||
if ((base.min is None or self.value >= base.min) and
|
||||
(base.max is None or self.value <= base.max)):
|
||||
(base.max is None or self.value <= base.max)):
|
||||
pass
|
||||
elif base.min is not None and base.max is not None:
|
||||
raise ConstraintViolation(
|
||||
@@ -171,7 +171,7 @@ class Atom(object):
|
||||
elif base.min is not None:
|
||||
raise ConstraintViolation(
|
||||
"%s is less than minimum allowed value %.15g"
|
||||
% (self.to_string(), base.min))
|
||||
% (self.to_string(), base.min))
|
||||
else:
|
||||
raise ConstraintViolation(
|
||||
"%s is greater than maximum allowed value %.15g"
|
||||
@@ -415,6 +415,18 @@ class Datum(object):
|
||||
s.append(tail)
|
||||
return ''.join(s)
|
||||
|
||||
def diff(self, datum):
|
||||
if self.type.n_max > 1 or len(self.values) == 0:
|
||||
for k, v in six.iteritems(datum.values):
|
||||
if k in self.values and v == self.values[k]:
|
||||
del self.values[k]
|
||||
else:
|
||||
self.values[k] = v
|
||||
else:
|
||||
return datum
|
||||
|
||||
return self
|
||||
|
||||
def as_list(self):
|
||||
if self.type.is_map():
|
||||
return [[k.value, v.value] for k, v in six.iteritems(self.values)]
|
||||
|
@@ -33,6 +33,9 @@ ROW_CREATE = "create"
|
||||
ROW_UPDATE = "update"
|
||||
ROW_DELETE = "delete"
|
||||
|
||||
OVSDB_UPDATE = 0
|
||||
OVSDB_UPDATE2 = 1
|
||||
|
||||
|
||||
class Idl(object):
|
||||
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
|
||||
@@ -86,6 +89,10 @@ class Idl(object):
|
||||
currently being constructed, if there is one, or None otherwise.
|
||||
"""
|
||||
|
||||
IDL_S_INITIAL = 0
|
||||
IDL_S_MONITOR_REQUESTED = 1
|
||||
IDL_S_MONITOR_COND_REQUESTED = 2
|
||||
|
||||
def __init__(self, remote, schema):
|
||||
"""Creates and returns a connection to the database named 'db_name' on
|
||||
'remote', which should be in a form acceptable to
|
||||
@@ -116,6 +123,8 @@ class Idl(object):
|
||||
self._monitor_request_id = None
|
||||
self._last_seqno = None
|
||||
self.change_seqno = 0
|
||||
self.uuid = uuid.uuid1()
|
||||
self.state = self.IDL_S_INITIAL
|
||||
|
||||
# Database locking.
|
||||
self.lock_name = None # Name of lock we need, None if none.
|
||||
@@ -134,6 +143,7 @@ class Idl(object):
|
||||
table.need_table = False
|
||||
table.rows = {}
|
||||
table.idl = self
|
||||
table.condition = []
|
||||
|
||||
def close(self):
|
||||
"""Closes the connection to the database. The IDL will no longer
|
||||
@@ -180,11 +190,15 @@ class Idl(object):
|
||||
if msg is None:
|
||||
break
|
||||
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
||||
and msg.method == "update"
|
||||
and len(msg.params) == 2
|
||||
and msg.params[0] is None):
|
||||
and msg.method == "update2"
|
||||
and len(msg.params) == 2):
|
||||
# Database contents changed.
|
||||
self.__parse_update(msg.params[1])
|
||||
self.__parse_update(msg.params[1], OVSDB_UPDATE2)
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
||||
and msg.method == "update"
|
||||
and len(msg.params) == 2):
|
||||
# Database contents changed.
|
||||
self.__parse_update(msg.params[1], OVSDB_UPDATE)
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
||||
and self._monitor_request_id is not None
|
||||
and self._monitor_request_id == msg.id):
|
||||
@@ -193,10 +207,15 @@ class Idl(object):
|
||||
self.change_seqno += 1
|
||||
self._monitor_request_id = None
|
||||
self.__clear()
|
||||
self.__parse_update(msg.result)
|
||||
if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
|
||||
self.__parse_update(msg.result, OVSDB_UPDATE2)
|
||||
else:
|
||||
assert self.state == self.IDL_S_MONITOR_REQUESTED
|
||||
self.__parse_update(msg.result, OVSDB_UPDATE)
|
||||
|
||||
except error.Error as e:
|
||||
vlog.err("%s: parse error in received schema: %s"
|
||||
% (self._session.get_name(), e))
|
||||
% (self._session.get_name(), e))
|
||||
self.__error()
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
||||
and self._lock_request_id is not None
|
||||
@@ -214,6 +233,11 @@ class Idl(object):
|
||||
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
|
||||
# Reply to our echo request. Ignore it.
|
||||
pass
|
||||
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||
self.state == self.IDL_S_MONITOR_COND_REQUESTED and
|
||||
self._monitor_request_id == msg.id):
|
||||
if msg.error == "unknown method":
|
||||
self.__send_monitor_request()
|
||||
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
|
||||
ovs.jsonrpc.Message.T_REPLY)
|
||||
and self.__txn_process_reply(msg)):
|
||||
@@ -228,6 +252,19 @@ class Idl(object):
|
||||
|
||||
return initial_change_seqno != self.change_seqno
|
||||
|
||||
def cond_change(self, table_name, cond):
|
||||
"""Change conditions for this IDL session. If session is not already
|
||||
connected, add condtion to table and submit it on send_monitor_request.
|
||||
Otherwise send monitor_cond_change method with the requested
|
||||
changes."""
|
||||
table = self.tables.get(table_name)
|
||||
if not table:
|
||||
raise error.Error('Unknown table "%s"' % table_name)
|
||||
if self._session.is_connected():
|
||||
self.__send_cond_change(table, cond)
|
||||
else:
|
||||
table.condition = cond
|
||||
|
||||
def wait(self, poller):
|
||||
"""Arranges for poller.block() to wake up when self.run() has something
|
||||
to do or when activity occurs on a transaction on 'self'."""
|
||||
@@ -279,10 +316,18 @@ class Idl(object):
|
||||
:type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
|
||||
:param row: The row as it is after the operation has occured
|
||||
:type row: Row
|
||||
:param updates: For updates, a Row object with just the changed columns
|
||||
:param updates: For updates, row with only updated columns
|
||||
:type updates: Row
|
||||
"""
|
||||
|
||||
def __send_cond_change(self, table, cond):
|
||||
monitor_cond_change = {table.name: [{"where": cond}]}
|
||||
old_uuid = str(self.uuid)
|
||||
self.uuid = uuid.uuid1()
|
||||
params = [old_uuid, str(self.uuid), monitor_cond_change]
|
||||
msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
|
||||
self._session.send(msg)
|
||||
|
||||
def __clear(self):
|
||||
changed = False
|
||||
|
||||
@@ -340,28 +385,39 @@ class Idl(object):
|
||||
self.is_lock_contended = True
|
||||
|
||||
def __send_monitor_request(self):
|
||||
if self.state == self.IDL_S_INITIAL:
|
||||
self.state = self.IDL_S_MONITOR_COND_REQUESTED
|
||||
method = "monitor_cond"
|
||||
else:
|
||||
self.state = self.IDL_S_MONITOR_REQUESTED
|
||||
method = "monitor"
|
||||
|
||||
monitor_requests = {}
|
||||
for table in six.itervalues(self.tables):
|
||||
columns = []
|
||||
for column in six.iterkeys(table.columns):
|
||||
if ((table.name not in self.readonly) or
|
||||
(table.name in self.readonly) and
|
||||
(column not in self.readonly[table.name])):
|
||||
(table.name in self.readonly) and
|
||||
(column not in self.readonly[table.name])):
|
||||
columns.append(column)
|
||||
monitor_requests[table.name] = {"columns": columns}
|
||||
if method == "monitor_cond" and table.condition:
|
||||
monitor_requests[table.name]["where"] = table.condition
|
||||
table.condition = None
|
||||
|
||||
msg = ovs.jsonrpc.Message.create_request(
|
||||
"monitor", [self._db.name, None, monitor_requests])
|
||||
method, [self._db.name, str(self.uuid), monitor_requests])
|
||||
self._monitor_request_id = msg.id
|
||||
self._session.send(msg)
|
||||
|
||||
def __parse_update(self, update):
|
||||
def __parse_update(self, update, version):
|
||||
try:
|
||||
self.__do_parse_update(update)
|
||||
self.__do_parse_update(update, version)
|
||||
except error.Error as e:
|
||||
vlog.err("%s: error parsing update: %s"
|
||||
% (self._session.get_name(), e))
|
||||
|
||||
def __do_parse_update(self, table_updates):
|
||||
def __do_parse_update(self, table_updates, version):
|
||||
if not isinstance(table_updates, dict):
|
||||
raise error.Error("<table-updates> is not an object",
|
||||
table_updates)
|
||||
@@ -390,6 +446,11 @@ class Idl(object):
|
||||
'is not an object'
|
||||
% (table_name, uuid_string))
|
||||
|
||||
if version == OVSDB_UPDATE2:
|
||||
if self.__process_update2(table, uuid, row_update):
|
||||
self.change_seqno += 1
|
||||
continue
|
||||
|
||||
parser = ovs.db.parser.Parser(row_update, "row-update")
|
||||
old = parser.get_optional("old", [dict])
|
||||
new = parser.get_optional("new", [dict])
|
||||
@@ -402,6 +463,45 @@ class Idl(object):
|
||||
if self.__process_update(table, uuid, old, new):
|
||||
self.change_seqno += 1
|
||||
|
||||
def __process_update2(self, table, uuid, row_update):
|
||||
row = table.rows.get(uuid)
|
||||
changed = False
|
||||
if "delete" in row_update:
|
||||
if row:
|
||||
del table.rows[uuid]
|
||||
self.notify(ROW_DELETE, row)
|
||||
changed = True
|
||||
else:
|
||||
# XXX rate-limit
|
||||
vlog.warn("cannot delete missing row %s from table"
|
||||
"%s" % (uuid, table.name))
|
||||
elif "insert" in row_update or "initial" in row_update:
|
||||
if row:
|
||||
vlog.warn("cannot add existing row %s from table"
|
||||
" %s" % (uuid, table.name))
|
||||
del table.rows[uuid]
|
||||
row = self.__create_row(table, uuid)
|
||||
if "insert" in row_update:
|
||||
row_update = row_update['insert']
|
||||
else:
|
||||
row_update = row_update['initial']
|
||||
self.__add_default(table, row_update)
|
||||
if self.__row_update(table, row, row_update):
|
||||
changed = True
|
||||
self.notify(ROW_CREATE, row)
|
||||
elif "modify" in row_update:
|
||||
if not row:
|
||||
raise error.Error('Modify non-existing row')
|
||||
|
||||
self.__apply_diff(table, row, row_update['modify'])
|
||||
self.notify(ROW_UPDATE, row,
|
||||
Row.from_json(self, table, uuid, row_update['modify']))
|
||||
changed = True
|
||||
else:
|
||||
raise error.Error('<row-update> unknown operation',
|
||||
row_update)
|
||||
return changed
|
||||
|
||||
def __process_update(self, table, uuid, old, new):
|
||||
"""Returns True if a column changed, False otherwise."""
|
||||
row = table.rows.get(uuid)
|
||||
@@ -442,6 +542,42 @@ class Idl(object):
|
||||
self.notify(op, row, Row.from_json(self, table, uuid, old))
|
||||
return changed
|
||||
|
||||
def __column_name(self, column):
|
||||
if column.type.key.type == ovs.db.types.UuidType:
|
||||
return ovs.ovsuuid.to_json(column.type.key.type.default)
|
||||
else:
|
||||
return column.type.key.type.default
|
||||
|
||||
def __add_default(self, table, row_update):
|
||||
for column in six.itervalues(table.columns):
|
||||
if column.name not in row_update:
|
||||
if ((table.name not in self.readonly) or
|
||||
(table.name in self.readonly) and
|
||||
(column.name not in self.readonly[table.name])):
|
||||
if column.type.n_min != 0 and not column.type.is_map():
|
||||
row_update[column.name] = self.__column_name(column)
|
||||
|
||||
def __apply_diff(self, table, row, row_diff):
|
||||
for column_name, datum_json in six.iteritems(row_diff):
|
||||
column = table.columns.get(column_name)
|
||||
if not column:
|
||||
# XXX rate-limit
|
||||
vlog.warn("unknown column %s updating table %s"
|
||||
% (column_name, table.name))
|
||||
continue
|
||||
|
||||
try:
|
||||
datum = ovs.db.data.Datum.from_json(column.type, datum_json)
|
||||
except error.Error as e:
|
||||
# XXX rate-limit
|
||||
vlog.warn("error parsing column %s in table %s: %s"
|
||||
% (column_name, table.name, e))
|
||||
continue
|
||||
|
||||
datum = row._data[column_name].diff(datum)
|
||||
if datum != row._data[column_name]:
|
||||
row._data[column_name] = datum
|
||||
|
||||
def __row_update(self, table, row, row_json):
|
||||
changed = False
|
||||
for column_name, datum_json in six.iteritems(row_json):
|
||||
@@ -608,7 +744,7 @@ class Row(object):
|
||||
assert self._idl.txn
|
||||
|
||||
if ((self._table.name in self._idl.readonly) and
|
||||
(column_name in self._idl.readonly[self._table.name])):
|
||||
(column_name in self._idl.readonly[self._table.name])):
|
||||
vlog.warn("attempting to write to readonly column %s"
|
||||
% column_name)
|
||||
return
|
||||
@@ -844,8 +980,8 @@ class Transaction(object):
|
||||
def _substitute_uuids(self, json):
|
||||
if isinstance(json, (list, tuple)):
|
||||
if (len(json) == 2
|
||||
and json[0] == 'uuid'
|
||||
and ovs.ovsuuid.is_valid_string(json[1])):
|
||||
and json[0] == 'uuid'
|
||||
and ovs.ovsuuid.is_valid_string(json[1])):
|
||||
uuid = ovs.ovsuuid.from_string(json[1])
|
||||
row = self._txn_rows.get(uuid, None)
|
||||
if row and row._data is None:
|
||||
@@ -982,14 +1118,14 @@ class Transaction(object):
|
||||
for column_name, datum in six.iteritems(row._changes):
|
||||
if row._data is not None or not datum.is_default():
|
||||
row_json[column_name] = (
|
||||
self._substitute_uuids(datum.to_json()))
|
||||
self._substitute_uuids(datum.to_json()))
|
||||
|
||||
# If anything really changed, consider it an update.
|
||||
# We can't suppress not-really-changed values earlier
|
||||
# or transactions would become nonatomic (see the big
|
||||
# comment inside Transaction._write()).
|
||||
if (not any_updates and row._data is not None and
|
||||
row._data[column_name] != datum):
|
||||
row._data[column_name] != datum):
|
||||
any_updates = True
|
||||
|
||||
if row._data is None or row_json:
|
||||
|
@@ -708,6 +708,103 @@ OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated],
|
||||
003: done
|
||||
]])
|
||||
|
||||
m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND_PY],
|
||||
[AT_SETUP([$1 - Python])
|
||||
AT_SKIP_IF([test $HAVE_PYTHON = no])
|
||||
AT_KEYWORDS([ovsdb server idl Python monitor $4])
|
||||
AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
|
||||
[0], [stdout], [ignore])
|
||||
AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
|
||||
AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/disable-monitor-cond])
|
||||
AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema unix:socket $2],
|
||||
[0], [stdout], [ignore], [kill `cat pid`])
|
||||
AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$5],,, [[| $5]]),
|
||||
[0], [$3], [], [kill `cat pid`])
|
||||
OVSDB_SERVER_SHUTDOWN
|
||||
AT_CLEANUP])
|
||||
|
||||
|
||||
m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND],
|
||||
[OVSDB_CHECK_IDL_WO_MONITOR_COND_PY($@)])
|
||||
|
||||
|
||||
OVSDB_CHECK_IDL_WO_MONITOR_COND([simple idl disable monitor-cond],
|
||||
[['["idltest",
|
||||
{"op": "insert",
|
||||
"table": "simple",
|
||||
"row": {"i": 1,
|
||||
"r": 2.0,
|
||||
"b": true,
|
||||
"s": "mystring",
|
||||
"u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
|
||||
"ia": ["set", [1, 2, 3]],
|
||||
"ra": ["set", [-0.5]],
|
||||
"ba": ["set", [true]],
|
||||
"sa": ["set", ["abc", "def"]],
|
||||
"ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
|
||||
["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
|
||||
{"op": "insert",
|
||||
"table": "simple",
|
||||
"row": {}}]' \
|
||||
'["idltest",
|
||||
{"op": "update",
|
||||
"table": "simple",
|
||||
"where": [],
|
||||
"row": {"b": true}}]' \
|
||||
'["idltest",
|
||||
{"op": "update",
|
||||
"table": "simple",
|
||||
"where": [],
|
||||
"row": {"r": 123.5}}]' \
|
||||
'["idltest",
|
||||
{"op": "insert",
|
||||
"table": "simple",
|
||||
"row": {"i": -1,
|
||||
"r": 125,
|
||||
"b": false,
|
||||
"s": "",
|
||||
"ia": ["set", [1]],
|
||||
"ra": ["set", [1.5]],
|
||||
"ba": ["set", [false]],
|
||||
"sa": ["set", []],
|
||||
"ua": ["set", []]}}]' \
|
||||
'["idltest",
|
||||
{"op": "update",
|
||||
"table": "simple",
|
||||
"where": [["i", "<", 1]],
|
||||
"row": {"s": "newstring"}}]' \
|
||||
'["idltest",
|
||||
{"op": "delete",
|
||||
"table": "simple",
|
||||
"where": [["i", "==", 0]]}]' \
|
||||
'reconnect']],
|
||||
[[000: empty
|
||||
001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
|
||||
002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
||||
002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
||||
003: {"error":null,"result":[{"count":2}]}
|
||||
004: i=0 r=0 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
||||
004: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
||||
005: {"error":null,"result":[{"count":2}]}
|
||||
006: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
||||
006: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
||||
007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
|
||||
008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
|
||||
008: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
||||
008: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
||||
009: {"error":null,"result":[{"count":2}]}
|
||||
010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
|
||||
010: i=0 r=123.5 b=true s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
||||
010: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
||||
011: {"error":null,"result":[{"count":1}]}
|
||||
012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
|
||||
012: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
||||
013: reconnect
|
||||
014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
|
||||
014: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
||||
015: done
|
||||
]])
|
||||
|
||||
m4_define([OVSDB_CHECK_IDL_TRACK_C],
|
||||
[AT_SETUP([$1 - C])
|
||||
AT_KEYWORDS([ovsdb server idl tracking positive $5])
|
||||
|
Reference in New Issue
Block a user