2
0
mirror of https://github.com/openvswitch/ovs synced 2025-09-01 14:55:18 +00:00

python: Monitor Database table to manage lifecycle of IDL client.

The Python IDL implementation supports ovsdb cluster connections.
This patch is a follow up to commit 31e434fc98, it adds the option of
connecting to the leader (the default) in the Raft-based cluster. It mimics
the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa.

The _Server database schema is first requested, then a monitor of the
Database table in the _Server Database. Method __check_server_db verifies
the eligibility of the server. If the attempt to obtain a monitor of the
_Server database fails and a cluster id was not provided this implementation
proceeds to request the data monitor. If a cluster id was provided via the
set_cluster_id method then the connection is aborted and a connection to a
different node is instead attempted, until a valid cluster node is found.
Thus, when supplied, cluster id is interpreted as the intention to only
allow connections to a clustered database. If not supplied, connections to
standalone nodes, or nodes that do not have the _Server database are
allowed. change_seqno is not incremented in the case of Database table
updates.

Acked-by: Numan Siddique <nusiddiq@redhat.com>
Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com>
Signed-off-by: Ben Pfaff <blp@ovn.org>
This commit is contained in:
Ted Elhourani
2019-01-25 19:10:01 +00:00
committed by Ben Pfaff
parent 2a6d9168d6
commit c39751e445
4 changed files with 372 additions and 46 deletions

View File

