mirror of
https://github.com/openvswitch/ovs
synced 2025-08-31 14:25:26 +00:00
python jsonrpc: Allow jsonrpc_session to have more than one remote.
Python IDL implementation doesn't have the support to connect to the
cluster dbs. This patch adds this support. We are still missing the
support in python idl class to connect to the cluster master. That
support will be added in an upcoming patch.
This patch is similar to the commit 8cf6bbb184
which added multiple remote
support in the C jsonrpc implementation.
Acked-by: Mark Michelson <mmichels@redhat.com>
Signed-off-by: Numan Siddique <nusiddiq@redhat.com>
Signed-off-by: Ben Pfaff <blp@ovn.org>
This commit is contained in:
committed by
Ben Pfaff
parent
c1aa16d191
commit
31e434fc98
@@ -101,6 +101,9 @@ class Idl(object):
|
||||
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
|
||||
replica of the remote database.
|
||||
|
||||
'remote' can be comma separated multiple remotes and each remote
|
||||
should be in a form acceptable to ovs.jsonrpc.session.open().
|
||||
|
||||
'schema_helper' should be an instance of the SchemaHelper class which
|
||||
generates schema for the remote database. The caller may have cut it
|
||||
down by removing tables or columns that are not of interest. The IDL
|
||||
@@ -127,7 +130,8 @@ class Idl(object):
|
||||
self.tables = schema.tables
|
||||
self.readonly = schema.readonly
|
||||
self._db = schema
|
||||
self._session = ovs.jsonrpc.Session.open(remote,
|
||||
remotes = self._parse_remotes(remote)
|
||||
self._session = ovs.jsonrpc.Session.open_multiple(remotes,
|
||||
probe_interval=probe_interval)
|
||||
self._monitor_request_id = None
|
||||
self._last_seqno = None
|
||||
@@ -155,6 +159,19 @@ class Idl(object):
|
||||
table.condition = [True]
|
||||
table.cond_changed = False
|
||||
|
||||
def _parse_remotes(self, remote):
|
||||
# If remote is -
|
||||
# "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
|
||||
# this function returns
|
||||
# ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", tcp:10.0.0.2:6642"]
|
||||
remotes = []
|
||||
for r in remote.split(','):
|
||||
if remotes and r.find(":") == -1:
|
||||
remotes[-1] += "," + r
|
||||
else:
|
||||
remotes.append(r)
|
||||
return remotes
|
||||
|
||||
def index_create(self, table, name):
|
||||
"""Create a named multi-column index on a table"""
|
||||
return self.tables[table].rows.index_create(name)
|
||||
|
@@ -14,6 +14,7 @@
|
||||
import codecs
|
||||
import errno
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
|
||||
import ovs.json
|
||||
@@ -368,12 +369,17 @@ class Connection(object):
|
||||
class Session(object):
|
||||
"""A JSON-RPC session with reconnection."""
|
||||
|
||||
def __init__(self, reconnect, rpc):
|
||||
def __init__(self, reconnect, rpc, remotes):
|
||||
self.reconnect = reconnect
|
||||
self.rpc = rpc
|
||||
self.stream = None
|
||||
self.pstream = None
|
||||
self.seqno = 0
|
||||
if type(remotes) != list:
|
||||
remotes = [remotes]
|
||||
self.remotes = remotes
|
||||
random.shuffle(self.remotes)
|
||||
self.next_remote = 0
|
||||
|
||||
@staticmethod
|
||||
def open(name, probe_interval=None):
|
||||
@@ -393,28 +399,38 @@ class Session(object):
|
||||
feature. If non-zero the value will be forced to at least 1000
|
||||
milliseconds. If None it will just use the default value in OVS.
|
||||
"""
|
||||
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
|
||||
reconnect.set_name(name)
|
||||
reconnect.enable(ovs.timeval.msec())
|
||||
return Session.open_multiple([name], probe_interval=probe_interval)
|
||||
|
||||
if ovs.stream.PassiveStream.is_valid_name(name):
|
||||
@staticmethod
|
||||
def open_multiple(remotes, probe_interval=None):
|
||||
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
|
||||
session = Session(reconnect, None, remotes)
|
||||
session.pick_remote()
|
||||
reconnect.enable(ovs.timeval.msec())
|
||||
reconnect.set_backoff_free_tries(len(remotes))
|
||||
if ovs.stream.PassiveStream.is_valid_name(reconnect.get_name()):
|
||||
reconnect.set_passive(True, ovs.timeval.msec())
|
||||
|
||||
if not ovs.stream.stream_or_pstream_needs_probes(name):
|
||||
if not ovs.stream.stream_or_pstream_needs_probes(reconnect.get_name()):
|
||||
reconnect.set_probe_interval(0)
|
||||
elif probe_interval is not None:
|
||||
reconnect.set_probe_interval(probe_interval)
|
||||
|
||||
return Session(reconnect, None)
|
||||
return session
|
||||
|
||||
@staticmethod
|
||||
def open_unreliably(jsonrpc):
|
||||
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
|
||||
session = Session(reconnect, None, [jsonrpc.name])
|
||||
reconnect.set_quiet(True)
|
||||
reconnect.set_name(jsonrpc.name)
|
||||
session.pick_remote()
|
||||
reconnect.set_max_tries(0)
|
||||
reconnect.connected(ovs.timeval.msec())
|
||||
return Session(reconnect, jsonrpc)
|
||||
return session
|
||||
|
||||
def pick_remote(self):
|
||||
self.reconnect.set_name(self.remotes[self.next_remote])
|
||||
self.next_remote = (self.next_remote + 1) % len(self.remotes)
|
||||
|
||||
def close(self):
|
||||
if self.rpc is not None:
|
||||
@@ -448,6 +464,8 @@ class Session(object):
|
||||
self.reconnect.connecting(ovs.timeval.msec())
|
||||
else:
|
||||
self.reconnect.connect_failed(ovs.timeval.msec(), error)
|
||||
self.stream = None
|
||||
self.pick_remote()
|
||||
elif self.pstream is None:
|
||||
error, self.pstream = ovs.stream.PassiveStream.open(name)
|
||||
if not error:
|
||||
@@ -490,6 +508,7 @@ class Session(object):
|
||||
if error != 0:
|
||||
self.reconnect.disconnected(ovs.timeval.msec(), error)
|
||||
self.__disconnect()
|
||||
self.pick_remote()
|
||||
elif self.stream is not None:
|
||||
self.stream.run()
|
||||
error = self.stream.connect()
|
||||
@@ -499,6 +518,7 @@ class Session(object):
|
||||
self.stream = None
|
||||
elif error != errno.EAGAIN:
|
||||
self.reconnect.connect_failed(ovs.timeval.msec(), error)
|
||||
self.pick_remote()
|
||||
self.stream.close()
|
||||
self.stream = None
|
||||
|
||||
@@ -583,3 +603,6 @@ class Session(object):
|
||||
|
||||
def force_reconnect(self):
|
||||
self.reconnect.force_reconnect(ovs.timeval.msec())
|
||||
|
||||
def get_num_of_remotes(self):
|
||||
return len(self.remotes)
|
||||
|
@@ -106,6 +106,32 @@ m4_define([OVSDB_CHECK_IDL_TCP_PY],
|
||||
OVSDB_CHECK_IDL_TCP_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6],
|
||||
[$HAVE_PYTHON3], [$PYTHON3])])
|
||||
|
||||
# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp
|
||||
# with multiple remotes with only one remote reachable
|
||||
m4_define([OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN],
|
||||
[AT_SETUP([$1 - tcp])
|
||||
AT_SKIP_IF([test $7 = no])
|
||||
AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
|
||||
AT_CHECK([ovsdb_start_idltest "ptcp:0:127.0.0.1"])
|
||||
PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
|
||||
WRONG_PORT_1=$((TCP_PORT + 1))
|
||||
WRONG_PORT_2=$((TCP_PORT + 2))
|
||||
remote=tcp:127.0.0.1:$WRONG_PORT_1,tcp:127.0.0.1:$TCP_PORT,tcp:127.0.0.1:$WRONG_PORT_2
|
||||
m4_if([$2], [], [],
|
||||
[AT_CHECK([ovsdb-client transact tcp:127.0.0.1:$TCP_PORT $2], [0], [ignore], [ignore])])
|
||||
AT_CHECK([$8 $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema $remote $3],
|
||||
[0], [stdout], [ignore])
|
||||
AT_CHECK([sort stdout | uuidfilt]m4_if([$6],,, [[| $6]]),
|
||||
[0], [$4])
|
||||
OVSDB_SERVER_SHUTDOWN
|
||||
AT_CLEANUP])
|
||||
|
||||
m4_define([OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PY],
|
||||
[OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN([$1 - Python2 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
|
||||
[$HAVE_PYTHON], [$PYTHON])
|
||||
OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PYN([$1 - Python3 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
|
||||
[$HAVE_PYTHON3], [$PYTHON3])])
|
||||
|
||||
# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with tcp6
|
||||
m4_define([OVSDB_CHECK_IDL_TCP6_PYN],
|
||||
[AT_SETUP([$1 - tcp6])
|
||||
@@ -132,6 +158,32 @@ m4_define([OVSDB_CHECK_IDL_TCP6_PY],
|
||||
OVSDB_CHECK_IDL_TCP6_PYN([$1 - Python3], [$2], [$3], [$4], [$5], [$6],
|
||||
[$HAVE_PYTHON3], [$PYTHON3])])
|
||||
|
||||
m4_define([OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN],
|
||||
[AT_SETUP([$1 - tcp6])
|
||||
AT_SKIP_IF([test $7 = no])
|
||||
AT_SKIP_IF([test "$IS_WIN32" = "yes"])
|
||||
AT_SKIP_IF([test $HAVE_IPV6 = no])
|
||||
AT_KEYWORDS([ovsdb server idl positive Python with tcp6 socket $5])
|
||||
AT_CHECK([ovsdb_start_idltest "ptcp:0:[[::1]]"])
|
||||
PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
|
||||
WRONG_PORT_1=$((TCP_PORT + 1))
|
||||
WRONG_PORT_2=$((TCP_PORT + 2))
|
||||
remote="tcp:[[::1]]:$WRONG_PORT_1,tcp:[[::1]]:$TCP_PORT,tcp:[[::1]]:$WRONG_PORT_2"
|
||||
m4_if([$2], [], [],
|
||||
[AT_CHECK([ovsdb-client transact "tcp:[[::1]]:$TCP_PORT" $2], [0], [ignore], [ignore])])
|
||||
AT_CHECK([$8 $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema $remote $3],
|
||||
[0], [stdout], [ignore])
|
||||
AT_CHECK([sort stdout | uuidfilt]m4_if([$6],,, [[| $6]]),
|
||||
[0], [$4])
|
||||
OVSDB_SERVER_SHUTDOWN
|
||||
AT_CLEANUP])
|
||||
|
||||
m4_define([OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PY],
|
||||
[OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN([$1 - Python2 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
|
||||
[$HAVE_PYTHON], [$PYTHON])
|
||||
OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PYN([$1 - Python3 (multiple remotes)], [$2], [$3], [$4], [$5], [$6],
|
||||
[$HAVE_PYTHON3], [$PYTHON3])])
|
||||
|
||||
# same as OVSDB_CHECK_IDL but uses the Python IDL implementation with SSL
|
||||
m4_define([OVSDB_CHECK_IDL_SSL_PYN],
|
||||
[AT_SETUP([$1 - SSL])
|
||||
@@ -178,7 +230,9 @@ m4_define([OVSDB_CHECK_IDL],
|
||||
OVSDB_CHECK_IDL_PY($@)
|
||||
OVSDB_CHECK_IDL_REGISTER_COLUMNS_PY($@)
|
||||
OVSDB_CHECK_IDL_TCP_PY($@)
|
||||
OVSDB_CHECK_IDL_TCP_MULTIPLE_REMOTES_PY($@)
|
||||
OVSDB_CHECK_IDL_TCP6_PY($@)
|
||||
OVSDB_CHECK_IDL_TCP6_MULTIPLE_REMOTES_PY($@)
|
||||
OVSDB_CHECK_IDL_SSL_PY($@)])
|
||||
|
||||
# This test uses the Python IDL implementation with passive tcp
|
||||
|
@@ -595,9 +595,16 @@ def do_idl(schema_file, remote, *commands):
|
||||
idl.index_create("simple3", "simple3_by_name")
|
||||
|
||||
if commands:
|
||||
error, stream = ovs.stream.Stream.open_block(
|
||||
ovs.stream.Stream.open(remote))
|
||||
if error:
|
||||
remotes = remote.split(',')
|
||||
stream = None
|
||||
for r in remotes:
|
||||
error, stream = ovs.stream.Stream.open_block(
|
||||
ovs.stream.Stream.open(r))
|
||||
if not error and stream:
|
||||
break
|
||||
stream = None
|
||||
|
||||
if not stream:
|
||||
sys.stderr.write("failed to connect to \"%s\"" % remote)
|
||||
sys.exit(1)
|
||||
rpc = ovs.jsonrpc.Connection(stream)
|
||||
|
Reference in New Issue
Block a user