mirror of
https://github.com/openvswitch/ovs
synced 2025-10-29 15:28:56 +00:00
stream: Add stream_run(), stream_run_wait() functions.
SSL, which will be added in an upcoming commit, requires some background processing, which is best done in a "run" function in our architecture. This commit adds stream_run() and stream_run_wait() and calls to them from the places where they will be required.
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) 2009 Nicira Networks.
|
* Copyright (c) 2009, 2010 Nicira Networks.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -88,6 +88,7 @@ jsonrpc_run(struct jsonrpc *rpc)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stream_run(rpc->stream);
|
||||||
while (!queue_is_empty(&rpc->output)) {
|
while (!queue_is_empty(&rpc->output)) {
|
||||||
struct ofpbuf *buf = rpc->output.head;
|
struct ofpbuf *buf = rpc->output.head;
|
||||||
int retval;
|
int retval;
|
||||||
@@ -113,8 +114,11 @@ jsonrpc_run(struct jsonrpc *rpc)
|
|||||||
void
|
void
|
||||||
jsonrpc_wait(struct jsonrpc *rpc)
|
jsonrpc_wait(struct jsonrpc *rpc)
|
||||||
{
|
{
|
||||||
if (!rpc->status && !queue_is_empty(&rpc->output)) {
|
if (!rpc->status) {
|
||||||
stream_send_wait(rpc->stream);
|
stream_run_wait(rpc->stream);
|
||||||
|
if (!queue_is_empty(&rpc->output)) {
|
||||||
|
stream_send_wait(rpc->stream);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -721,7 +725,10 @@ jsonrpc_session_run(struct jsonrpc_session *s)
|
|||||||
jsonrpc_session_disconnect(s);
|
jsonrpc_session_disconnect(s);
|
||||||
}
|
}
|
||||||
} else if (s->stream) {
|
} else if (s->stream) {
|
||||||
int error = stream_connect(s->stream);
|
int error;
|
||||||
|
|
||||||
|
stream_run(s->stream);
|
||||||
|
error = stream_connect(s->stream);
|
||||||
if (!error) {
|
if (!error) {
|
||||||
reconnect_connected(s->reconnect, time_msec());
|
reconnect_connected(s->reconnect, time_msec());
|
||||||
s->rpc = jsonrpc_open(s->stream);
|
s->rpc = jsonrpc_open(s->stream);
|
||||||
@@ -763,6 +770,7 @@ jsonrpc_session_wait(struct jsonrpc_session *s)
|
|||||||
if (s->rpc) {
|
if (s->rpc) {
|
||||||
jsonrpc_wait(s->rpc);
|
jsonrpc_wait(s->rpc);
|
||||||
} else if (s->stream) {
|
} else if (s->stream) {
|
||||||
|
stream_run_wait(s->stream);
|
||||||
stream_connect_wait(s->stream);
|
stream_connect_wait(s->stream);
|
||||||
}
|
}
|
||||||
reconnect_wait(s->reconnect, time_msec());
|
reconnect_wait(s->reconnect, time_msec());
|
||||||
|
|||||||
@@ -139,6 +139,8 @@ static struct stream_class stream_fd_class = {
|
|||||||
fd_connect, /* connect */
|
fd_connect, /* connect */
|
||||||
fd_recv, /* recv */
|
fd_recv, /* recv */
|
||||||
fd_send, /* send */
|
fd_send, /* send */
|
||||||
|
NULL, /* run */
|
||||||
|
NULL, /* run_wait */
|
||||||
fd_wait, /* wait */
|
fd_wait, /* wait */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -109,6 +109,18 @@ struct stream_class {
|
|||||||
* accepted for transmission, it should return -EAGAIN immediately. */
|
* accepted for transmission, it should return -EAGAIN immediately. */
|
||||||
ssize_t (*send)(struct stream *stream, const void *buffer, size_t n);
|
ssize_t (*send)(struct stream *stream, const void *buffer, size_t n);
|
||||||
|
|
||||||
|
/* Allows 'stream' to perform maintenance activities, such as flushing
|
||||||
|
* output buffers.
|
||||||
|
*
|
||||||
|
* May be null if 'stream' doesn't have anything to do here. */
|
||||||
|
void (*run)(struct stream *stream);
|
||||||
|
|
||||||
|
/* Arranges for the poll loop to wake up when 'stream' needs to perform
|
||||||
|
* maintenance activities.
|
||||||
|
*
|
||||||
|
* May be null if 'stream' doesn't have anything to do here. */
|
||||||
|
void (*run_wait)(struct stream *stream);
|
||||||
|
|
||||||
/* Arranges for the poll loop to wake up when 'stream' is ready to take an
|
/* Arranges for the poll loop to wake up when 'stream' is ready to take an
|
||||||
* action of the given 'type'. */
|
* action of the given 'type'. */
|
||||||
void (*wait)(struct stream *stream, enum stream_wait_type type);
|
void (*wait)(struct stream *stream, enum stream_wait_type type);
|
||||||
|
|||||||
@@ -90,6 +90,8 @@ struct stream_class tcp_stream_class = {
|
|||||||
NULL, /* connect */
|
NULL, /* connect */
|
||||||
NULL, /* recv */
|
NULL, /* recv */
|
||||||
NULL, /* send */
|
NULL, /* send */
|
||||||
|
NULL, /* run */
|
||||||
|
NULL, /* run_wait */
|
||||||
NULL, /* wait */
|
NULL, /* wait */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -69,6 +69,8 @@ struct stream_class unix_stream_class = {
|
|||||||
NULL, /* connect */
|
NULL, /* connect */
|
||||||
NULL, /* recv */
|
NULL, /* recv */
|
||||||
NULL, /* send */
|
NULL, /* send */
|
||||||
|
NULL, /* run */
|
||||||
|
NULL, /* run_wait */
|
||||||
NULL, /* wait */
|
NULL, /* wait */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
29
lib/stream.c
29
lib/stream.c
@@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) 2008, 2009 Nicira Networks.
|
* Copyright (c) 2008, 2009, 2010 Nicira Networks.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
@@ -66,7 +66,8 @@ check_stream_classes(void)
|
|||||||
struct stream_class *class = stream_classes[i];
|
struct stream_class *class = stream_classes[i];
|
||||||
assert(class->name != NULL);
|
assert(class->name != NULL);
|
||||||
assert(class->open != NULL);
|
assert(class->open != NULL);
|
||||||
if (class->close || class->recv || class->send || class->wait) {
|
if (class->close || class->recv || class->send || class->run
|
||||||
|
|| class->run_wait || class->wait) {
|
||||||
assert(class->close != NULL);
|
assert(class->close != NULL);
|
||||||
assert(class->recv != NULL);
|
assert(class->recv != NULL);
|
||||||
assert(class->send != NULL);
|
assert(class->send != NULL);
|
||||||
@@ -166,6 +167,8 @@ stream_open_block(const char *name, struct stream **streamp)
|
|||||||
|
|
||||||
error = stream_open(name, &stream);
|
error = stream_open(name, &stream);
|
||||||
while (error == EAGAIN) {
|
while (error == EAGAIN) {
|
||||||
|
stream_run(stream);
|
||||||
|
stream_run_wait(stream);
|
||||||
stream_connect_wait(stream);
|
stream_connect_wait(stream);
|
||||||
poll_block();
|
poll_block();
|
||||||
error = stream_connect(stream);
|
error = stream_connect(stream);
|
||||||
@@ -312,6 +315,28 @@ stream_send(struct stream *stream, const void *buffer, size_t n)
|
|||||||
: (stream->class->send)(stream, buffer, n));
|
: (stream->class->send)(stream, buffer, n));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Allows 'stream' to perform maintenance activities, such as flushing
|
||||||
|
* output buffers. */
|
||||||
|
void
|
||||||
|
stream_run(struct stream *stream)
|
||||||
|
{
|
||||||
|
if (stream->class->run) {
|
||||||
|
(stream->class->run)(stream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Arranges for the poll loop to wake up when 'stream' needs to perform
|
||||||
|
* maintenance activities. */
|
||||||
|
void
|
||||||
|
stream_run_wait(struct stream *stream)
|
||||||
|
{
|
||||||
|
if (stream->class->run_wait) {
|
||||||
|
(stream->class->run_wait)(stream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Arranges for the poll loop to wake up when 'stream' is ready to take an
|
||||||
|
* action of the given 'type'. */
|
||||||
void
|
void
|
||||||
stream_wait(struct stream *stream, enum stream_wait_type wait)
|
stream_wait(struct stream *stream, enum stream_wait_type wait)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -42,6 +42,9 @@ int stream_connect(struct stream *);
|
|||||||
int stream_recv(struct stream *, void *buffer, size_t n);
|
int stream_recv(struct stream *, void *buffer, size_t n);
|
||||||
int stream_send(struct stream *, const void *buffer, size_t n);
|
int stream_send(struct stream *, const void *buffer, size_t n);
|
||||||
|
|
||||||
|
void stream_run(struct stream *);
|
||||||
|
void stream_run_wait(struct stream *);
|
||||||
|
|
||||||
enum stream_wait_type {
|
enum stream_wait_type {
|
||||||
STREAM_CONNECT,
|
STREAM_CONNECT,
|
||||||
STREAM_RECV,
|
STREAM_RECV,
|
||||||
|
|||||||
Reference in New Issue
Block a user