@@ -38,6 +38,8 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0 OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1 OVSDB_UPDATE2 = 1
CLUSTERED = "clustered"
class Idl(object): class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL). """Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -92,10 +94,13 @@ class Idl(object):
""" """
IDL_S_INITIAL = 0 IDL_S_INITIAL = 0
IDL_S_MONITOR_REQUESTED = 1 IDL_S_SERVER_SCHEMA_REQUESTED = 1
IDL_S_MONITOR_COND_REQUESTED = 2 IDL_S_SERVER_MONITOR_REQUESTED = 2
IDL_S_DATA_MONITOR_REQUESTED = 3
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
def __init__(self, remote, schema_helper, probe_interval=None): def __init__(self, remote, schema_helper, probe_interval=None,
leader_only=True):
"""Creates and returns a connection to the database named 'db_name' on """Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to 'remote', which should be in a form acceptable to
ovs.jsonrpc.session.open(). The connection will maintain an in-memory ovs.jsonrpc.session.open(). The connection will maintain an in-memory
@@ -119,6 +124,9 @@ class Idl(object):
The IDL uses and modifies 'schema' directly. The IDL uses and modifies 'schema' directly.
If 'leader_only' is set to True (default value) the IDL will only
monitor and transact with the leader of the cluster.
If "probe_interval" is zero it disables the connection keepalive If "probe_interval" is zero it disables the connection keepalive
feature. If non-zero the value will be forced to at least 1000 feature. If non-zero the value will be forced to at least 1000
milliseconds. If None it will just use the default value in OVS. milliseconds. If None it will just use the default value in OVS.
@@ -137,6 +145,20 @@ class Idl(object):
self._last_seqno = None self._last_seqno = None
self.change_seqno = 0 self.change_seqno = 0
self.uuid = uuid.uuid1() self.uuid = uuid.uuid1()
# Server monitor.
self._server_schema_request_id = None
self._server_monitor_request_id = None
self._db_change_aware_request_id = None
self._server_db_name = '_Server'
self._server_db_table = 'Database'
self.server_tables = None
self._server_db = None
self.server_monitor_uuid = uuid.uuid1()
self.leader_only = leader_only
self.cluster_id = None
self._min_index = 0
self.state = self.IDL_S_INITIAL self.state = self.IDL_S_INITIAL
# Database locking. # Database locking.
@@ -172,6 +194,12 @@ class Idl(object):
remotes.append(r) remotes.append(r)
return remotes return remotes
def set_cluster_id(self, cluster_id):
"""Set the id of the cluster that this idl must connect to."""
self.cluster_id = cluster_id
if self.state != self.IDL_S_INITIAL:
self.force_reconnect()
def index_create(self, table, name): def index_create(self, table, name):
"""Create a named multi-column index on a table""" """Create a named multi-column index on a table"""
return self.tables[table].rows.index_create(name) return self.tables[table].rows.index_create(name)
@@ -222,7 +250,7 @@ class Idl(object):
if seqno != self._last_seqno: if seqno != self._last_seqno:
self._last_seqno = seqno self._last_seqno = seqno
self.__txn_abort_all() self.__txn_abort_all()
self.__send_monitor_request() self.__send_server_schema_request()
if self.lock_name: if self.lock_name:
self.__send_lock_request() self.__send_lock_request()
break break
@@ -230,6 +258,7 @@ class Idl(object):
msg = self._session.recv() msg = self._session.recv()
if msg is None: if msg is None:
break break
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2" and msg.method == "update2"
and len(msg.params) == 2): and len(msg.params) == 2):
@@ -239,7 +268,15 @@ class Idl(object):
and msg.method == "update" and msg.method == "update"
and len(msg.params) == 2): and len(msg.params) == 2):
# Database contents changed. # Database contents changed.
self.__parse_update(msg.params[1], OVSDB_UPDATE) if msg.params[0] == str(self.server_monitor_uuid):
self.__parse_update(msg.params[1], OVSDB_UPDATE,
tables=self.server_tables)
self.change_seqno = initial_change_seqno
if not self.__check_server_db():
self.force_reconnect()
break
else:
self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None and self._monitor_request_id is not None
and self._monitor_request_id == msg.id): and self._monitor_request_id == msg.id):
@@ -248,16 +285,66 @@ class Idl(object):
self.change_seqno += 1 self.change_seqno += 1
self._monitor_request_id = None self._monitor_request_id = None
self.__clear() self.__clear()
if self.state == self.IDL_S_MONITOR_COND_REQUESTED: if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
self.__parse_update(msg.result, OVSDB_UPDATE2) self.__parse_update(msg.result, OVSDB_UPDATE2)
else: else:
assert self.state == self.IDL_S_MONITOR_REQUESTED assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
self.__parse_update(msg.result, OVSDB_UPDATE) self.__parse_update(msg.result, OVSDB_UPDATE)
except error.Error as e: except error.Error as e:
vlog.err("%s: parse error in received schema: %s" vlog.err("%s: parse error in received schema: %s"
% (self._session.get_name(), e)) % (self._session.get_name(), e))
self.__error() self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._server_schema_request_id is not None
and self._server_schema_request_id == msg.id):
# Reply to our "get_schema" of _Server request.
try:
self._server_schema_request_id = None
sh = SchemaHelper(None, msg.result)
sh.register_table(self._server_db_table)
schema = sh.get_idl_schema()
self._server_db = schema
self.server_tables = schema.tables
self.__send_server_monitor_request()
except error.Error as e:
vlog.err("%s: error receiving server schema: %s"
% (self._session.get_name(), e))
if self.cluster_id:
self.__error()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._server_monitor_request_id is not None
and self._server_monitor_request_id == msg.id):
# Reply to our "monitor" of _Server request.
try:
self._server_monitor_request_id = None
self.__parse_update(msg.result, OVSDB_UPDATE,
tables=self.server_tables)
self.change_seqno = initial_change_seqno
if self.__check_server_db():
self.__send_monitor_request()
self.__send_db_change_aware()
else:
self.force_reconnect()
break
except error.Error as e:
vlog.err("%s: parse error in received schema: %s"
% (self._session.get_name(), e))
if self.cluster_id:
self.__error()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._db_change_aware_request_id is not None
and self._db_change_aware_request_id == msg.id):
# Reply to us notifying the server of our change awarness.
self._db_change_aware_request_id = None
elif (msg.type == ovs.jsonrpc.Message.T_REPLY elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None and self._lock_request_id is not None
and self._lock_request_id == msg.id): and self._lock_request_id == msg.id):
@@ -275,10 +362,20 @@ class Idl(object):
# Reply to our echo request. Ignore it. # Reply to our echo request. Ignore it.
pass pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == self.IDL_S_MONITOR_COND_REQUESTED and self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id): self._monitor_request_id == msg.id):
if msg.error == "unknown method": if msg.error == "unknown method":
self.__send_monitor_request() self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self._server_schema_request_id is not None and
self._server_schema_request_id == msg.id):
self._server_schema_request_id = None
if self.cluster_id:
self.force_reconnect()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR, elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY) ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)): and self.__txn_process_reply(msg)):
@@ -342,6 +439,9 @@ class Idl(object):
In the meantime, the contents of the IDL will not change.""" In the meantime, the contents of the IDL will not change."""
self._session.force_reconnect() self._session.force_reconnect()
def session_name(self):
return self._session.get_name()
def set_lock(self, lock_name): def set_lock(self, lock_name):
"""If 'lock_name' is not None, configures the IDL to obtain the named """If 'lock_name' is not None, configures the IDL to obtain the named
lock from the database server and to avoid modifying the database when lock from the database server and to avoid modifying the database when
@@ -440,12 +540,19 @@ class Idl(object):
if not new_has_lock: if not new_has_lock:
self.is_lock_contended = True self.is_lock_contended = True
def __send_db_change_aware(self):
msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
[True])
self._db_change_aware_request_id = msg.id
self._session.send(msg)
def __send_monitor_request(self): def __send_monitor_request(self):
if self.state == self.IDL_S_INITIAL: if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
self.state = self.IDL_S_MONITOR_COND_REQUESTED self.IDL_S_INITIAL]):
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond" method = "monitor_cond"
else: else:
self.state = self.IDL_S_MONITOR_REQUESTED self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor" method = "monitor"
monitor_requests = {} monitor_requests = {}
@@ -467,20 +574,50 @@ class Idl(object):
self._monitor_request_id = msg.id self._monitor_request_id = msg.id
self._session.send(msg) self._session.send(msg)
def __parse_update(self, update, version): def __send_server_schema_request(self):
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
msg = ovs.jsonrpc.Message.create_request(
"get_schema", [self._server_db_name, str(self.uuid)])
self._server_schema_request_id = msg.id
self._session.send(msg)
def __send_server_monitor_request(self):
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
monitor_requests = {}
table = self.server_tables[self._server_db_table]
columns = [column for column in six.iterkeys(table.columns)]
for column in six.itervalues(table.columns):
if not hasattr(column, 'alert'):
column.alert = True
table.rows = custom_index.IndexedRows(table)
table.need_table = False
table.idl = self
monitor_request = {"columns": columns}
monitor_requests[table.name] = [monitor_request]
msg = ovs.jsonrpc.Message.create_request(
'monitor', [self._server_db.name,
str(self.server_monitor_uuid),
monitor_requests])
self._server_monitor_request_id = msg.id
self._session.send(msg)
def __parse_update(self, update, version, tables=None):
try: try:
self.__do_parse_update(update, version) if not tables:
self.__do_parse_update(update, version, self.tables)
else:
self.__do_parse_update(update, version, tables)
except error.Error as e: except error.Error as e:
vlog.err("%s: error parsing update: %s" vlog.err("%s: error parsing update: %s"
% (self._session.get_name(), e)) % (self._session.get_name(), e))
def __do_parse_update(self, table_updates, version): def __do_parse_update(self, table_updates, version, tables):
if not isinstance(table_updates, dict): if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object", raise error.Error("<table-updates> is not an object",
table_updates) table_updates)
for table_name, table_update in six.iteritems(table_updates): for table_name, table_update in six.iteritems(table_updates):
table = self.tables.get(table_name) table = tables.get(table_name)
if not table: if not table:
raise error.Error('<table-updates> includes unknown ' raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name) 'table "%s"' % table_name)
@@ -605,6 +742,58 @@ class Idl(object):
self.notify(op, row, Row.from_json(self, table, uuid, old)) self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed return changed
def __check_server_db(self):
"""Returns True if this is a valid server database, False otherwise."""
session_name = self.session_name()
if self._server_db_table not in self.server_tables:
vlog.info("%s: server does not have %s table in its %s database"
% (session_name, self._server_db_table,
self._server_db_name))
return False
rows = self.server_tables[self._server_db_table].rows
database = None
for row in six.itervalues(rows):
if self.cluster_id:
if self.cluster_id in \
map(lambda x: str(x)[:4], row.cid):
database = row
break
elif row.name == self._db.name:
database = row
break
if not database:
vlog.info("%s: server does not have %s database"
% (session_name, self._db.name))
return False
if (database.model == CLUSTERED and
self._session.get_num_of_remotes() > 1):
if not database.schema:
vlog.info('%s: clustered database server has not yet joined '
'cluster; trying another server' % session_name)
return False
if not database.connected:
vlog.info('%s: clustered database server is disconnected '
'from cluster; trying another server' % session_name)
return False
if (self.leader_only and
not database.leader):
vlog.info('%s: clustered database server is not cluster '
'leader; trying another server' % session_name)
return False
if database.index:
if database.index[0] < self._min_index:
vlog.warn('%s: clustered database server has stale data; '
'trying another server' % session_name)
return False
self._min_index = database.index[0]
return True
def __column_name(self, column): def __column_name(self, column):
if column.type.key.type == ovs.db.types.UuidType: if column.type.key.type == ovs.db.types.UuidType:
return ovs.ovsuuid.to_json(column.type.key.type.default) return ovs.ovsuuid.to_json(column.type.key.type.default)

View File

@@ -344,6 +344,9 @@ class Reconnect(object):
else: else:
self.info_level("%s: error listening for connections" self.info_level("%s: error listening for connections"
% self.name) % self.name)
elif self.state == Reconnect.Reconnect:
self.info_level("%s: connection closed by client"
% self.name)
elif self.backoff < self.max_backoff: elif self.backoff < self.max_backoff:
if self.passive: if self.passive:
type_ = "listen" type_ = "listen"

View File

@@ -11,7 +11,42 @@ ovsdb_start_idltest () {
ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $? ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $?
on_exit 'kill `cat ovsdb-server.pid`' on_exit 'kill `cat ovsdb-server.pid`'
} }
])
# ovsdb_cluster_start_idltest [REMOTE] [SCHEMA]
#
# Creates a database using SCHEMA (default: idltest.ovsschema) and
# starts a database cluster listening on punix:socket and REMOTE (if
# specified).
ovsdb_cluster_start_idltest () {
local n=$1
ovsdb-tool create-cluster s1.db $abs_srcdir/idltest.ovsschema unix:s1.raft || return $?
cid=`ovsdb-tool db-cid s1.db`
schema_name=`ovsdb-tool schema-name $abs_srcdir/idltest.ovsschema`
for i in `seq 2 $n`; do
ovsdb-tool join-cluster s$i.db $schema_name unix:s$i.raft unix:s1.raft || return $?
done
for i in `seq $n`; do
ovsdb-server -vraft -vconsole:warn --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb ${2:+--remote=$2} s$i.db || return $?
done
on_exit 'kill `cat s*.pid`'
}
# ovsdb_cluster_leader [REMOTES] [DATABASE]
#
# Returns the leader of the DATABASE cluster.
ovsdb_cluster_leader () {
remotes=$(echo $1 | tr "," "\n")
for remote in $remotes; do
ovsdb-client dump $remote _Server Database name leader | grep $2 | grep -q true
if [[ $? == 0 ]]; then
port=$(echo $remote | cut -d':' -f 3)
log=$(grep --include=s\*.log -rlnw -e "listening on port $port" ./)
pid=$(echo $log | sed 's/\(.*\.\)log/\1pid/' )
echo "${remote}|${pid}"
return
fi
done
}])
# OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS], # OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
# [FILTER]) # [FILTER])
@@ -1466,40 +1501,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
"where": [["i", "==", 0]]}]' \ "where": [["i", "==", 0]]}]' \
'reconnect']], 'reconnect']],
[[000: empty [[000: empty
001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]} 000: event:create, row={uuid=<0>}, updates=None
002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None 000: event:create, row={uuid=<1>}, updates=None
002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None 001: {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> 002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> 002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
003: {"error":null,"result":[{"count":2}]} 003: {"error":null,"result":[{"count":2}]}
004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true uuid=<0>} 004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true uuid=<2>}
004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> 004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> 004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
005: {"error":null,"result":[{"count":2}]} 005: {"error":null,"result":[{"count":2}]}
006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>} 006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 uuid=<0>} 006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2 uuid=<2>}
006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> 006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> 006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]} 007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None 008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> 008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> 008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> 008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
009: {"error":null,"result":[{"count":2}]} 009: {"error":null,"result":[{"count":2}]}
010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>} 010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>} 010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> 010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> 010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> 010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
011: {"error":null,"result":[{"count":1}]} 011: {"error":null,"result":[{"count":1}]}
012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None 012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> 012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> 012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
013: reconnect 013: reconnect
014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None 014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None 014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6> 014: event:create, row={uuid=<0>}, updates=None
014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0> 014: event:create, row={uuid=<1>}, updates=None
014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
015: done 015: done
]]) ]])
@@ -1853,3 +1892,33 @@ m4_define([CHECK_STREAM_OPEN_BLOCK_PY],
CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2]) CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2])
CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3]) CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3])
# same as OVSDB_CHECK_IDL but uses Python IDL implementation with tcp
# with multiple remotes to assert the idl connects to the leader of the Raft cluster
m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PYN],
[AT_SETUP([$1])
AT_SKIP_IF([test $7 = no])
AT_KEYWORDS([ovsdb server idl Python leader_only with tcp socket])
m4_define([LPBK],[127.0.0.1])
AT_CHECK([ovsdb_cluster_start_idltest $2 "ptcp:0:"LPBK])
PARSE_LISTENING_PORT([s2.log], [TCP_PORT_1])
PARSE_LISTENING_PORT([s3.log], [TCP_PORT_2])
PARSE_LISTENING_PORT([s1.log], [TCP_PORT_3])
remotes=tcp:LPBK:$TCP_PORT_1,tcp:LPBK:$TCP_PORT_2,tcp:LPBK:$TCP_PORT_3
pids=$(cat s2.pid s3.pid s1.pid | tr '\n' ',')
echo $pids
AT_CHECK([$8 $srcdir/test-ovsdb.py -t30 idl-cluster $srcdir/idltest.ovsschema $remotes $pids $3],
[0], [stdout], [ignore])
remote=$(ovsdb_cluster_leader $remotes "idltest")
leader=$(echo $remote | cut -d'|' -f 1)
AT_CHECK([grep -F -- "${leader}" stdout], [0], [ignore])
AT_CLEANUP])
m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PY],
[OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python2 (leader only)], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON], [$PYTHON])
OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python3 (leader only)], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON3], [$PYTHON3])])
OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL connects to leader], 3, ['remote'])
OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL reconnects to leader], 3, ['remote' '+remotestop' 'remote'])

View File

@@ -758,6 +758,70 @@ def do_idl_passive(schema_file, remote, *commands):
print("%03d: done" % step) print("%03d: done" % step)
def do_idl_cluster(schema_file, remote, pid, *commands):
schema_helper = ovs.db.idl.SchemaHelper(schema_file)
if remote.startswith("ssl:"):
if len(commands) < 3:
sys.stderr.write("SSL connection requires private key, "
"certificate for private key, and peer CA "
"certificate as arguments\n")
sys.exit(1)
ovs.stream.Stream.ssl_set_private_key_file(commands[0])
ovs.stream.Stream.ssl_set_certificate_file(commands[1])
ovs.stream.Stream.ssl_set_ca_cert_file(commands[2])
commands = commands[3:]
schema_helper.register_all()
idl = ovs.db.idl.Idl(remote, schema_helper)
step = 0
seqno = 0
commands = list(commands)
for command in commands:
if command.startswith("+"):
# The previous transaction didn't change anything.
command = command[1:]
else:
# Wait for update.
while idl.change_seqno == seqno and not idl.run():
poller = ovs.poller.Poller()
idl.wait(poller)
poller.block()
step += 1
seqno = idl.change_seqno
if command == "reconnect":
print("%03d: reconnect" % step)
sys.stdout.flush()
step += 1
idl.force_reconnect()
elif command == "remote":
print("%03d: %s" % (step, idl.session_name()))
sys.stdout.flush()
step += 1
elif command == "remotestop":
r = idl.session_name()
remotes = remote.split(',')
i = remotes.index(r)
pids = pid.split(',')
command = None
try:
command = "kill %s" % pids[i]
except ValueError as error:
sys.stderr.write("Cannot find pid of remote: %s\n"
% os.strerror(error))
sys.exit(1)
os.popen(command)
print("%03d: stop %s" % (step, pids[i]))
sys.stdout.flush()
step += 1
idl.close()
print("%03d: done" % step)
def usage(): def usage():
print("""\ print("""\
%(program_name)s: test utility for Open vSwitch database Python bindings %(program_name)s: test utility for Open vSwitch database Python bindings
@@ -861,7 +925,8 @@ def main(argv):
"parse-table": (do_parse_table, (2, 3)), "parse-table": (do_parse_table, (2, 3)),
"parse-schema": (do_parse_schema, 1), "parse-schema": (do_parse_schema, 1),
"idl": (do_idl, (2,)), "idl": (do_idl, (2,)),
"idl_passive": (do_idl_passive, (2,))} "idl_passive": (do_idl_passive, (2,)),
"idl-cluster": (do_idl_cluster, (3,))}
command_name = args[0] command_name = args[0]
args = args[1:] args = args[1:]