2
0
mirror of https://github.com/openvswitch/ovs synced 2025-08-22 01:51:26 +00:00

lib: Introduce cooperative multitasking module.

One of the goals of Open vSwitch is to be as resource efficient as
possible.  Core parts of the program has been implemented as
asynchronous state machines, and when absolutely necessary
additional threads are used.

Introduce cooperative multitasking module which allow us to
interleave important processing with long running tasks while
avoiding the additional resource consumption of threads and
complexity of asynchronous state machines.

We will use this module to ensure long running processing in the
OVSDB server does not interfere with stable maintenance of the
RAFT cluster in subsequent patches.

Suggested-by: Ilya Maximets <i.maximets@ovn.org>
Signed-off-by: Frode Nordahl <frode.nordahl@canonical.com>
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
This commit is contained in:
Frode Nordahl 2024-01-16 22:52:02 +00:00 committed by Ilya Maximets
parent 6ece3d57b2
commit 3c8a4e942d
8 changed files with 625 additions and 0 deletions

View File

@ -94,6 +94,9 @@ lib_libopenvswitch_la_SOURCES = \
lib/conntrack-other.c \
lib/conntrack.c \
lib/conntrack.h \
lib/cooperative-multitasking.c \
lib/cooperative-multitasking.h \
lib/cooperative-multitasking-private.h \
lib/coverage.c \
lib/coverage.h \
lib/cpu.c \

View File

@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 Canonical Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef COOPERATIVE_MULTITASKING_PRIVATE_H
#define COOPERATIVE_MULTITASKING_PRIVATE_H 1
#include "openvswitch/hmap.h"
extern struct hmap cooperative_multitasking_callbacks;
struct cm_entry {
struct hmap_node node;
void (*cb)(void *);
void *arg;
long long int threshold;
long long int last_run;
const char *name;
};
#endif /* COOPERATIVE_MULTITASKING_PRIVATE_H */

View File

@ -0,0 +1,157 @@
/*
* Copyright (c) 2024 Canonical Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <config.h>
#include "backtrace.h"
#include "cooperative-multitasking-private.h"
#include "cooperative-multitasking.h"
#include "hash.h"
#include "openvswitch/hmap.h"
#include "openvswitch/vlog.h"
#include "timeval.h"
VLOG_DEFINE_THIS_MODULE(cooperative_multitasking);
struct hmap cooperative_multitasking_callbacks = HMAP_INITIALIZER(
&cooperative_multitasking_callbacks);
/* Free any data allocated by calls to cooperative_multitasking_set(). */
void
cooperative_multitasking_destroy(void)
{
struct cm_entry *cm_entry;
HMAP_FOR_EACH_SAFE (cm_entry, node, &cooperative_multitasking_callbacks) {
hmap_remove(&cooperative_multitasking_callbacks, &cm_entry->node);
free(cm_entry);
}
}
/* Set/update callback as identified by 'cb' and 'arg'.
*
* 'name' is used for logging events related to this callback.
*
* The value for 'last_run' must be updated each time the callback is run.
*
* Updating the value for 'threshold' may be necessary as a consequence of
* change in runtime configuration or requirements of the part of the program
* serviced by the callback.
*
* Providing a value of 0 for 'last_run' or 'threshold' will leave the stored
* value untouched. */
void
cooperative_multitasking_set(void (*cb)(void *), void *arg,
long long int last_run, long long int threshold,
const char *name)
{
struct cm_entry *cm_entry;
HMAP_FOR_EACH_WITH_HASH (cm_entry, node, hash_pointer((void *) cb, 0),
&cooperative_multitasking_callbacks) {
if (cm_entry->cb == cb && cm_entry->arg == arg) {
if (last_run) {
cm_entry->last_run = last_run;
}
if (threshold) {
cm_entry->threshold = threshold;
}
return;
}
}
cm_entry = xzalloc(sizeof *cm_entry);
cm_entry->cb = cb;
cm_entry->arg = arg;
cm_entry->threshold = threshold;
cm_entry->last_run = last_run ? last_run : time_msec();
cm_entry->name = name;
hmap_insert(&cooperative_multitasking_callbacks,
&cm_entry->node, hash_pointer((void *) cm_entry->cb, 0));
}
/* Remove callback identified by 'cb' and 'arg'. */
void
cooperative_multitasking_remove(void (*cb)(void *), void *arg)
{
struct cm_entry *cm_entry;
HMAP_FOR_EACH_WITH_HASH (cm_entry, node, hash_pointer((void *) cb, 0),
&cooperative_multitasking_callbacks) {
if (cm_entry->cb == cb && cm_entry->arg == arg) {
hmap_remove(&cooperative_multitasking_callbacks, &cm_entry->node);
free(cm_entry);
return;
}
}
}
static void
cooperative_multitasking_yield_at__(const char *source_location)
{
long long int start = time_msec();
struct cm_entry *cm_entry;
long long int elapsed;
bool warn;
HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
elapsed = time_msec() - cm_entry->last_run;
if (elapsed >= cm_entry->threshold) {
warn = elapsed - cm_entry->threshold > cm_entry->threshold / 8;
VLOG(warn ? VLL_WARN : VLL_DBG, "%s: yield for %s(%p): "
"elapsed(%lld) >= threshold(%lld), overrun: %lld",
source_location, cm_entry->name, cm_entry->arg, elapsed,
cm_entry->threshold, elapsed - cm_entry->threshold);
if (warn && VLOG_IS_DBG_ENABLED()) {
log_backtrace();
}
(*cm_entry->cb)(cm_entry->arg);
}
}
elapsed = time_msec() - start;
if (elapsed > 1000) {
VLOG_WARN("Unreasonably long %lldms runtime for callbacks.", elapsed);
}
}
/* Iterate over registered callbacks and execute callbacks as demanded by the
* recorded time threshold. */
void
cooperative_multitasking_yield_at(const char *source_location)
{
static bool yield_in_progress = false;
if (yield_in_progress) {
VLOG_ERR_ONCE("Nested yield avoided, this is a bug! "
"Enable debug logging for more details.");
if (VLOG_IS_DBG_ENABLED()) {
VLOG_DBG("%s: nested yield.", source_location);
log_backtrace();
}
return;
}
yield_in_progress = true;
cooperative_multitasking_yield_at__(source_location);
yield_in_progress = false;
}

