mirror of
https://github.com/openvswitch/ovs
synced 2025-09-01 14:55:18 +00:00
python: Monitor Database table to manage lifecycle of IDL client.
The Python IDL implementation supports ovsdb cluster connections. This patch is a follow up to commit31e434fc98
, it adds the option of connecting to the leader (the default) in the Raft-based cluster. It mimics the exisiting C IDL support for clusters introduced in commit1b1d2e6daa
. The _Server database schema is first requested, then a monitor of the Database table in the _Server Database. Method __check_server_db verifies the eligibility of the server. If the attempt to obtain a monitor of the _Server database fails and a cluster id was not provided this implementation proceeds to request the data monitor. If a cluster id was provided via the set_cluster_id method then the connection is aborted and a connection to a different node is instead attempted, until a valid cluster node is found. Thus, when supplied, cluster id is interpreted as the intention to only allow connections to a clustered database. If not supplied, connections to standalone nodes, or nodes that do not have the _Server database are allowed. change_seqno is not incremented in the case of Database table updates. Acked-by: Numan Siddique <nusiddiq@redhat.com> Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com> Signed-off-by: Ben Pfaff <blp@ovn.org>
This commit is contained in:
@@ -38,6 +38,8 @@ ROW_DELETE = "delete"
|
|||||||
OVSDB_UPDATE = 0
|
OVSDB_UPDATE = 0
|
||||||
OVSDB_UPDATE2 = 1
|
OVSDB_UPDATE2 = 1
|
||||||
|
|
||||||
|
CLUSTERED = "clustered"
|
||||||
|
|
||||||
|
|
||||||
class Idl(object):
|
class Idl(object):
|
||||||
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
|
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
|
||||||
@@ -92,10 +94,13 @@ class Idl(object):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
IDL_S_INITIAL = 0
|
IDL_S_INITIAL = 0
|
||||||
IDL_S_MONITOR_REQUESTED = 1
|
IDL_S_SERVER_SCHEMA_REQUESTED = 1
|
||||||
IDL_S_MONITOR_COND_REQUESTED = 2
|
IDL_S_SERVER_MONITOR_REQUESTED = 2
|
||||||
|
IDL_S_DATA_MONITOR_REQUESTED = 3
|
||||||
|
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
|
||||||
|
|
||||||
def __init__(self, remote, schema_helper, probe_interval=None):
|
def __init__(self, remote, schema_helper, probe_interval=None,
|
||||||
|
leader_only=True):
|
||||||
"""Creates and returns a connection to the database named 'db_name' on
|
"""Creates and returns a connection to the database named 'db_name' on
|
||||||
'remote', which should be in a form acceptable to
|
'remote', which should be in a form acceptable to
|
||||||
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
|
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
|
||||||
@@ -119,6 +124,9 @@ class Idl(object):
|
|||||||
|
|
||||||
The IDL uses and modifies 'schema' directly.
|
The IDL uses and modifies 'schema' directly.
|
||||||
|
|
||||||
|
If 'leader_only' is set to True (default value) the IDL will only
|
||||||
|
monitor and transact with the leader of the cluster.
|
||||||
|
|
||||||
If "probe_interval" is zero it disables the connection keepalive
|
If "probe_interval" is zero it disables the connection keepalive
|
||||||
feature. If non-zero the value will be forced to at least 1000
|
feature. If non-zero the value will be forced to at least 1000
|
||||||
milliseconds. If None it will just use the default value in OVS.
|
milliseconds. If None it will just use the default value in OVS.
|
||||||
@@ -137,6 +145,20 @@ class Idl(object):
|
|||||||
self._last_seqno = None
|
self._last_seqno = None
|
||||||
self.change_seqno = 0
|
self.change_seqno = 0
|
||||||
self.uuid = uuid.uuid1()
|
self.uuid = uuid.uuid1()
|
||||||
|
|
||||||
|
# Server monitor.
|
||||||
|
self._server_schema_request_id = None
|
||||||
|
self._server_monitor_request_id = None
|
||||||
|
self._db_change_aware_request_id = None
|
||||||
|
self._server_db_name = '_Server'
|
||||||
|
self._server_db_table = 'Database'
|
||||||
|
self.server_tables = None
|
||||||
|
self._server_db = None
|
||||||
|
self.server_monitor_uuid = uuid.uuid1()
|
||||||
|
self.leader_only = leader_only
|
||||||
|
self.cluster_id = None
|
||||||
|
self._min_index = 0
|
||||||
|
|
||||||
self.state = self.IDL_S_INITIAL
|
self.state = self.IDL_S_INITIAL
|
||||||
|
|
||||||
# Database locking.
|
# Database locking.
|
||||||
@@ -172,6 +194,12 @@ class Idl(object):
|
|||||||
remotes.append(r)
|
remotes.append(r)
|
||||||
return remotes
|
return remotes
|
||||||
|
|
||||||
|
def set_cluster_id(self, cluster_id):
|
||||||
|
"""Set the id of the cluster that this idl must connect to."""
|
||||||
|
self.cluster_id = cluster_id
|
||||||
|
if self.state != self.IDL_S_INITIAL:
|
||||||
|
self.force_reconnect()
|
||||||
|
|
||||||
def index_create(self, table, name):
|
def index_create(self, table, name):
|
||||||
"""Create a named multi-column index on a table"""
|
"""Create a named multi-column index on a table"""
|
||||||
return self.tables[table].rows.index_create(name)
|
return self.tables[table].rows.index_create(name)
|
||||||
@@ -222,7 +250,7 @@ class Idl(object):
|
|||||||
if seqno != self._last_seqno:
|
if seqno != self._last_seqno:
|
||||||
self._last_seqno = seqno
|
self._last_seqno = seqno
|
||||||
self.__txn_abort_all()
|
self.__txn_abort_all()
|
||||||
self.__send_monitor_request()
|
self.__send_server_schema_request()
|
||||||
if self.lock_name:
|
if self.lock_name:
|
||||||
self.__send_lock_request()
|
self.__send_lock_request()
|
||||||
break
|
break
|
||||||
@@ -230,6 +258,7 @@ class Idl(object):
|
|||||||
msg = self._session.recv()
|
msg = self._session.recv()
|
||||||
if msg is None:
|
if msg is None:
|
||||||
break
|
break
|
||||||
|
|
||||||
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
|
||||||
and msg.method == "update2"
|
and msg.method == "update2"
|
||||||
and len(msg.params) == 2):
|
and len(msg.params) == 2):
|
||||||
@@ -239,7 +268,15 @@ class Idl(object):
|
|||||||
and msg.method == "update"
|
and msg.method == "update"
|
||||||
and len(msg.params) == 2):
|
and len(msg.params) == 2):
|
||||||
# Database contents changed.
|
# Database contents changed.
|
||||||
self.__parse_update(msg.params[1], OVSDB_UPDATE)
|
if msg.params[0] == str(self.server_monitor_uuid):
|
||||||
|
self.__parse_update(msg.params[1], OVSDB_UPDATE,
|
||||||
|
tables=self.server_tables)
|
||||||
|
self.change_seqno = initial_change_seqno
|
||||||
|
if not self.__check_server_db():
|
||||||
|
self.force_reconnect()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.__parse_update(msg.params[1], OVSDB_UPDATE)
|
||||||
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
||||||
and self._monitor_request_id is not None
|
and self._monitor_request_id is not None
|
||||||
and self._monitor_request_id == msg.id):
|
and self._monitor_request_id == msg.id):
|
||||||
@@ -248,16 +285,66 @@ class Idl(object):
|
|||||||
self.change_seqno += 1
|
self.change_seqno += 1
|
||||||
self._monitor_request_id = None
|
self._monitor_request_id = None
|
||||||
self.__clear()
|
self.__clear()
|
||||||
if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
|
if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
|
||||||
self.__parse_update(msg.result, OVSDB_UPDATE2)
|
self.__parse_update(msg.result, OVSDB_UPDATE2)
|
||||||
else:
|
else:
|
||||||
assert self.state == self.IDL_S_MONITOR_REQUESTED
|
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
|
||||||
self.__parse_update(msg.result, OVSDB_UPDATE)
|
self.__parse_update(msg.result, OVSDB_UPDATE)
|
||||||
|
|
||||||
except error.Error as e:
|
except error.Error as e:
|
||||||
vlog.err("%s: parse error in received schema: %s"
|
vlog.err("%s: parse error in received schema: %s"
|
||||||
% (self._session.get_name(), e))
|
% (self._session.get_name(), e))
|
||||||
self.__error()
|
self.__error()
|
||||||
|
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
||||||
|
and self._server_schema_request_id is not None
|
||||||
|
and self._server_schema_request_id == msg.id):
|
||||||
|
# Reply to our "get_schema" of _Server request.
|
||||||
|
try:
|
||||||
|
self._server_schema_request_id = None
|
||||||
|
sh = SchemaHelper(None, msg.result)
|
||||||
|
sh.register_table(self._server_db_table)
|
||||||
|
schema = sh.get_idl_schema()
|
||||||
|
self._server_db = schema
|
||||||
|
self.server_tables = schema.tables
|
||||||
|
self.__send_server_monitor_request()
|
||||||
|
except error.Error as e:
|
||||||
|
vlog.err("%s: error receiving server schema: %s"
|
||||||
|
% (self._session.get_name(), e))
|
||||||
|
if self.cluster_id:
|
||||||
|
self.__error()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.change_seqno = initial_change_seqno
|
||||||
|
self.__send_monitor_request()
|
||||||
|
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
||||||
|
and self._server_monitor_request_id is not None
|
||||||
|
and self._server_monitor_request_id == msg.id):
|
||||||
|
# Reply to our "monitor" of _Server request.
|
||||||
|
try:
|
||||||
|
self._server_monitor_request_id = None
|
||||||
|
self.__parse_update(msg.result, OVSDB_UPDATE,
|
||||||
|
tables=self.server_tables)
|
||||||
|
self.change_seqno = initial_change_seqno
|
||||||
|
if self.__check_server_db():
|
||||||
|
self.__send_monitor_request()
|
||||||
|
self.__send_db_change_aware()
|
||||||
|
else:
|
||||||
|
self.force_reconnect()
|
||||||
|
break
|
||||||
|
except error.Error as e:
|
||||||
|
vlog.err("%s: parse error in received schema: %s"
|
||||||
|
% (self._session.get_name(), e))
|
||||||
|
if self.cluster_id:
|
||||||
|
self.__error()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.change_seqno = initial_change_seqno
|
||||||
|
self.__send_monitor_request()
|
||||||
|
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
||||||
|
and self._db_change_aware_request_id is not None
|
||||||
|
and self._db_change_aware_request_id == msg.id):
|
||||||
|
# Reply to us notifying the server of our change awarness.
|
||||||
|
self._db_change_aware_request_id = None
|
||||||
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
|
||||||
and self._lock_request_id is not None
|
and self._lock_request_id is not None
|
||||||
and self._lock_request_id == msg.id):
|
and self._lock_request_id == msg.id):
|
||||||
@@ -275,10 +362,20 @@ class Idl(object):
|
|||||||
# Reply to our echo request. Ignore it.
|
# Reply to our echo request. Ignore it.
|
||||||
pass
|
pass
|
||||||
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||||
self.state == self.IDL_S_MONITOR_COND_REQUESTED and
|
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
|
||||||
self._monitor_request_id == msg.id):
|
self._monitor_request_id == msg.id):
|
||||||
if msg.error == "unknown method":
|
if msg.error == "unknown method":
|
||||||
self.__send_monitor_request()
|
self.__send_monitor_request()
|
||||||
|
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
|
||||||
|
self._server_schema_request_id is not None and
|
||||||
|
self._server_schema_request_id == msg.id):
|
||||||
|
self._server_schema_request_id = None
|
||||||
|
if self.cluster_id:
|
||||||
|
self.force_reconnect()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.change_seqno = initial_change_seqno
|
||||||
|
self.__send_monitor_request()
|
||||||
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
|
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
|
||||||
ovs.jsonrpc.Message.T_REPLY)
|
ovs.jsonrpc.Message.T_REPLY)
|
||||||
and self.__txn_process_reply(msg)):
|
and self.__txn_process_reply(msg)):
|
||||||
@@ -342,6 +439,9 @@ class Idl(object):
|
|||||||
In the meantime, the contents of the IDL will not change."""
|
In the meantime, the contents of the IDL will not change."""
|
||||||
self._session.force_reconnect()
|
self._session.force_reconnect()
|
||||||
|
|
||||||
|
def session_name(self):
|
||||||
|
return self._session.get_name()
|
||||||
|
|
||||||
def set_lock(self, lock_name):
|
def set_lock(self, lock_name):
|
||||||
"""If 'lock_name' is not None, configures the IDL to obtain the named
|
"""If 'lock_name' is not None, configures the IDL to obtain the named
|
||||||
lock from the database server and to avoid modifying the database when
|
lock from the database server and to avoid modifying the database when
|
||||||
@@ -440,12 +540,19 @@ class Idl(object):
|
|||||||
if not new_has_lock:
|
if not new_has_lock:
|
||||||
self.is_lock_contended = True
|
self.is_lock_contended = True
|
||||||
|
|
||||||
|
def __send_db_change_aware(self):
|
||||||
|
msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
|
||||||
|
[True])
|
||||||
|
self._db_change_aware_request_id = msg.id
|
||||||
|
self._session.send(msg)
|
||||||
|
|
||||||
def __send_monitor_request(self):
|
def __send_monitor_request(self):
|
||||||
if self.state == self.IDL_S_INITIAL:
|
if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
|
||||||
self.state = self.IDL_S_MONITOR_COND_REQUESTED
|
self.IDL_S_INITIAL]):
|
||||||
|
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
|
||||||
method = "monitor_cond"
|
method = "monitor_cond"
|
||||||
else:
|
else:
|
||||||
self.state = self.IDL_S_MONITOR_REQUESTED
|
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
|
||||||
method = "monitor"
|
method = "monitor"
|
||||||
|
|
||||||
monitor_requests = {}
|
monitor_requests = {}
|
||||||
@@ -467,20 +574,50 @@ class Idl(object):
|
|||||||
self._monitor_request_id = msg.id
|
self._monitor_request_id = msg.id
|
||||||
self._session.send(msg)
|
self._session.send(msg)
|
||||||
|
|
||||||
def __parse_update(self, update, version):
|
def __send_server_schema_request(self):
|
||||||
|
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
|
||||||
|
msg = ovs.jsonrpc.Message.create_request(
|
||||||
|
"get_schema", [self._server_db_name, str(self.uuid)])
|
||||||
|
self._server_schema_request_id = msg.id
|
||||||
|
self._session.send(msg)
|
||||||
|
|
||||||
|
def __send_server_monitor_request(self):
|
||||||
|
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
|
||||||
|
monitor_requests = {}
|
||||||
|
table = self.server_tables[self._server_db_table]
|
||||||
|
columns = [column for column in six.iterkeys(table.columns)]
|
||||||
|
for column in six.itervalues(table.columns):
|
||||||
|
if not hasattr(column, 'alert'):
|
||||||
|
column.alert = True
|
||||||
|
table.rows = custom_index.IndexedRows(table)
|
||||||
|
table.need_table = False
|
||||||
|
table.idl = self
|
||||||
|
monitor_request = {"columns": columns}
|
||||||
|
monitor_requests[table.name] = [monitor_request]
|
||||||
|
msg = ovs.jsonrpc.Message.create_request(
|
||||||
|
'monitor', [self._server_db.name,
|
||||||
|
str(self.server_monitor_uuid),
|
||||||
|
monitor_requests])
|
||||||
|
self._server_monitor_request_id = msg.id
|
||||||
|
self._session.send(msg)
|
||||||
|
|
||||||
|
def __parse_update(self, update, version, tables=None):
|
||||||
try:
|
try:
|
||||||
self.__do_parse_update(update, version)
|
if not tables:
|
||||||
|
self.__do_parse_update(update, version, self.tables)
|
||||||
|
else:
|
||||||
|
self.__do_parse_update(update, version, tables)
|
||||||
except error.Error as e:
|
except error.Error as e:
|
||||||
vlog.err("%s: error parsing update: %s"
|
vlog.err("%s: error parsing update: %s"
|
||||||
% (self._session.get_name(), e))
|
% (self._session.get_name(), e))
|
||||||
|
|
||||||
def __do_parse_update(self, table_updates, version):
|
def __do_parse_update(self, table_updates, version, tables):
|
||||||
if not isinstance(table_updates, dict):
|
if not isinstance(table_updates, dict):
|
||||||
raise error.Error("<table-updates> is not an object",
|
raise error.Error("<table-updates> is not an object",
|
||||||
table_updates)
|
table_updates)
|
||||||
|
|
||||||
for table_name, table_update in six.iteritems(table_updates):
|
for table_name, table_update in six.iteritems(table_updates):
|
||||||
table = self.tables.get(table_name)
|
table = tables.get(table_name)
|
||||||
if not table:
|
if not table:
|
||||||
raise error.Error('<table-updates> includes unknown '
|
raise error.Error('<table-updates> includes unknown '
|
||||||
'table "%s"' % table_name)
|
'table "%s"' % table_name)
|
||||||
@@ -605,6 +742,58 @@ class Idl(object):
|
|||||||
self.notify(op, row, Row.from_json(self, table, uuid, old))
|
self.notify(op, row, Row.from_json(self, table, uuid, old))
|
||||||
return changed
|
return changed
|
||||||
|
|
||||||
|
def __check_server_db(self):
|
||||||
|
"""Returns True if this is a valid server database, False otherwise."""
|
||||||
|
session_name = self.session_name()
|
||||||
|
|
||||||
|
if self._server_db_table not in self.server_tables:
|
||||||
|
vlog.info("%s: server does not have %s table in its %s database"
|
||||||
|
% (session_name, self._server_db_table,
|
||||||
|
self._server_db_name))
|
||||||
|
return False
|
||||||
|
|
||||||
|
rows = self.server_tables[self._server_db_table].rows
|
||||||
|
|
||||||
|
database = None
|
||||||
|
for row in six.itervalues(rows):
|
||||||
|
if self.cluster_id:
|
||||||
|
if self.cluster_id in \
|
||||||
|
map(lambda x: str(x)[:4], row.cid):
|
||||||
|
database = row
|
||||||
|
break
|
||||||
|
elif row.name == self._db.name:
|
||||||
|
database = row
|
||||||
|
break
|
||||||
|
|
||||||
|
if not database:
|
||||||
|
vlog.info("%s: server does not have %s database"
|
||||||
|
% (session_name, self._db.name))
|
||||||
|
return False
|
||||||
|
|
||||||
|
if (database.model == CLUSTERED and
|
||||||
|
self._session.get_num_of_remotes() > 1):
|
||||||
|
if not database.schema:
|
||||||
|
vlog.info('%s: clustered database server has not yet joined '
|
||||||
|
'cluster; trying another server' % session_name)
|
||||||
|
return False
|
||||||
|
if not database.connected:
|
||||||
|
vlog.info('%s: clustered database server is disconnected '
|
||||||
|
'from cluster; trying another server' % session_name)
|
||||||
|
return False
|
||||||
|
if (self.leader_only and
|
||||||
|
not database.leader):
|
||||||
|
vlog.info('%s: clustered database server is not cluster '
|
||||||
|
'leader; trying another server' % session_name)
|
||||||
|
return False
|
||||||
|
if database.index:
|
||||||
|
if database.index[0] < self._min_index:
|
||||||
|
vlog.warn('%s: clustered database server has stale data; '
|
||||||
|
'trying another server' % session_name)
|
||||||
|
return False
|
||||||
|
self._min_index = database.index[0]
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def __column_name(self, column):
|
def __column_name(self, column):
|
||||||
if column.type.key.type == ovs.db.types.UuidType:
|
if column.type.key.type == ovs.db.types.UuidType:
|
||||||
return ovs.ovsuuid.to_json(column.type.key.type.default)
|
return ovs.ovsuuid.to_json(column.type.key.type.default)
|
||||||
|
@@ -344,6 +344,9 @@ class Reconnect(object):
|
|||||||
else:
|
else:
|
||||||
self.info_level("%s: error listening for connections"
|
self.info_level("%s: error listening for connections"
|
||||||
% self.name)
|
% self.name)
|
||||||
|
elif self.state == Reconnect.Reconnect:
|
||||||
|
self.info_level("%s: connection closed by client"
|
||||||
|
% self.name)
|
||||||
elif self.backoff < self.max_backoff:
|
elif self.backoff < self.max_backoff:
|
||||||
if self.passive:
|
if self.passive:
|
||||||
type_ = "listen"
|
type_ = "listen"
|
||||||
|
@@ -11,7 +11,42 @@ ovsdb_start_idltest () {
|
|||||||
ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $?
|
ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $?
|
||||||
on_exit 'kill `cat ovsdb-server.pid`'
|
on_exit 'kill `cat ovsdb-server.pid`'
|
||||||
}
|
}
|
||||||
])
|
|
||||||
|
# ovsdb_cluster_start_idltest [REMOTE] [SCHEMA]
|
||||||
|
#
|
||||||
|
# Creates a database using SCHEMA (default: idltest.ovsschema) and
|
||||||
|
# starts a database cluster listening on punix:socket and REMOTE (if
|
||||||
|
# specified).
|
||||||
|
ovsdb_cluster_start_idltest () {
|
||||||
|
local n=$1
|
||||||
|
ovsdb-tool create-cluster s1.db $abs_srcdir/idltest.ovsschema unix:s1.raft || return $?
|
||||||
|
cid=`ovsdb-tool db-cid s1.db`
|
||||||
|
schema_name=`ovsdb-tool schema-name $abs_srcdir/idltest.ovsschema`
|
||||||
|
for i in `seq 2 $n`; do
|
||||||
|
ovsdb-tool join-cluster s$i.db $schema_name unix:s$i.raft unix:s1.raft || return $?
|
||||||
|
done
|
||||||
|
for i in `seq $n`; do
|
||||||
|
ovsdb-server -vraft -vconsole:warn --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb ${2:+--remote=$2} s$i.db || return $?
|
||||||
|
done
|
||||||
|
on_exit 'kill `cat s*.pid`'
|
||||||
|
}
|
||||||
|
|
||||||
|
# ovsdb_cluster_leader [REMOTES] [DATABASE]
|
||||||
|
#
|
||||||
|
# Returns the leader of the DATABASE cluster.
|
||||||
|
ovsdb_cluster_leader () {
|
||||||
|
remotes=$(echo $1 | tr "," "\n")
|
||||||
|
for remote in $remotes; do
|
||||||
|
ovsdb-client dump $remote _Server Database name leader | grep $2 | grep -q true
|
||||||
|
if [[ $? == 0 ]]; then
|
||||||
|
port=$(echo $remote | cut -d':' -f 3)
|
||||||
|
log=$(grep --include=s\*.log -rlnw -e "listening on port $port" ./)
|
||||||
|
pid=$(echo $log | sed 's/\(.*\.\)log/\1pid/' )
|
||||||
|
echo "${remote}|${pid}"
|
||||||
|
return
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
}])
|
||||||
|
|
||||||
# OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
|
# OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
|
||||||
# [FILTER])
|
# [FILTER])
|
||||||
@@ -1466,40 +1501,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
|
|||||||
"where": [["i", "==", 0]]}]' \
|
"where": [["i", "==", 0]]}]' \
|
||||||
'reconnect']],
|
'reconnect']],
|
||||||
[[000: empty
|
[[000: empty
|
||||||
001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
|
000: event:create, row={uuid=<0>}, updates=None
|
||||||
002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
|
000: event:create, row={uuid=<1>}, updates=None
|
||||||
002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
|
001: {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
|
||||||
002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
|
||||||
002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
|
||||||
|
002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
|
||||||
|
002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
|
||||||
003: {"error":null,"result":[{"count":2}]}
|
003: {"error":null,"result":[{"count":2}]}
|
||||||
004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true uuid=<0>}
|
004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true uuid=<2>}
|
||||||
004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
|
||||||
004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
|
||||||
005: {"error":null,"result":[{"count":2}]}
|
005: {"error":null,"result":[{"count":2}]}
|
||||||
006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>}
|
006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
|
||||||
006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 uuid=<0>}
|
006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2 uuid=<2>}
|
||||||
006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
|
||||||
006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
|
||||||
007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
|
007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
|
||||||
008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
|
008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
|
||||||
008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
|
008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
|
||||||
008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
|
||||||
008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
|
||||||
009: {"error":null,"result":[{"count":2}]}
|
009: {"error":null,"result":[{"count":2}]}
|
||||||
010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>}
|
010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
|
||||||
010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>}
|
010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
|
||||||
010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
|
010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
|
||||||
010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
|
010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
|
||||||
010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
|
||||||
011: {"error":null,"result":[{"count":1}]}
|
011: {"error":null,"result":[{"count":1}]}
|
||||||
012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
|
012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
|
||||||
012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
|
012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
|
||||||
012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
|
||||||
013: reconnect
|
013: reconnect
|
||||||
014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
|
014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
|
||||||
014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
|
014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
|
||||||
014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
|
014: event:create, row={uuid=<0>}, updates=None
|
||||||
014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
|
014: event:create, row={uuid=<1>}, updates=None
|
||||||
|
014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
|
||||||
|
014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
|
||||||
015: done
|
015: done
|
||||||
]])
|
]])
|
||||||
|
|
||||||
@@ -1853,3 +1892,33 @@ m4_define([CHECK_STREAM_OPEN_BLOCK_PY],
|
|||||||
|
|
||||||
CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2])
|
CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2])
|
||||||
CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3])
|
CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3])
|
||||||
|
|
||||||
|
# same as OVSDB_CHECK_IDL but uses Python IDL implementation with tcp
|
||||||
|
# with multiple remotes to assert the idl connects to the leader of the Raft cluster
|
||||||
|
m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PYN],
|
||||||
|
[AT_SETUP([$1])
|
||||||
|
AT_SKIP_IF([test $7 = no])
|
||||||
|
AT_KEYWORDS([ovsdb server idl Python leader_only with tcp socket])
|
||||||
|
m4_define([LPBK],[127.0.0.1])
|
||||||
|
AT_CHECK([ovsdb_cluster_start_idltest $2 "ptcp:0:"LPBK])
|
||||||
|
PARSE_LISTENING_PORT([s2.log], [TCP_PORT_1])
|
||||||
|
PARSE_LISTENING_PORT([s3.log], [TCP_PORT_2])
|
||||||
|
PARSE_LISTENING_PORT([s1.log], [TCP_PORT_3])
|
||||||
|
remotes=tcp:LPBK:$TCP_PORT_1,tcp:LPBK:$TCP_PORT_2,tcp:LPBK:$TCP_PORT_3
|
||||||
|
pids=$(cat s2.pid s3.pid s1.pid | tr '\n' ',')
|
||||||
|
echo $pids
|
||||||
|
AT_CHECK([$8 $srcdir/test-ovsdb.py -t30 idl-cluster $srcdir/idltest.ovsschema $remotes $pids $3],
|
||||||
|
[0], [stdout], [ignore])
|
||||||
|
remote=$(ovsdb_cluster_leader $remotes "idltest")
|
||||||
|
leader=$(echo $remote | cut -d'|' -f 1)
|
||||||
|
AT_CHECK([grep -F -- "${leader}" stdout], [0], [ignore])
|
||||||
|
AT_CLEANUP])
|
||||||
|
|
||||||
|
m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PY],
|
||||||
|
[OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python2 (leader only)], [$2], [$3], [$4], [$5], [$6],
|
||||||
|
[$HAVE_PYTHON], [$PYTHON])
|
||||||
|
OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python3 (leader only)], [$2], [$3], [$4], [$5], [$6],
|
||||||
|
[$HAVE_PYTHON3], [$PYTHON3])])
|
||||||
|
|
||||||
|
OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL connects to leader], 3, ['remote'])
|
||||||
|
OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL reconnects to leader], 3, ['remote' '+remotestop' 'remote'])
|
||||||
|
@@ -758,6 +758,70 @@ def do_idl_passive(schema_file, remote, *commands):
|
|||||||
print("%03d: done" % step)
|
print("%03d: done" % step)
|
||||||
|
|
||||||
|
|
||||||
|
def do_idl_cluster(schema_file, remote, pid, *commands):
|
||||||
|
schema_helper = ovs.db.idl.SchemaHelper(schema_file)
|
||||||
|
|
||||||
|
if remote.startswith("ssl:"):
|
||||||
|
if len(commands) < 3:
|
||||||
|
sys.stderr.write("SSL connection requires private key, "
|
||||||
|
"certificate for private key, and peer CA "
|
||||||
|
"certificate as arguments\n")
|
||||||
|
sys.exit(1)
|
||||||
|
ovs.stream.Stream.ssl_set_private_key_file(commands[0])
|
||||||
|
ovs.stream.Stream.ssl_set_certificate_file(commands[1])
|
||||||
|
ovs.stream.Stream.ssl_set_ca_cert_file(commands[2])
|
||||||
|
commands = commands[3:]
|
||||||
|
|
||||||
|
schema_helper.register_all()
|
||||||
|
idl = ovs.db.idl.Idl(remote, schema_helper)
|
||||||
|
|
||||||
|
step = 0
|
||||||
|
seqno = 0
|
||||||
|
commands = list(commands)
|
||||||
|
for command in commands:
|
||||||
|
if command.startswith("+"):
|
||||||
|
# The previous transaction didn't change anything.
|
||||||
|
command = command[1:]
|
||||||
|
else:
|
||||||
|
# Wait for update.
|
||||||
|
while idl.change_seqno == seqno and not idl.run():
|
||||||
|
poller = ovs.poller.Poller()
|
||||||
|
idl.wait(poller)
|
||||||
|
poller.block()
|
||||||
|
step += 1
|
||||||
|
|
||||||
|
seqno = idl.change_seqno
|
||||||
|
|
||||||
|
if command == "reconnect":
|
||||||
|
print("%03d: reconnect" % step)
|
||||||
|
sys.stdout.flush()
|
||||||
|
step += 1
|
||||||
|
idl.force_reconnect()
|
||||||
|
elif command == "remote":
|
||||||
|
print("%03d: %s" % (step, idl.session_name()))
|
||||||
|
sys.stdout.flush()
|
||||||
|
step += 1
|
||||||
|
elif command == "remotestop":
|
||||||
|
r = idl.session_name()
|
||||||
|
remotes = remote.split(',')
|
||||||
|
i = remotes.index(r)
|
||||||
|
pids = pid.split(',')
|
||||||
|
command = None
|
||||||
|
try:
|
||||||
|
command = "kill %s" % pids[i]
|
||||||
|
except ValueError as error:
|
||||||
|
sys.stderr.write("Cannot find pid of remote: %s\n"
|
||||||
|
% os.strerror(error))
|
||||||
|
sys.exit(1)
|
||||||
|
os.popen(command)
|
||||||
|
print("%03d: stop %s" % (step, pids[i]))
|
||||||
|
sys.stdout.flush()
|
||||||
|
step += 1
|
||||||
|
|
||||||
|
idl.close()
|
||||||
|
print("%03d: done" % step)
|
||||||
|
|
||||||
|
|
||||||
def usage():
|
def usage():
|
||||||
print("""\
|
print("""\
|
||||||
%(program_name)s: test utility for Open vSwitch database Python bindings
|
%(program_name)s: test utility for Open vSwitch database Python bindings
|
||||||
@@ -861,7 +925,8 @@ def main(argv):
|
|||||||
"parse-table": (do_parse_table, (2, 3)),
|
"parse-table": (do_parse_table, (2, 3)),
|
||||||
"parse-schema": (do_parse_schema, 1),
|
"parse-schema": (do_parse_schema, 1),
|
||||||
"idl": (do_idl, (2,)),
|
"idl": (do_idl, (2,)),
|
||||||
"idl_passive": (do_idl_passive, (2,))}
|
"idl_passive": (do_idl_passive, (2,)),
|
||||||
|
"idl-cluster": (do_idl_cluster, (3,))}
|
||||||
|
|
||||||
command_name = args[0]
|
command_name = args[0]
|
||||||
args = args[1:]
|
args = args[1:]
|
||||||
|
Reference in New Issue
Block a user