2
0
mirror of https://github.com/openvswitch/ovs synced 2025-09-01 06:45:17 +00:00

python jsonrpc: Allow jsonrpc_session to have more than one remote.

Python IDL implementation doesn't have the support to connect to the
cluster dbs. This patch adds this support. We are still missing the
support in python idl class to connect to the cluster master. That
support will be added in an upcoming patch.

This patch is similar to the commit 8cf6bbb184 which added multiple remote
support in the C jsonrpc implementation.

Acked-by: Mark Michelson <mmichels@redhat.com>
Signed-off-by: Numan Siddique <nusiddiq@redhat.com>
Signed-off-by: Ben Pfaff <blp@ovn.org>
This commit is contained in:
Numan Siddique
2018-08-07 17:08:11 +05:30
committed by Ben Pfaff
parent c1aa16d191
commit 31e434fc98
4 changed files with 114 additions and 13 deletions

View File

@@ -101,6 +101,9 @@ class Idl(object):
ovs.jsonrpc.session.open(). The connection will maintain an in-memory ovs.jsonrpc.session.open(). The connection will maintain an in-memory
replica of the remote database. replica of the remote database.
'remote' can be comma separated multiple remotes and each remote
should be in a form acceptable to ovs.jsonrpc.session.open().
'schema_helper' should be an instance of the SchemaHelper class which 'schema_helper' should be an instance of the SchemaHelper class which
generates schema for the remote database. The caller may have cut it generates schema for the remote database. The caller may have cut it
down by removing tables or columns that are not of interest. The IDL down by removing tables or columns that are not of interest. The IDL
@@ -127,7 +130,8 @@ class Idl(object):
self.tables = schema.tables self.tables = schema.tables
self.readonly = schema.readonly self.readonly = schema.readonly
self._db = schema self._db = schema
self._session = ovs.jsonrpc.Session.open(remote, remotes = self._parse_remotes(remote)
self._session = ovs.jsonrpc.Session.open_multiple(remotes,
probe_interval=probe_interval) probe_interval=probe_interval)
self._monitor_request_id = None self._monitor_request_id = None
self._last_seqno = None self._last_seqno = None
@@ -155,6 +159,19 @@ class Idl(object):
table.condition = [True] table.condition = [True]
table.cond_changed = False table.cond_changed = False
def _parse_remotes(self, remote):
# If remote is -
# "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
# this function returns
# ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", tcp:10.0.0.2:6642"]
remotes = []
for r in remote.split(','):
if remotes and r.find(":") == -1:
remotes[-1] += "," + r
else:
remotes.append(r)
return remotes
def index_create(self, table, name): def index_create(self, table, name):
"""Create a named multi-column index on a table""" """Create a named multi-column index on a table"""
return self.tables[table].rows.index_create(name) return self.tables[table].rows.index_create(name)

View File

