diff --git a/lib/rconn.c b/lib/rconn.c index 64cc6d067..9273cff83 100644 --- a/lib/rconn.c +++ b/lib/rconn.c @@ -664,7 +664,7 @@ int rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b, struct rconn_packet_counter *counter, int queue_limit) { - if (counter->n_packets < queue_limit) { + if (rconn_packet_counter_n_packets(counter) < queue_limit) { return rconn_send(rc, b, counter); } else { COVERAGE_INC(rconn_overflow); @@ -863,7 +863,10 @@ struct rconn_packet_counter * rconn_packet_counter_create(void) { struct rconn_packet_counter *c = xzalloc(sizeof *c); + ovs_mutex_init(&c->mutex); + ovs_mutex_lock(&c->mutex); c->ref_cnt = 1; + ovs_mutex_unlock(&c->mutex); return c; } @@ -871,8 +874,15 @@ void rconn_packet_counter_destroy(struct rconn_packet_counter *c) { if (c) { + bool dead; + + ovs_mutex_lock(&c->mutex); ovs_assert(c->ref_cnt > 0); - if (!--c->ref_cnt && !c->n_packets) { + dead = !--c->ref_cnt && !c->n_packets; + ovs_mutex_unlock(&c->mutex); + + if (dead) { + ovs_mutex_destroy(&c->mutex); free(c); } } @@ -881,25 +891,56 @@ rconn_packet_counter_destroy(struct rconn_packet_counter *c) void rconn_packet_counter_inc(struct rconn_packet_counter *c, unsigned int n_bytes) { + ovs_mutex_lock(&c->mutex); c->n_packets++; c->n_bytes += n_bytes; + ovs_mutex_unlock(&c->mutex); } void rconn_packet_counter_dec(struct rconn_packet_counter *c, unsigned int n_bytes) { - ovs_assert(c->n_packets > 0); - ovs_assert(c->n_bytes >= n_bytes); + bool dead = false; - c->n_bytes -= n_bytes; + ovs_mutex_lock(&c->mutex); + ovs_assert(c->n_packets > 0); + ovs_assert(c->n_packets == 1 + ? c->n_bytes == n_bytes + : c->n_bytes > n_bytes); c->n_packets--; - if (!c->n_packets) { - ovs_assert(!c->n_bytes); - if (!c->ref_cnt) { - free(c); - } + c->n_bytes -= n_bytes; + dead = !c->n_packets && !c->ref_cnt; + ovs_mutex_unlock(&c->mutex); + + if (dead) { + ovs_mutex_destroy(&c->mutex); + free(c); } } + +unsigned int +rconn_packet_counter_n_packets(const struct rconn_packet_counter *c) +{ + unsigned int n; + + ovs_mutex_lock(&c->mutex); + n = c->n_packets; + ovs_mutex_unlock(&c->mutex); + + return n; +} + +unsigned int +rconn_packet_counter_n_bytes(const struct rconn_packet_counter *c) +{ + unsigned int n; + + ovs_mutex_lock(&c->mutex); + n = c->n_bytes; + ovs_mutex_unlock(&c->mutex); + + return n; +} /* Set rc->target and rc->name to 'target' and 'name', respectively. If 'name' * is null, 'target' is used. diff --git a/lib/rconn.h b/lib/rconn.h index aa3023836..d943203f1 100644 --- a/lib/rconn.h +++ b/lib/rconn.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009, 2010, 2011, 2012 Nicira, Inc. + * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ #include #include #include "openvswitch/types.h" +#include "ovs-thread.h" /* A wrapper around vconn that provides queuing and optionally reliability. * @@ -91,9 +92,10 @@ unsigned int rconn_count_txqlen(const struct rconn *); /* Counts packets and bytes queued into an rconn by a given source. */ struct rconn_packet_counter { - unsigned int n_packets; /* Number of packets queued. */ - unsigned int n_bytes; /* Number of bytes queued. */ - int ref_cnt; /* Number of owners. */ + struct ovs_mutex mutex; + unsigned int n_packets OVS_GUARDED; /* Number of packets queued. */ + unsigned int n_bytes OVS_GUARDED; /* Number of bytes queued. */ + int ref_cnt OVS_GUARDED; /* Number of owners. */ }; struct rconn_packet_counter *rconn_packet_counter_create(void); @@ -101,4 +103,8 @@ void rconn_packet_counter_destroy(struct rconn_packet_counter *); void rconn_packet_counter_inc(struct rconn_packet_counter *, unsigned n_bytes); void rconn_packet_counter_dec(struct rconn_packet_counter *, unsigned n_bytes); +unsigned int rconn_packet_counter_n_packets( + const struct rconn_packet_counter *); +unsigned int rconn_packet_counter_n_bytes(const struct rconn_packet_counter *); + #endif /* rconn.h */ diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c index 8a4195d83..ab9a556d0 100644 --- a/ofproto/connmgr.c +++ b/ofproto/connmgr.c @@ -1238,7 +1238,7 @@ ofconn_reconfigure(struct ofconn *ofconn, const struct ofproto_controller *c) static bool ofconn_may_recv(const struct ofconn *ofconn) { - int count = ofconn->reply_counter->n_packets; + int count = rconn_packet_counter_n_packets(ofconn->reply_counter); return (!ofconn->blocked || ofconn->retry) && count < OFCONN_REPLY_MAX; } @@ -1934,10 +1934,12 @@ ofmonitor_flush(struct connmgr *mgr) struct ofpbuf *msg, *next; LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) { + unsigned int n_bytes; + list_remove(&msg->list_node); ofconn_send(ofconn, msg, ofconn->monitor_counter); - if (!ofconn->monitor_paused - && ofconn->monitor_counter->n_bytes > 128 * 1024) { + n_bytes = rconn_packet_counter_n_bytes(ofconn->monitor_counter); + if (!ofconn->monitor_paused && n_bytes > 128 * 1024) { struct ofpbuf *pause; COVERAGE_INC(ofmonitor_pause); @@ -1980,7 +1982,8 @@ ofmonitor_run(struct connmgr *mgr) struct ofconn *ofconn; LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { - if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) { + if (ofconn->monitor_paused + && !rconn_packet_counter_n_packets(ofconn->monitor_counter)) { COVERAGE_INC(ofmonitor_resume); ofmonitor_resume(ofconn); } @@ -1993,7 +1996,8 @@ ofmonitor_wait(struct connmgr *mgr) struct ofconn *ofconn; LIST_FOR_EACH (ofconn, node, &mgr->all_conns) { - if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) { + if (ofconn->monitor_paused + && !rconn_packet_counter_n_packets(ofconn->monitor_counter)) { poll_immediate_wake(); } }