View File

@ -0,0 +1,113 @@
/*
* Copyright (c) 2024 Canonical Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef COOPERATIVE_MULTITASKING_H
#define COOPERATIVE_MULTITASKING_H 1
/*
* cooperative-multitasking, interleaved execution for Open vSwitch.
*
* Overview
* ========
*
* One of the goals of Open vSwitch is to be as resource efficient as
* possible. Core parts of the program has been implemented as asynchronous
* state machines, and when absolutely necessary additional threads are used.
*
* Modules with mostly synchronous and single threaded code that are expected
* to have heavy processing, can make use of the cooperative-multitasking
* interface to yield to modules that have registered callbacks at a time
* threshold.
*
* Typical Usage
* =============
*
* The module that provides the callback typically has a run() function that is
* already part of the main processing loop and can then register like this:
*
* static void my_run_cb(void *arg);
*
* static void
* my_run(struct data *my_data)
* {
* ...
*
* cooperative_multitasking_set(&my_run_cb, (void *) my_data,
* time_msec(), 1000, "my_run");
* }
*
* static void
* my_run_cb (void *arg)
* {
* struct data *my_data = (struct data *) arg;
*
* my_run(my_data);
* }
*
* static void
* my_destroy(struct data *my_data)
* {
* ...
*
* cooperatrive_multitasking_remove(&my_run_cb, (void *) my_data);
* }
*
* The module that is expected to have heavy processing can yield like this:
*
* HMAP_FOR_EACH (row, hmap_node, &src_table->rows) {
* cooperative_multitasking_yield();
*
* ...
* }
*
* Rules for implementation
* ========================
*
* - The module that registers itself with a callback must not use the yield
* functionality inside nor should it be possible to do so via calls to other
* modules.
*
* - The module that registers the callback should be self-sufficient, i.e.
* the internal state of that module should not matter to the outside world,
* at least it should not matter for the call stack that enters the
* cooperative_multitasking_yield().
*
* - cooperative_multitasking_yield() must not be called from places that can
* loop indefinitely, only in places that eventually end, otherwise it may
* give a false impression that the server is working fine while it is stuck
* and not actually doing any useful work.
*
* Thread-safety
* =============
*
* The cooperative-multitasking module and functions therein are not thread
* safe and must only be used by one thread.
*/
void cooperative_multitasking_destroy(void);
void cooperative_multitasking_set(void (*cb)(void *), void *arg,
long long int last_run,
long long int threshold,
const char *name);
void cooperative_multitasking_remove(void (*cb)(void *), void *arg);
void cooperative_multitasking_yield_at(const char *source_location);
#define cooperative_multitasking_yield() \
cooperative_multitasking_yield_at(OVS_SOURCE_LOCATOR)
#endif /* COOPERATIVE_MULTITASKING_H */

View File

@ -455,6 +455,7 @@ tests_ovstest_SOURCES = \
tests/test-ccmap.c \
tests/test-cmap.c \
tests/test-conntrack.c \
tests/test-cooperative-multitasking.c \
tests/test-csum.c \
tests/test-flows.c \
tests/test-hash.c \

View File