@@ -14,6 +14,7 @@
import codecs import codecs
import errno import errno
import os import os
import random
import sys import sys
import ovs.json import ovs.json
@@ -368,12 +369,17 @@ class Connection(object):
class Session(object): class Session(object):
"""A JSON-RPC session with reconnection.""" """A JSON-RPC session with reconnection."""
def __init__(self, reconnect, rpc): def __init__(self, reconnect, rpc, remotes):
self.reconnect = reconnect self.reconnect = reconnect
self.rpc = rpc self.rpc = rpc
self.stream = None self.stream = None
self.pstream = None self.pstream = None
self.seqno = 0 self.seqno = 0
if type(remotes) != list:
remotes = [remotes]
self.remotes = remotes
random.shuffle(self.remotes)
self.next_remote = 0
@staticmethod @staticmethod
def open(name, probe_interval=None): def open(name, probe_interval=None):
@@ -393,28 +399,38 @@ class Session(object):
feature. If non-zero the value will be forced to at least 1000 feature. If non-zero the value will be forced to at least 1000
milliseconds. If None it will just use the default value in OVS. milliseconds. If None it will just use the default value in OVS.
""" """
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec()) return Session.open_multiple([name], probe_interval=probe_interval)
reconnect.set_name(name)
reconnect.enable(ovs.timeval.msec())
if ovs.stream.PassiveStream.is_valid_name(name): @staticmethod
def open_multiple(remotes, probe_interval=None):
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
session = Session(reconnect, None, remotes)
session.pick_remote()
reconnect.enable(ovs.timeval.msec())
reconnect.set_backoff_free_tries(len(remotes))
if ovs.stream.PassiveStream.is_valid_name(reconnect.get_name()):
reconnect.set_passive(True, ovs.timeval.msec()) reconnect.set_passive(True, ovs.timeval.msec())
if not ovs.stream.stream_or_pstream_needs_probes(name): if not ovs.stream.stream_or_pstream_needs_probes(reconnect.get_name()):
reconnect.set_probe_interval(0) reconnect.set_probe_interval(0)
elif probe_interval is not None: elif probe_interval is not None:
reconnect.set_probe_interval(probe_interval) reconnect.set_probe_interval(probe_interval)
return Session(reconnect, None) return session
@staticmethod @staticmethod
def open_unreliably(jsonrpc): def open_unreliably(jsonrpc):
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec()) reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
session = Session(reconnect, None, [jsonrpc.name])
reconnect.set_quiet(True) reconnect.set_quiet(True)
reconnect.set_name(jsonrpc.name) session.pick_remote()
reconnect.set_max_tries(0) reconnect.set_max_tries(0)
reconnect.connected(ovs.timeval.msec()) reconnect.connected(ovs.timeval.msec())
return Session(reconnect, jsonrpc) return session
def pick_remote(self):
self.reconnect.set_name(self.remotes[self.next_remote])
self.next_remote = (self.next_remote + 1) % len(self.remotes)
def close(self): def close(self):
if self.rpc is not None: if self.rpc is not None:
@@ -448,6 +464,8 @@ class Session(object):
self.reconnect.connecting(ovs.timeval.msec()) self.reconnect.connecting(ovs.timeval.msec())
else: else:
self.reconnect.connect_failed(ovs.timeval.msec(), error) self.reconnect.connect_failed(ovs.timeval.msec(), error)
self.stream = None
self.pick_remote()
elif self.pstream is None: elif self.pstream is None:
error, self.pstream = ovs.stream.PassiveStream.open(name) error, self.pstream = ovs.stream.PassiveStream.open(name)
if not error: if not error:
@@ -490,6 +508,7 @@ class Session(object):
if error != 0: if error != 0:
self.reconnect.disconnected(ovs.timeval.msec(), error) self.reconnect.disconnected(ovs.timeval.msec(), error)
self.__disconnect() self.__disconnect()
self.pick_remote()
elif self.stream is not None: elif self.stream is not None:
self.stream.run() self.stream.run()
error = self.stream.connect() error = self.stream.connect()
@@ -499,6 +518,7 @@ class Session(object):
self.stream = None self.stream = None
elif error != errno.EAGAIN: elif error != errno.EAGAIN:
self.reconnect.connect_failed(ovs.timeval.msec(), error) self.reconnect.connect_failed(ovs.timeval.msec(), error)
self.pick_remote()
self.stream.close() self.stream.close()
self.stream = None self.stream = None
@@ -583,3 +603,6 @@ class Session(object):
def force_reconnect(self): def force_reconnect(self):
self.reconnect.force_reconnect(ovs.timeval.msec()) self.reconnect.force_reconnect(ovs.timeval.msec())
def get_num_of_remotes(self):
return len(self.remotes)

View File

