mirror of
https://github.com/openvswitch/ovs
synced 2025-09-01 14:55:18 +00:00
ovsdb: New ovsdb 'relay' service model.
New database service model 'relay' that is needed to scale out read-mostly database access, e.g. ovn-controller connections to OVN_Southbound. In this service model ovsdb-server connects to existing OVSDB server and maintains in-memory copy of the database. It serves read-only transactions and monitor requests by its own, but forwards write transactions to the relay source. Key differences from the active-backup replication: - support for "write" transactions (next commit). - no on-disk storage. (probably, faster operation) - support for multiple remotes (connect to the clustered db). - doesn't try to keep connection as long as possible, but faster reconnects to other remotes to avoid missing updates. - No need to know the complete database schema beforehand, only the schema name. - can be used along with other standalone and clustered databases by the same ovsdb-server process. (doesn't turn the whole jsonrpc server to read-only mode) - supports modern version of monitors (monitor_cond_since), because based on ovsdb-cs. - could be chained, i.e. multiple relays could be connected one to another in a row or in a tree-like form. - doesn't increase availability. - cannot be converted to other service models or become a main active server. Some performance test results can be found here: https://mail.openvswitch.org/pipermail/ovs-dev/2021-July/385825.html Acked-by: Mark D. Gray <mark.d.gray@redhat.com> Acked-by: Dumitru Ceara <dceara@redhat.com> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
This commit is contained in:
@@ -1,13 +1,14 @@
|
||||
{"name": "_Server",
|
||||
"version": "1.1.0",
|
||||
"cksum": "3236486585 698",
|
||||
"version": "1.2.0",
|
||||
"cksum": "3009684573 744",
|
||||
"tables": {
|
||||
"Database": {
|
||||
"columns": {
|
||||
"name": {"type": "string"},
|
||||
"model": {
|
||||
"type": {"key": {"type": "string",
|
||||
"enum": ["set", ["standalone", "clustered"]]}}},
|
||||
"enum": ["set",
|
||||
["standalone", "clustered", "relay"]]}}},
|
||||
"connected": {"type": "boolean"},
|
||||
"leader": {"type": "boolean"},
|
||||
"schema": {
|
||||
|
@@ -60,12 +60,15 @@
|
||||
|
||||
<column name="model">
|
||||
The storage model: <code>standalone</code> for a standalone or
|
||||
active-backup database, <code>clustered</code> for a clustered database.
|
||||
active-backup database, <code>clustered</code> for a clustered database,
|
||||
<code>relay</code> for a relay database.
|
||||
</column>
|
||||
|
||||
<column name="schema">
|
||||
The database schema, as a JSON string. In the case of a clustered
|
||||
database, this is empty until it finishes joining its cluster.
|
||||
database, this is empty until it finishes joining its cluster. In the
|
||||
case of a relay database, this is empty until it connects to the relay
|
||||
source.
|
||||
</column>
|
||||
|
||||
<group title="Clustered Databases">
|
||||
@@ -85,20 +88,21 @@
|
||||
|
||||
<column name="leader">
|
||||
True if the database is the leader in its cluster. For a standalone or
|
||||
active-backup database, this is always true.
|
||||
active-backup database, this is always true. For a relay database,
|
||||
this is always false.
|
||||
</column>
|
||||
|
||||
<column name="cid">
|
||||
The cluster ID for this database, which is the same for all of the
|
||||
servers that host this particular clustered database. For a standalone
|
||||
or active-backup database, this is empty.
|
||||
servers that host this particular clustered database. For a
|
||||
standalone, active-backup or relay database, this is empty.
|
||||
</column>
|
||||
|
||||
<column name="sid">
|
||||
The server ID for this database, different for each server that hosts a
|
||||
particular clustered database. A server that hosts more than one
|
||||
clustered database will have a different <code>sid</code> in each one.
|
||||
For a standalone or active-backup database, this is empty.
|
||||
For a standalone, active-backup or relay database, this is empty.
|
||||
</column>
|
||||
|
||||
<column name="index">
|
||||
@@ -112,7 +116,7 @@
|
||||
</p>
|
||||
|
||||
<p>
|
||||
For a standalone or active-backup database, this is empty.
|
||||
For a standalone, active-backup or relay database, this is empty.
|
||||
</p>
|
||||
</column>
|
||||
</group>
|
||||
|
@@ -34,6 +34,8 @@ ovsdb_libovsdb_la_SOURCES = \
|
||||
ovsdb/rbac.h \
|
||||
ovsdb/replication.c \
|
||||
ovsdb/replication.h \
|
||||
ovsdb/relay.c \
|
||||
ovsdb/relay.h \
|
||||
ovsdb/row.c \
|
||||
ovsdb/row.h \
|
||||
ovsdb/server.c \
|
||||
|
@@ -196,6 +196,11 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
|
||||
"%s operation not allowed on "
|
||||
"table in reserved database %s",
|
||||
op_name, db->schema->name);
|
||||
} else if (db->is_relay) {
|
||||
error = ovsdb_error("not allowed",
|
||||
"%s operation not allowed when "
|
||||
"database server is in relay mode",
|
||||
op_name);
|
||||
}
|
||||
}
|
||||
if (error) {
|
||||
|
@@ -44,6 +44,7 @@
|
||||
#include "openvswitch/poll-loop.h"
|
||||
#include "process.h"
|
||||
#include "replication.h"
|
||||
#include "relay.h"
|
||||
#include "row.h"
|
||||
#include "simap.h"
|
||||
#include "openvswitch/shash.h"
|
||||
@@ -225,6 +226,8 @@ main_loop(struct server_config *config,
|
||||
}
|
||||
}
|
||||
|
||||
ovsdb_relay_run();
|
||||
|
||||
struct shash_node *next;
|
||||
SHASH_FOR_EACH_SAFE (node, next, all_dbs) {
|
||||
struct db *db = node->data;
|
||||
@@ -273,6 +276,8 @@ main_loop(struct server_config *config,
|
||||
replication_wait();
|
||||
}
|
||||
|
||||
ovsdb_relay_wait();
|
||||
|
||||
ovsdb_jsonrpc_server_wait(jsonrpc);
|
||||
unixctl_server_wait(unixctl);
|
||||
SHASH_FOR_EACH(node, all_dbs) {
|
||||
@@ -546,6 +551,9 @@ close_db(struct server_config *config, struct db *db, char *comment)
|
||||
{
|
||||
if (db) {
|
||||
ovsdb_jsonrpc_server_remove_db(config->jsonrpc, db->db, comment);
|
||||
if (db->db->is_relay) {
|
||||
ovsdb_relay_del_db(db->db);
|
||||
}
|
||||
ovsdb_destroy(db->db);
|
||||
free(db->filename);
|
||||
free(db);
|
||||
@@ -554,6 +562,28 @@ close_db(struct server_config *config, struct db *db, char *comment)
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
update_schema(struct ovsdb *db, const struct ovsdb_schema *schema, void *aux)
|
||||
{
|
||||
struct server_config *config = aux;
|
||||
|
||||
if (!db->schema || strcmp(schema->version, db->schema->version)) {
|
||||
ovsdb_jsonrpc_server_reconnect(
|
||||
config->jsonrpc, false,
|
||||
(db->schema
|
||||
? xasprintf("database %s schema changed", db->name)
|
||||
: xasprintf("database %s connected to storage", db->name)));
|
||||
}
|
||||
|
||||
ovsdb_replace(db, ovsdb_create(ovsdb_schema_clone(schema), NULL));
|
||||
|
||||
/* Force update to schema in _Server database. */
|
||||
struct db *dbp = shash_find_data(config->all_dbs, db->name);
|
||||
if (dbp) {
|
||||
dbp->row_uuid = UUID_ZERO;
|
||||
}
|
||||
}
|
||||
|
||||
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
|
||||
parse_txn(struct server_config *config, struct db *db,
|
||||
const struct ovsdb_schema *schema, const struct json *txn_json,
|
||||
@@ -575,21 +605,7 @@ parse_txn(struct server_config *config, struct db *db,
|
||||
if (error) {
|
||||
return error;
|
||||
}
|
||||
|
||||
if (!db->db->schema ||
|
||||
strcmp(schema->version, db->db->schema->version)) {
|
||||
ovsdb_jsonrpc_server_reconnect(
|
||||
config->jsonrpc, false,
|
||||
(db->db->schema
|
||||
? xasprintf("database %s schema changed", db->db->name)
|
||||
: xasprintf("database %s connected to storage",
|
||||
db->db->name)));
|
||||
}
|
||||
|
||||
ovsdb_replace(db->db, ovsdb_create(ovsdb_schema_clone(schema), NULL));
|
||||
|
||||
/* Force update to schema in _Server database. */
|
||||
db->row_uuid = UUID_ZERO;
|
||||
update_schema(db->db, schema, config);
|
||||
}
|
||||
|
||||
if (txn_json) {
|
||||
@@ -660,35 +676,56 @@ add_db(struct server_config *config, struct db *db)
|
||||
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
|
||||
open_db(struct server_config *config, const char *filename)
|
||||
{
|
||||
/* If we know that the file is already open, return a good error message.
|
||||
* Otherwise, if the file is open, we'll fail later on with a harder to
|
||||
* interpret file locking error. */
|
||||
if (is_already_open(config, filename)) {
|
||||
return ovsdb_error(NULL, "%s: already open", filename);
|
||||
}
|
||||
|
||||
const char *relay_prefix = "relay:";
|
||||
const char *relay_remotes = NULL;
|
||||
const int relay_prefix_len = strlen(relay_prefix);
|
||||
struct ovsdb_storage *storage;
|
||||
struct ovsdb_error *error;
|
||||
error = ovsdb_storage_open(filename, true, &storage);
|
||||
if (error) {
|
||||
return error;
|
||||
bool is_relay;
|
||||
char *name;
|
||||
|
||||
is_relay = !strncmp(filename, relay_prefix, relay_prefix_len);
|
||||
if (!is_relay) {
|
||||
/* If we know that the file is already open, return a good error
|
||||
* message. Otherwise, if the file is open, we'll fail later on with
|
||||
* a harder to interpret file locking error. */
|
||||
if (is_already_open(config, filename)) {
|
||||
return ovsdb_error(NULL, "%s: already open", filename);
|
||||
}
|
||||
|
||||
error = ovsdb_storage_open(filename, true, &storage);
|
||||
if (error) {
|
||||
return error;
|
||||
}
|
||||
name = xstrdup(filename);
|
||||
} else {
|
||||
/* Parsing the relay in format 'relay:DB_NAME:<list of remotes>'*/
|
||||
relay_remotes = strchr(filename + relay_prefix_len, ':');
|
||||
|
||||
if (!relay_remotes || relay_remotes[0] == '\0') {
|
||||
return ovsdb_error(NULL, "%s: invalid syntax", filename);
|
||||
}
|
||||
name = xmemdup0(filename, relay_remotes - filename);
|
||||
storage = ovsdb_storage_create_unbacked(name + relay_prefix_len);
|
||||
relay_remotes++; /* Skip the ':'. */
|
||||
}
|
||||
|
||||
struct ovsdb_schema *schema;
|
||||
if (ovsdb_storage_is_clustered(storage)) {
|
||||
if (is_relay || ovsdb_storage_is_clustered(storage)) {
|
||||
schema = NULL;
|
||||
} else {
|
||||
struct json *txn_json;
|
||||
error = ovsdb_storage_read(storage, &schema, &txn_json, NULL);
|
||||
if (error) {
|
||||
ovsdb_storage_close(storage);
|
||||
free(name);
|
||||
return error;
|
||||
}
|
||||
ovs_assert(schema && !txn_json);
|
||||
}
|
||||
|
||||
struct db *db = xzalloc(sizeof *db);
|
||||
db->filename = xstrdup(filename);
|
||||
db->filename = name;
|
||||
db->db = ovsdb_create(schema, storage);
|
||||
ovsdb_jsonrpc_server_add_db(config->jsonrpc, db->db);
|
||||
|
||||
@@ -714,6 +751,10 @@ open_db(struct server_config *config, const char *filename)
|
||||
}
|
||||
|
||||
add_db(config, db);
|
||||
|
||||
if (is_relay) {
|
||||
ovsdb_relay_add_db(db->db, relay_remotes, update_schema, config);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -1151,11 +1192,11 @@ update_database_status(struct ovsdb_row *row, struct db *db)
|
||||
{
|
||||
ovsdb_util_write_string_column(row, "name", db->db->name);
|
||||
ovsdb_util_write_string_column(row, "model",
|
||||
ovsdb_storage_get_model(db->db->storage));
|
||||
db->db->is_relay ? "relay" : ovsdb_storage_get_model(db->db->storage));
|
||||
ovsdb_util_write_bool_column(row, "connected",
|
||||
ovsdb_storage_is_connected(db->db->storage));
|
||||
ovsdb_util_write_bool_column(row, "leader",
|
||||
ovsdb_storage_is_leader(db->db->storage));
|
||||
db->db->is_relay ? false : ovsdb_storage_is_leader(db->db->storage));
|
||||
ovsdb_util_write_uuid_column(row, "cid",
|
||||
ovsdb_storage_get_cid(db->db->storage));
|
||||
ovsdb_util_write_uuid_column(row, "sid",
|
||||
|
@@ -421,6 +421,8 @@ ovsdb_create(struct ovsdb_schema *schema, struct ovsdb_storage *storage)
|
||||
ovs_list_init(&db->triggers);
|
||||
db->run_triggers_now = db->run_triggers = false;
|
||||
|
||||
db->is_relay = false;
|
||||
|
||||
shash_init(&db->tables);
|
||||
if (schema) {
|
||||
SHASH_FOR_EACH (node, &schema->tables) {
|
||||
|
@@ -91,6 +91,9 @@ struct ovsdb {
|
||||
bool need_txn_history; /* Need to maintain history of transactions. */
|
||||
unsigned int n_txn_history; /* Current number of history transactions. */
|
||||
struct ovs_list txn_history; /* Contains "struct ovsdb_txn_history_node. */
|
||||
|
||||
/* Relay mode. */
|
||||
bool is_relay;
|
||||
};
|
||||
|
||||
struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
|
||||
|
343
ovsdb/relay.c
Normal file
343
ovsdb/relay.c
Normal file
@@ -0,0 +1,343 @@
|
||||
/*
|
||||
* Copyright (c) 2021, Red Hat, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <config.h>
|
||||
|
||||
#include "relay.h"
|
||||
|
||||
#include "coverage.h"
|
||||
#include "jsonrpc.h"
|
||||
#include "openvswitch/hmap.h"
|
||||
#include "openvswitch/json.h"
|
||||
#include "openvswitch/list.h"
|
||||
#include "openvswitch/poll-loop.h"
|
||||
#include "openvswitch/shash.h"
|
||||
#include "openvswitch/vlog.h"
|
||||
#include "ovsdb.h"
|
||||
#include "ovsdb-cs.h"
|
||||
#include "ovsdb-error.h"
|
||||
#include "row.h"
|
||||
#include "table.h"
|
||||
#include "transaction.h"
|
||||
#include "util.h"
|
||||
|
||||
VLOG_DEFINE_THIS_MODULE(relay);
|
||||
|
||||
static struct shash relay_dbs = SHASH_INITIALIZER(&relay_dbs);
|
||||
|
||||
struct relay_ctx {
|
||||
struct ovsdb *db;
|
||||
struct ovsdb_cs *cs;
|
||||
|
||||
/* Schema updates. */
|
||||
struct ovsdb_schema *new_schema;
|
||||
schema_change_callback schema_change_cb;
|
||||
void *schema_change_aux;
|
||||
};
|
||||
|
||||
static struct json *
|
||||
ovsdb_relay_compose_monitor_request(const struct json *schema_json, void *ctx_)
|
||||
{
|
||||
struct json *monitor_request = json_object_create();
|
||||
struct relay_ctx *ctx = ctx_;
|
||||
struct ovsdb_schema *schema;
|
||||
struct ovsdb *db = ctx->db;
|
||||
struct ovsdb_error *error;
|
||||
|
||||
error = ovsdb_schema_from_json(schema_json, &schema);
|
||||
if (error) {
|
||||
char *msg = ovsdb_error_to_string_free(error);
|
||||
VLOG_WARN("%s: Failed to parse db schema: %s", db->name, msg);
|
||||
free(msg);
|
||||
/* There is nothing we can really do here. */
|
||||
return monitor_request;
|
||||
}
|
||||
|
||||
const struct shash_node *node;
|
||||
SHASH_FOR_EACH (node, &schema->tables) {
|
||||
struct json *monitor_request_array = json_array_create_empty();
|
||||
struct ovsdb_table_schema *table = node->data;
|
||||
|
||||
json_array_add(monitor_request_array, json_object_create());
|
||||
json_object_put(monitor_request, table->name, monitor_request_array);
|
||||
}
|
||||
|
||||
if (!db->schema || !ovsdb_schema_equal(schema, db->schema)) {
|
||||
VLOG_DBG("database %s schema changed.", db->name);
|
||||
if (ctx->new_schema) {
|
||||
ovsdb_schema_destroy(ctx->new_schema);
|
||||
}
|
||||
/* We will update the schema later when we will receive actual data
|
||||
* from the mointor in order to avoid sitting with an empty database
|
||||
* until the monitor reply. */
|
||||
ctx->new_schema = schema;
|
||||
} else {
|
||||
ovsdb_schema_destroy(schema);
|
||||
}
|
||||
return monitor_request;
|
||||
}
|
||||
|
||||
static struct ovsdb_cs_ops relay_cs_ops = {
|
||||
.compose_monitor_requests = ovsdb_relay_compose_monitor_request,
|
||||
};
|
||||
|
||||
void
|
||||
ovsdb_relay_add_db(struct ovsdb *db, const char *remote,
|
||||
schema_change_callback schema_change_cb,
|
||||
void *schema_change_aux)
|
||||
{
|
||||
struct relay_ctx *ctx;
|
||||
|
||||
if (!db || !remote) {
|
||||
return;
|
||||
}
|
||||
|
||||
ctx = shash_find_data(&relay_dbs, db->name);
|
||||
if (ctx) {
|
||||
ovsdb_cs_set_remote(ctx->cs, remote, true);
|
||||
VLOG_DBG("%s: relay source set to '%s'", db->name, remote);
|
||||
return;
|
||||
}
|
||||
|
||||
db->is_relay = true;
|
||||
ctx = xzalloc(sizeof *ctx);
|
||||
ctx->schema_change_cb = schema_change_cb;
|
||||
ctx->schema_change_aux = schema_change_aux;
|
||||
ctx->db = db;
|
||||
ctx->cs = ovsdb_cs_create(db->name, 3, &relay_cs_ops, ctx);
|
||||
shash_add(&relay_dbs, db->name, ctx);
|
||||
ovsdb_cs_set_leader_only(ctx->cs, false);
|
||||
ovsdb_cs_set_remote(ctx->cs, remote, true);
|
||||
|
||||
VLOG_DBG("added database: %s, %s", db->name, remote);
|
||||
}
|
||||
|
||||
void
|
||||
ovsdb_relay_del_db(struct ovsdb *db)
|
||||
{
|
||||
struct relay_ctx *ctx;
|
||||
|
||||
if (!db) {
|
||||
return;
|
||||
}
|
||||
|
||||
ctx = shash_find_and_delete(&relay_dbs, db->name);
|
||||
if (!ctx) {
|
||||
VLOG_WARN("Failed to remove relay database %s: not found.", db->name);
|
||||
return;
|
||||
}
|
||||
|
||||
VLOG_DBG("removed database: %s", db->name);
|
||||
|
||||
db->is_relay = false;
|
||||
ovsdb_cs_destroy(ctx->cs);
|
||||
free(ctx);
|
||||
}
|
||||
|
||||
static struct ovsdb_error *
|
||||
ovsdb_relay_process_row_update(struct ovsdb_table *table,
|
||||
const struct ovsdb_cs_row_update *ru,
|
||||
struct ovsdb_txn *txn)
|
||||
{
|
||||
const struct uuid *uuid = &ru->row_uuid;
|
||||
struct ovsdb_error * error = NULL;
|
||||
|
||||
/* XXX: ovsdb-cs module returns shash which was previously part of a json
|
||||
* structure and we need json row format in order to use ovsdb_row*
|
||||
* functions. Creating a json object out of shash. */
|
||||
struct json *json_row = json_object_create();
|
||||
struct shash *obj = json_row->object;
|
||||
json_row->object = CONST_CAST(struct shash *, ru->columns);
|
||||
|
||||
switch (ru->type) {
|
||||
case OVSDB_CS_ROW_DELETE:
|
||||
error = ovsdb_table_execute_delete(txn, uuid, table);
|
||||
break;
|
||||
|
||||
case OVSDB_CS_ROW_INSERT:
|
||||
error = ovsdb_table_execute_insert(txn, uuid, table, json_row);
|
||||
break;
|
||||
|
||||
case OVSDB_CS_ROW_UPDATE:
|
||||
error = ovsdb_table_execute_update(txn, uuid, table, json_row, false);
|
||||
break;
|
||||
|
||||
case OVSDB_CS_ROW_XOR:
|
||||
error = ovsdb_table_execute_update(txn, uuid, table, json_row, true);
|
||||
break;
|
||||
|
||||
default:
|
||||
OVS_NOT_REACHED();
|
||||
}
|
||||
|
||||
json_row->object = obj;
|
||||
json_destroy(json_row);
|
||||
|
||||
return error;
|
||||
}
|
||||
|
||||
static struct ovsdb_error *
|
||||
ovsdb_relay_parse_update__(struct ovsdb *db,
|
||||
const struct ovsdb_cs_db_update *du)
|
||||
{
|
||||
struct ovsdb_error *error = NULL;
|
||||
struct ovsdb_txn *txn;
|
||||
|
||||
txn = ovsdb_txn_create(db);
|
||||
|
||||
for (size_t i = 0; i < du->n; i++) {
|
||||
const struct ovsdb_cs_table_update *tu = &du->table_updates[i];
|
||||
struct ovsdb_table *table = ovsdb_get_table(db, tu->table_name);
|
||||
|
||||
if (!table) {
|
||||
error = ovsdb_error("unknown table", "unknown table %s",
|
||||
tu->table_name);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
for (size_t j = 0; j < tu->n; j++) {
|
||||
const struct ovsdb_cs_row_update *ru = &tu->row_updates[j];
|
||||
|
||||
error = ovsdb_relay_process_row_update(table, ru, txn);
|
||||
if (error) {
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
exit:
|
||||
if (error) {
|
||||
ovsdb_txn_abort(txn);
|
||||
return error;
|
||||
} else {
|
||||
/* Commit transaction. */
|
||||
error = ovsdb_txn_propose_commit_block(txn, false);
|
||||
}
|
||||
|
||||
return error;
|
||||
}
|
||||
|
||||
static struct ovsdb_error *
|
||||
ovsdb_relay_clear(struct ovsdb *db)
|
||||
{
|
||||
struct ovsdb_txn *txn = ovsdb_txn_create(db);
|
||||
struct shash_node *table_node;
|
||||
|
||||
SHASH_FOR_EACH (table_node, &db->tables) {
|
||||
struct ovsdb_table *table = table_node->data;
|
||||
struct ovsdb_row *row, *next;
|
||||
|
||||
HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
|
||||
ovsdb_txn_row_delete(txn, row);
|
||||
}
|
||||
}
|
||||
|
||||
return ovsdb_txn_propose_commit_block(txn, false);
|
||||
}
|
||||
|
||||
static void
|
||||
ovsdb_relay_parse_update(struct relay_ctx *ctx,
|
||||
const struct ovsdb_cs_update_event *update)
|
||||
{
|
||||
if (!ctx->db) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (update->monitor_reply && ctx->new_schema) {
|
||||
/* There was a schema change. Updating a database with a new schema
|
||||
* before processing monitor reply with the new data. */
|
||||
ctx->schema_change_cb(ctx->db, ctx->new_schema,
|
||||
ctx->schema_change_aux);
|
||||
ovsdb_schema_destroy(ctx->new_schema);
|
||||
ctx->new_schema = NULL;
|
||||
}
|
||||
|
||||
struct ovsdb_cs_db_update *du;
|
||||
struct ovsdb_error *error = ovsdb_cs_parse_db_update(update->table_updates,
|
||||
update->version, &du);
|
||||
if (!error) {
|
||||
if (update->clear) {
|
||||
error = ovsdb_relay_clear(ctx->db);
|
||||
}
|
||||
if (!error) {
|
||||
error = ovsdb_relay_parse_update__(ctx->db, du);
|
||||
}
|
||||
}
|
||||
ovsdb_cs_db_update_destroy(du);
|
||||
if (error) {
|
||||
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
|
||||
if (!VLOG_DROP_WARN(&rl)) {
|
||||
char *s = ovsdb_error_to_string(error);
|
||||
VLOG_WARN_RL(&rl, "%s", s);
|
||||
free(s);
|
||||
}
|
||||
/* Something bad happened. Try to recover. */
|
||||
if (!strcmp(ovsdb_error_get_tag(error), "consistency violation")) {
|
||||
ovsdb_cs_flag_inconsistency(ctx->cs);
|
||||
} else {
|
||||
ovsdb_cs_force_reconnect(ctx->cs);
|
||||
}
|
||||
ovsdb_error_destroy(error);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ovsdb_relay_run(void)
|
||||
{
|
||||
struct shash_node *node;
|
||||
SHASH_FOR_EACH (node, &relay_dbs) {
|
||||
struct relay_ctx *ctx = node->data;
|
||||
struct ovs_list events;
|
||||
|
||||
ovsdb_cs_run(ctx->cs, &events);
|
||||
|
||||
struct ovsdb_cs_event *event;
|
||||
LIST_FOR_EACH_POP (event, list_node, &events) {
|
||||
if (!ctx->db) {
|
||||
ovsdb_cs_event_destroy(event);
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (event->type) {
|
||||
case OVSDB_CS_EVENT_TYPE_RECONNECT:
|
||||
/* Nothing to do. */
|
||||
break;
|
||||
|
||||
case OVSDB_CS_EVENT_TYPE_UPDATE:
|
||||
ovsdb_relay_parse_update(ctx, &event->update);
|
||||
break;
|
||||
|
||||
case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
|
||||
case OVSDB_CS_EVENT_TYPE_LOCKED:
|
||||
/* Not expected. */
|
||||
break;
|
||||
}
|
||||
ovsdb_cs_event_destroy(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ovsdb_relay_wait(void)
|
||||
{
|
||||
struct shash_node *node;
|
||||
|
||||
SHASH_FOR_EACH (node, &relay_dbs) {
|
||||
struct relay_ctx *ctx = node->data;
|
||||
|
||||
ovsdb_cs_wait(ctx->cs);
|
||||
}
|
||||
}
|
34
ovsdb/relay.h
Normal file
34
ovsdb/relay.h
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (c) 2021, Red Hat, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef OVSDB_RELAY_H
|
||||
#define OVSDB_RELAY_H 1
|
||||
|
||||
struct json;
|
||||
struct ovsdb;
|
||||
struct ovsdb_schema;
|
||||
|
||||
typedef void (*schema_change_callback)(struct ovsdb *,
|
||||
const struct ovsdb_schema *, void *aux);
|
||||
|
||||
void ovsdb_relay_add_db(struct ovsdb *, const char *remote,
|
||||
schema_change_callback schema_change_cb,
|
||||
void *schema_change_aux);
|
||||
void ovsdb_relay_del_db(struct ovsdb *);
|
||||
void ovsdb_relay_run(void);
|
||||
void ovsdb_relay_wait(void);
|
||||
|
||||
#endif /* OVSDB_RELAY_H */
|
Reference in New Issue
Block a user