2
0
mirror of https://github.com/openvswitch/ovs synced 2025-09-05 08:45:23 +00:00

python: Monitor Database table to manage lifecycle of IDL client.

The Python IDL implementation supports ovsdb cluster connections.
This patch is a follow up to commit 31e434fc98, it adds the option of
connecting to the leader (the default) in the Raft-based cluster. It mimics
the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa.

The _Server database schema is first requested, then a monitor of the
Database table in the _Server Database. Method __check_server_db verifies
the eligibility of the server. If the attempt to obtain a monitor of the
_Server database fails and a cluster id was not provided this implementation
proceeds to request the data monitor. If a cluster id was provided via the
set_cluster_id method then the connection is aborted and a connection to a
different node is instead attempted, until a valid cluster node is found.
Thus, when supplied, cluster id is interpreted as the intention to only
allow connections to a clustered database. If not supplied, connections to
standalone nodes, or nodes that do not have the _Server database are
allowed. change_seqno is not incremented in the case of Database table
updates.

Acked-by: Numan Siddique <nusiddiq@redhat.com>
Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com>
Signed-off-by: Ben Pfaff <blp@ovn.org>
This commit is contained in:
Ted Elhourani
2019-01-25 19:10:01 +00:00
committed by Ben Pfaff
parent 2a6d9168d6
commit c39751e445
4 changed files with 372 additions and 46 deletions

View File