@@ -106,6 +106,32 @@ m4_define([OVSDB_CHECK_IDL_TCP_PY],
OVSDB_CHECK_IDL_TCP_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6], OVSDB_CHECK_IDL_TCP_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON3], [$PYTHON3])]) [$HAVE_PYTHON3], [$PYTHON3])])
# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp
# with multiple remotes with only one remote reachable
m4_define([OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN],
[AT_SETUP([$1 - tcp])
AT_SKIP_IF([test $7 = no])
AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
AT_CHECK([ovsdb_start_idltest "ptcp:0:127.0.0.1"])
PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
WRONG_PORT_1=$((TCP_PORT + 1))
WRONG_PORT_2=$((TCP_PORT + 2))
remote=tcp:127.0.0.1:$WRONG_PORT_1,tcp:127.0.0.1:$TCP_PORT,tcp:127.0.0.1:$WRONG_PORT_2
m4_if([$2], [], [],
[AT_CHECK([ovsdb-client transact tcp:127.0.0.1:$TCP_PORT $2], [0], [ignore], [ignore])])
AT_CHECK([$8 $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema $remote $3],
[0], [stdout], [ignore])
AT_CHECK([sort stdout | uuidfilt]m4_if([$6],,, [[| $6]]),
[0], [$4])
OVSDB_SERVER_SHUTDOWN
AT_CLEANUP])
m4_define([OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PY],
[OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN([$1 - Python2 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON], [$PYTHON])
OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN([$1 - Python3 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON3], [$PYTHON3])])
# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp6 # same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp6
m4_define([OVSDB_CHECK_IDL_TCP6_PYN], m4_define([OVSDB_CHECK_IDL_TCP6_PYN],
[AT_SETUP([$1 - tcp6]) [AT_SETUP([$1 - tcp6])
@@ -132,6 +158,32 @@ m4_define([OVSDB_CHECK_IDL_TCP6_PY],
OVSDB_CHECK_IDL_TCP6_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6], OVSDB_CHECK_IDL_TCP6_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON3], [$PYTHON3])]) [$HAVE_PYTHON3], [$PYTHON3])])
m4_define([OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN],
[AT_SETUP([$1 - tcp6])
AT_SKIP_IF([test $7 = no])
AT_SKIP_IF([test "$IS_WIN32" = "yes"])
AT_SKIP_IF([test $HAVE_IPV6 = no])
AT_KEYWORDS([ovsdb server idl positive Python with tcp6 socket $5])
AT_CHECK([ovsdb_start_idltest "ptcp:0:[[::1]]"])
PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
WRONG_PORT_1=$((TCP_PORT + 1))
WRONG_PORT_2=$((TCP_PORT + 2))
remote="tcp:[[::1]]:$WRONG_PORT_1,tcp:[[::1]]:$TCP_PORT,tcp:[[::1]]:$WRONG_PORT_2"
m4_if([$2], [], [],
[AT_CHECK([ovsdb-client transact "tcp:[[::1]]:$TCP_PORT" $2], [0], [ignore], [ignore])])
AT_CHECK([$8 $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema $remote $3],
[0], [stdout], [ignore])
AT_CHECK([sort stdout | uuidfilt]m4_if([$6],,, [[| $6]]),
[0], [$4])
OVSDB_SERVER_SHUTDOWN
AT_CLEANUP])
m4_define([OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PY],
[OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN([$1 - Python2 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON], [$PYTHON])
OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN([$1 - Python3 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
[$HAVE_PYTHON3], [$PYTHON3])])
# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with SSL # same as OVSDB_CHECK_IDL but uses the Python IDL implementation with SSL
m4_define([OVSDB_CHECK_IDL_SSL_PYN], m4_define([OVSDB_CHECK_IDL_SSL_PYN],
[AT_SETUP([$1 - SSL]) [AT_SETUP([$1 - SSL])
@@ -178,7 +230,9 @@ m4_define([OVSDB_CHECK_IDL],
OVSDB_CHECK_IDL_PY($@) OVSDB_CHECK_IDL_PY($@)
OVSDB_CHECK_IDL_REGISTER_COLUMNS_PY($@) OVSDB_CHECK_IDL_REGISTER_COLUMNS_PY($@)
OVSDB_CHECK_IDL_TCP_PY($@) OVSDB_CHECK_IDL_TCP_PY($@)
OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PY($@)
OVSDB_CHECK_IDL_TCP6_PY($@) OVSDB_CHECK_IDL_TCP6_PY($@)
OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PY($@)
OVSDB_CHECK_IDL_SSL_PY($@)]) OVSDB_CHECK_IDL_SSL_PY($@)])
# This test uses the Python IDL implementation with passive tcp # This test uses the Python IDL implementation with passive tcp

View File

@@ -595,9 +595,16 @@ def do_idl(schema_file, remote, *commands):
idl.index_create("simple3", "simple3_by_name") idl.index_create("simple3", "simple3_by_name")
if commands: if commands:
error, stream = ovs.stream.Stream.open_block( remotes = remote.split(',')
ovs.stream.Stream.open(remote)) stream = None
if error: for r in remotes:
error, stream = ovs.stream.Stream.open_block(
ovs.stream.Stream.open(r))
if not error and stream:
break
stream = None
if not stream:
sys.stderr.write("failed to connect to \"%s\"" % remote) sys.stderr.write("failed to connect to \"%s\"" % remote)
sys.exit(1) sys.exit(1)
rpc = ovs.jsonrpc.Connection(stream) rpc = ovs.jsonrpc.Connection(stream)