@ -296,3 +296,13 @@ AT_CLEANUP
AT_SETUP([uuidset module])
AT_CHECK([ovstest test-uuidset], [0], [], [ignore])
AT_CLEANUP
AT_SETUP([cooperative-multitasking module])
AT_CHECK([ovstest test-cooperative-multitasking], [0], [])
AT_CLEANUP
AT_SETUP([cooperative-multitasking module nested yield detection])
AT_CHECK([ovstest test-cooperative-multitasking-nested-yield], [0], [], [dnl
cooperative_multitasking|ERR|Nested yield avoided, this is a bug! Enable debug logging for more details.
])
AT_CLEANUP

View File

@ -2833,6 +2833,7 @@ m4_define([CLEAN_LOG_FILE],
[sed 's/[[0-9\-]]*T[[0-9:\.]]*Z|[[0-9]]*\(|.*$\)/\1/g' $1 | dnl
sed '/|poll_loop|/d' | dnl
sed '/|socket_util|/d' | dnl
sed '/|cooperative_multitasking|DBG|/d' | dnl
sed 's/[[0-9]]*\.ctl/<cleared>\.ctl/g'> $2])
CLEAN_LOG_FILE([1.log], [1.log.clear])

View File

@ -0,0 +1,307 @@
/*
* Copyright (c) 2023 Canonical Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <config.h>
#undef NDEBUG
#include "cooperative-multitasking.h"
#include "cooperative-multitasking-private.h"
#include "openvswitch/hmap.h"
#include "ovstest.h"
#include "timeval.h"
#include "util.h"
#include "openvswitch/vlog.h"
struct fixture_arg {
bool called;
};
static void fixture_run_wrap(void *arg);
#define FIXTURE_RUN_NAME "fixture_run"
static void
fixture_run(struct fixture_arg *arg)
{
cooperative_multitasking_set(&fixture_run_wrap, (void *) arg,
time_msec(), 0, FIXTURE_RUN_NAME);
if (arg) {
arg->called = true;
}
}
static void
fixture_run_wrap(void *arg)
{
struct fixture_arg *fixture_arg = (struct fixture_arg *) arg;
fixture_run(fixture_arg);
}
static void fixture_other_run_wrap(void *arg);
#define FIXTURE_OTHER_RUN_NAME "fixture_other_run"
static void
fixture_other_run(struct fixture_arg *arg)
{
cooperative_multitasking_set(&fixture_other_run_wrap, (void *) arg,
time_msec(), 0, FIXTURE_OTHER_RUN_NAME);
if (arg) {
arg->called = true;
}
}
static void
fixture_other_run_wrap(void *arg)
{
struct fixture_arg *fixture_arg = (struct fixture_arg *) arg;
fixture_other_run(fixture_arg);
}
static void
test_cm_set_registration(void)
{
struct cm_entry *cm_entry;
struct fixture_arg arg1 = {
.called = false,
};
struct fixture_arg arg2 = {
.called = false,
};
timeval_stop();
long long int now = time_msec();
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 0, 1000,
FIXTURE_RUN_NAME);
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 0, 2000,
FIXTURE_RUN_NAME);
cooperative_multitasking_set(&fixture_other_run_wrap, NULL, 0, 3000,
FIXTURE_OTHER_RUN_NAME);
ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 3);
HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
if (cm_entry->arg == (void *) &arg1) {
ovs_assert(cm_entry->cb == &fixture_run_wrap);
ovs_assert(cm_entry->threshold == 1000);
ovs_assert(cm_entry->last_run == now);
} else if (cm_entry->arg == (void *) &arg2) {
ovs_assert(cm_entry->cb == &fixture_run_wrap);
ovs_assert(cm_entry->threshold == 2000);
ovs_assert(cm_entry->last_run == now);
} else if (cm_entry->cb == &fixture_other_run_wrap) {
ovs_assert(cm_entry->arg == NULL);
ovs_assert(cm_entry->threshold == 3000);
ovs_assert(cm_entry->last_run == now);
} else {
OVS_NOT_REACHED();
}
}
cooperative_multitasking_remove(&fixture_other_run_wrap, NULL);
ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 2);
cooperative_multitasking_remove(&fixture_run_wrap, (void *) &arg2);
ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 1);
cooperative_multitasking_destroy();
}
static void
test_cm_set_update(void)
{
struct cm_entry *cm_entry;
struct fixture_arg arg1 = {
.called = false,
};
struct fixture_arg arg2 = {
.called = false,
};
timeval_stop();
long long int now = time_msec();
/* First register a couple of callbacks. */
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 0, 0,
FIXTURE_RUN_NAME);
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 0, 0,
FIXTURE_RUN_NAME);
ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 2);
HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
if (cm_entry->arg == (void *) &arg1) {
ovs_assert(cm_entry->threshold == 0);
ovs_assert(cm_entry->last_run == now);
} else if (cm_entry->arg == (void *) &arg2) {
ovs_assert(cm_entry->threshold == 0);
ovs_assert(cm_entry->last_run == now);
} else {
OVS_NOT_REACHED();
}
}
/* Update 'last_run' and 'threshold' for each callback and validate
* that the correct entry was actually updated. */
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 1, 2,
FIXTURE_RUN_NAME);
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 3, 4,
FIXTURE_RUN_NAME);
HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
if (cm_entry->arg == (void *) &arg1) {
ovs_assert(cm_entry->threshold == 2);
ovs_assert(cm_entry->last_run == 1);
} else if (cm_entry->arg == (void *) &arg2) {
ovs_assert(cm_entry->threshold == 4);
ovs_assert(cm_entry->last_run == 3);
} else {
OVS_NOT_REACHED();
}
}
/* Confirm that providing 0 for 'last_run' or 'threshold' leaves the
* existing value untouched. */
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 0, 5,
FIXTURE_RUN_NAME);
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 6, 0,
FIXTURE_RUN_NAME);
HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
if (cm_entry->arg == (void *) &arg1) {
ovs_assert(cm_entry->threshold == 5);
ovs_assert(cm_entry->last_run == 1);
} else if (cm_entry->arg == (void *) &arg2) {
ovs_assert(cm_entry->threshold == 4);
ovs_assert(cm_entry->last_run == 6);
} else {
OVS_NOT_REACHED();
}
}
cooperative_multitasking_destroy();
}
static void
test_cm_yield(void)
{
struct cm_entry *cm_entry;
struct fixture_arg arg1 = {
.called = false,
};
struct fixture_arg arg2 = {
.called = false,
};
timeval_stop();
long long int now = time_msec();
/* First register a couple of callbacks. */
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg1, 0, 1000,
FIXTURE_RUN_NAME);
cooperative_multitasking_set(&fixture_run_wrap, (void *) &arg2, 0, 2000,
FIXTURE_RUN_NAME);
ovs_assert(hmap_count(&cooperative_multitasking_callbacks) == 2);
/* Call to yield should not execute callbacks until time threshold. */
cooperative_multitasking_yield();
ovs_assert(arg1.called == false);
ovs_assert(arg2.called == false);
HMAP_FOR_EACH (cm_entry, node, &cooperative_multitasking_callbacks) {
ovs_assert(cm_entry->last_run == now);
}
/* Move clock forward and confirm the expected callbacks to be executed. */
timeval_warp(1000);
timeval_stop();
cooperative_multitasking_yield();
ovs_assert(arg1.called == true);
ovs_assert(arg2.called == false);
/* Move clock forward and confirm the expected callbacks to be executed. */
arg1.called = arg2.called = false;
timeval_warp(1000);
timeval_stop();
cooperative_multitasking_yield();
ovs_assert(arg1.called == true);
ovs_assert(arg2.called == true);
cooperative_multitasking_destroy();
}
static void fixture_buggy_run_wrap(void *arg);
#define FIXTURE_BUGGY_RUN_NAME "fixture_buggy_run"
static void
fixture_buggy_run(struct fixture_arg *arg)
{
cooperative_multitasking_set(&fixture_buggy_run_wrap, (void *) arg,
time_msec(), 0, FIXTURE_BUGGY_RUN_NAME);
if (arg) {
arg->called = true;
}
/* A real run function MUST NOT directly or indirectly call yield, this is
* here to test the detection of such a programming error. */
cooperative_multitasking_yield();
}
static void
fixture_buggy_run_wrap(void *arg)
{
struct fixture_arg *fixture_arg = (struct fixture_arg *) arg;
fixture_buggy_run(fixture_arg);
}
static void
test_cooperative_multitasking_nested_yield(int argc OVS_UNUSED, char *argv[])
{
struct fixture_arg arg1 = {
.called = false,
};
set_program_name(argv[0]);
vlog_set_pattern(VLF_CONSOLE, "%c|%p|%m");
vlog_set_levels(NULL, VLF_SYSLOG, VLL_OFF);
time_msec(); /* Ensure timeval is initialized. */
cooperative_multitasking_set(&fixture_buggy_run_wrap, (void *) &arg1,
0, 1000, FIXTURE_BUGGY_RUN_NAME);
timeval_warp(1000);
cooperative_multitasking_yield();
cooperative_multitasking_destroy();
}
static void
test_cooperative_multitasking(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
{
time_msec(); /* Ensure timeval is initialized. */
test_cm_set_registration();
test_cm_set_update();
test_cm_yield();
}
OVSTEST_REGISTER("test-cooperative-multitasking",
test_cooperative_multitasking);
OVSTEST_REGISTER("test-cooperative-multitasking-nested-yield",
test_cooperative_multitasking_nested_yield);