@@ -38,6 +38,8 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
CLUSTERED = "clustered"
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -92,10 +94,13 @@ class Idl(object):
"""
IDL_S_INITIAL = 0
IDL_S_MONITOR_REQUESTED = 1
IDL_S_MONITOR_COND_REQUESTED = 2
IDL_S_SERVER_SCHEMA_REQUESTED = 1
IDL_S_SERVER_MONITOR_REQUESTED = 2
IDL_S_DATA_MONITOR_REQUESTED = 3
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
def __init__(self, remote, schema_helper, probe_interval=None):
def __init__(self, remote, schema_helper, probe_interval=None,
leader_only=True):
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
@@ -119,6 +124,9 @@ class Idl(object):
The IDL uses and modifies 'schema' directly.
If 'leader_only' is set to True (default value) the IDL will only
monitor and transact with the leader of the cluster.
If "probe_interval" is zero it disables the connection keepalive
feature. If non-zero the value will be forced to at least 1000
milliseconds. If None it will just use the default value in OVS.
@@ -137,6 +145,20 @@ class Idl(object):
self._last_seqno = None
self.change_seqno = 0
self.uuid = uuid.uuid1()
# Server monitor.
self._server_schema_request_id = None
self._server_monitor_request_id = None
self._db_change_aware_request_id = None
self._server_db_name = '_Server'
self._server_db_table = 'Database'
self.server_tables = None
self._server_db = None
self.server_monitor_uuid = uuid.uuid1()
self.leader_only = leader_only
self.cluster_id = None
self._min_index = 0
self.state = self.IDL_S_INITIAL
# Database locking.
@@ -172,6 +194,12 @@ class Idl(object):
remotes.append(r)
return remotes
def set_cluster_id(self, cluster_id):
"""Set the id of the cluster that this idl must connect to."""
self.cluster_id = cluster_id
if self.state != self.IDL_S_INITIAL:
self.force_reconnect()
def index_create(self, table, name):
"""Create a named multi-column index on a table"""
return self.tables[table].rows.index_create(name)
@@ -222,7 +250,7 @@ class Idl(object):
if seqno != self._last_seqno:
self._last_seqno = seqno
self.__txn_abort_all()
self.__send_monitor_request()
self.__send_server_schema_request()
if self.lock_name:
self.__send_lock_request()
break
@@ -230,6 +258,7 @@ class Idl(object):
msg = self._session.recv()
if msg is None:
break
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2"
and len(msg.params) == 2):
@@ -239,7 +268,15 @@ class Idl(object):
and msg.method == "update"
and len(msg.params) == 2):
# Database contents changed.
self.__parse_update(msg.params[1], OVSDB_UPDATE)
if msg.params[0] == str(self.server_monitor_uuid):
self.__parse_update(msg.params[1], OVSDB_UPDATE,
tables=self.server_tables)
self.change_seqno = initial_change_seqno
if not self.__check_server_db():
self.force_reconnect()
break
else:
self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None
and self._monitor_request_id == msg.id):
@@ -248,16 +285,66 @@ class Idl(object):
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
self.__parse_update(msg.result, OVSDB_UPDATE2)
else:
assert self.state == self.IDL_S_MONITOR_REQUESTED
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
self.__parse_update(msg.result, OVSDB_UPDATE)
except error.Error as e:
vlog.err("%s: parse error in received schema: %s"
% (self._session.get_name(), e))
self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._server_schema_request_id is not None
and self._server_schema_request_id == msg.id):
# Reply to our "get_schema" of _Server request.
try:
self._server_schema_request_id = None
sh = SchemaHelper(None, msg.result)
sh.register_table(self._server_db_table)
schema = sh.get_idl_schema()
self._server_db = schema
self.server_tables = schema.tables
self.__send_server_monitor_request()
except error.Error as e:
vlog.err("%s: error receiving server schema: %s"
% (self._session.get_name(), e))
if self.cluster_id:
self.__error()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._server_monitor_request_id is not None
and self._server_monitor_request_id == msg.id):
# Reply to our "monitor" of _Server request.
try:
self._server_monitor_request_id = None
self.__parse_update(msg.result, OVSDB_UPDATE,
tables=self.server_tables)
self.change_seqno = initial_change_seqno
if self.__check_server_db():
self.__send_monitor_request()
self.__send_db_change_aware()
else:
self.force_reconnect()
break
except error.Error as e:
vlog.err("%s: parse error in received schema: %s"
% (self._session.get_name(), e))
if self.cluster_id:
self.__error()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._db_change_aware_request_id is not None
and self._db_change_aware_request_id == msg.id):
# Reply to us notifying the server of our change awarness.
self._db_change_aware_request_id = None
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None
and self._lock_request_id == msg.id):
@@ -275,10 +362,20 @@ class Idl(object):
# Reply to our echo request. Ignore it.
pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == self.IDL_S_MONITOR_COND_REQUESTED and
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
self.__send_monitor_request()
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self._server_schema_request_id is not None and
self._server_schema_request_id == msg.id):
self._server_schema_request_id = None
if self.cluster_id:
self.force_reconnect()
break
else:
self.change_seqno = initial_change_seqno
self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)):
@@ -342,6 +439,9 @@ class Idl(object):
In the meantime, the contents of the IDL will not change."""
self._session.force_reconnect()
def session_name(self):
return self._session.get_name()
def set_lock(self, lock_name):
"""If 'lock_name' is not None, configures the IDL to obtain the named
lock from the database server and to avoid modifying the database when
@@ -440,12 +540,19 @@ class Idl(object):
if not new_has_lock:
self.is_lock_contended = True
def __send_db_change_aware(self):
msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
[True])
self._db_change_aware_request_id = msg.id
self._session.send(msg)
def __send_monitor_request(self):
if self.state == self.IDL_S_INITIAL:
self.state = self.IDL_S_MONITOR_COND_REQUESTED
if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
self.IDL_S_INITIAL]):
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond"
else:
self.state = self.IDL_S_MONITOR_REQUESTED
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor"
monitor_requests = {}
@@ -467,20 +574,50 @@ class Idl(object):
self._monitor_request_id = msg.id
self._session.send(msg)
def __parse_update(self, update, version):
def __send_server_schema_request(self):
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
msg = ovs.jsonrpc.Message.create_request(
"get_schema", [self._server_db_name, str(self.uuid)])
self._server_schema_request_id = msg.id
self._session.send(msg)
def __send_server_monitor_request(self):
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
monitor_requests = {}
table = self.server_tables[self._server_db_table]
columns = [column for column in six.iterkeys(table.columns)]
for column in six.itervalues(table.columns):
if not hasattr(column, 'alert'):
column.alert = True
table.rows = custom_index.IndexedRows(table)
table.need_table = False
table.idl = self
monitor_request = {"columns": columns}
monitor_requests[table.name] = [monitor_request]
msg = ovs.jsonrpc.Message.create_request(
'monitor', [self._server_db.name,
str(self.server_monitor_uuid),
monitor_requests])
self._server_monitor_request_id = msg.id
self._session.send(msg)
def __parse_update(self, update, version, tables=None):
try:
self.__do_parse_update(update, version)
if not tables:
self.__do_parse_update(update, version, self.tables)
else:
self.__do_parse_update(update, version, tables)
except error.Error as e:
vlog.err("%s: error parsing update: %s"
% (self._session.get_name(), e))
def __do_parse_update(self, table_updates, version):
def __do_parse_update(self, table_updates, version, tables):
if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)
for table_name, table_update in six.iteritems(table_updates):
table = self.tables.get(table_name)
table = tables.get(table_name)
if not table:
raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name)
@@ -605,6 +742,58 @@ class Idl(object):
self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed
def __check_server_db(self):
"""Returns True if this is a valid server database, False otherwise."""
session_name = self.session_name()
if self._server_db_table not in self.server_tables:
vlog.info("%s: server does not have %s table in its %s database"
% (session_name, self._server_db_table,
self._server_db_name))
return False
rows = self.server_tables[self._server_db_table].rows
database = None
for row in six.itervalues(rows):
if self.cluster_id:
if self.cluster_id in \
map(lambda x: str(x)[:4], row.cid):
database = row
break
elif row.name == self._db.name:
database = row
break
if not database:
vlog.info("%s: server does not have %s database"
% (session_name, self._db.name))
return False
if (database.model == CLUSTERED and
self._session.get_num_of_remotes() > 1):
if not database.schema:
vlog.info('%s: clustered database server has not yet joined '
'cluster; trying another server' % session_name)
return False
if not database.connected:
vlog.info('%s: clustered database server is disconnected '
'from cluster; trying another server' % session_name)
return False
if (self.leader_only and
not database.leader):
vlog.info('%s: clustered database server is not cluster '
'leader; trying another server' % session_name)
return False
if database.index:
if database.index[0] < self._min_index:
vlog.warn('%s: clustered database server has stale data; '
'trying another server' % session_name)
return False
self._min_index = database.index[0]
return True
def __column_name(self, column):
if column.type.key.type == ovs.db.types.UuidType:
return ovs.ovsuuid.to_json(column.type.key.type.default)

View File

@@ -344,6 +344,9 @@ class Reconnect(object):
else:
self.info_level("%s: error listening for connections"
% self.name)
elif self.state == Reconnect.Reconnect:
self.info_level("%s: connection closed by client"
% self.name)
elif self.backoff < self.max_backoff:
if self.passive:
type_ = "listen"