mirror of
https://github.com/openvswitch/ovs
synced 2025-10-23 14:57:06 +00:00
jsonrpc: Treat receiving part of a message as activity.
Until now, the jsonrpc code has only counted receiving a full JSON-RPC messages as activity. This could theoretically time out, then, while a very long message is in transit or if a slow link is involved. This commit changes this code to count receiving any part of a message as activity. This isn't a problem for OpenFlow connections because OpenFlow messages are at most 64 kB in size. This problem hasn't actually been observed in practice. Bug #12789. Signed-off-by: Ben Pfaff <blp@nicira.com>
This commit is contained in:
@@ -178,6 +178,14 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
|
|||||||
return rpc->status ? 0 : rpc->backlog;
|
return rpc->status ? 0 : rpc->backlog;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Returns the number of bytes that have been received on 'rpc''s underlying
|
||||||
|
* stream. (The value wraps around if it exceeds UINT_MAX.) */
|
||||||
|
unsigned int
|
||||||
|
jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
|
||||||
|
{
|
||||||
|
return rpc->input.head;
|
||||||
|
}
|
||||||
|
|
||||||
/* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
|
/* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
|
||||||
* the stream underlying 'rpc' when 'rpc' was created. */
|
* the stream underlying 'rpc' when 'rpc' was created. */
|
||||||
const char *
|
const char *
|
||||||
@@ -988,10 +996,21 @@ struct jsonrpc_msg *
|
|||||||
jsonrpc_session_recv(struct jsonrpc_session *s)
|
jsonrpc_session_recv(struct jsonrpc_session *s)
|
||||||
{
|
{
|
||||||
if (s->rpc) {
|
if (s->rpc) {
|
||||||
|
unsigned int received_bytes;
|
||||||
struct jsonrpc_msg *msg;
|
struct jsonrpc_msg *msg;
|
||||||
|
|
||||||
|
received_bytes = jsonrpc_get_received_bytes(s->rpc);
|
||||||
jsonrpc_recv(s->rpc, &msg);
|
jsonrpc_recv(s->rpc, &msg);
|
||||||
if (msg) {
|
if (received_bytes != jsonrpc_get_received_bytes(s->rpc)) {
|
||||||
|
/* Data was successfully received.
|
||||||
|
*
|
||||||
|
* Previously we only counted receiving a full message as activity,
|
||||||
|
* but with large messages or a slow connection that policy could
|
||||||
|
* time out the session mid-message. */
|
||||||
reconnect_activity(s->reconnect, time_msec());
|
reconnect_activity(s->reconnect, time_msec());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msg) {
|
||||||
if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
|
if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
|
||||||
/* Echo request. Send reply. */
|
/* Echo request. Send reply. */
|
||||||
struct jsonrpc_msg *reply;
|
struct jsonrpc_msg *reply;
|
||||||
|
@@ -50,6 +50,7 @@ void jsonrpc_wait(struct jsonrpc *);
|
|||||||
|
|
||||||
int jsonrpc_get_status(const struct jsonrpc *);
|
int jsonrpc_get_status(const struct jsonrpc *);
|
||||||
size_t jsonrpc_get_backlog(const struct jsonrpc *);
|
size_t jsonrpc_get_backlog(const struct jsonrpc *);
|
||||||
|
unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
|
||||||
const char *jsonrpc_get_name(const struct jsonrpc *);
|
const char *jsonrpc_get_name(const struct jsonrpc *);
|
||||||
|
|
||||||
int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *);
|
int jsonrpc_send(struct jsonrpc *, struct jsonrpc_msg *);
|
||||||
|
@@ -186,6 +186,7 @@ class Connection(object):
|
|||||||
self.input = ""
|
self.input = ""
|
||||||
self.output = ""
|
self.output = ""
|
||||||
self.parser = None
|
self.parser = None
|
||||||
|
self.received_bytes = 0
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.stream.close()
|
self.stream.close()
|
||||||
@@ -221,6 +222,9 @@ class Connection(object):
|
|||||||
else:
|
else:
|
||||||
return len(self.output)
|
return len(self.output)
|
||||||
|
|
||||||
|
def get_received_bytes(self):
|
||||||
|
return self.received_bytes
|
||||||
|
|
||||||
def __log_msg(self, title, msg):
|
def __log_msg(self, title, msg):
|
||||||
vlog.dbg("%s: %s %s" % (self.name, title, msg))
|
vlog.dbg("%s: %s %s" % (self.name, title, msg))
|
||||||
|
|
||||||
@@ -271,6 +275,7 @@ class Connection(object):
|
|||||||
return EOF, None
|
return EOF, None
|
||||||
else:
|
else:
|
||||||
self.input += data
|
self.input += data
|
||||||
|
self.received_bytes += len(data)
|
||||||
else:
|
else:
|
||||||
if self.parser is None:
|
if self.parser is None:
|
||||||
self.parser = ovs.json.Parser()
|
self.parser = ovs.json.Parser()
|
||||||
@@ -444,7 +449,16 @@ class Session(object):
|
|||||||
self.pstream = None
|
self.pstream = None
|
||||||
|
|
||||||
if self.rpc:
|
if self.rpc:
|
||||||
|
received_bytes = self.rpc.get_received_bytes()
|
||||||
self.rpc.run()
|
self.rpc.run()
|
||||||
|
if received_bytes != self.rpc.get_received_bytes():
|
||||||
|
# Data was successfully received.
|
||||||
|
#
|
||||||
|
# Previously we only counted receiving a full message as
|
||||||
|
# activity, but with large messages or a slow connection that
|
||||||
|
# policy could time out the session mid-message.
|
||||||
|
self.reconnect.activity(ovs.timeval.msec())
|
||||||
|
|
||||||
error = self.rpc.get_status()
|
error = self.rpc.get_status()
|
||||||
if error != 0:
|
if error != 0:
|
||||||
self.reconnect.disconnected(ovs.timeval.msec(), error)
|
self.reconnect.disconnected(ovs.timeval.msec(), error)
|
||||||
|
Reference in New Issue
Block a user