2
0
mirror of https://github.com/openvswitch/ovs synced 2025-08-22 09:58:01 +00:00

ovsdb: Reimplement replication using a state machine.

The current replication implementation uses blocking transactions, which
are error-prone in practice, especially when handling RPC connection
flapping to the active server.

Signed-off-by: Andy Zhou <azhou@ovn.org>
Acked-by: Ben Pfaff <blp@ovn.org>
This commit is contained in:
Andy Zhou 2016-08-23 13:57:37 -07:00
parent 5dd81c22a6
commit 23c16b5124
3 changed files with 333 additions and 305 deletions

View File

@ -137,7 +137,7 @@ ovsdb_replication_init(struct shash *all_dbs)
struct shash_node *node;
SHASH_FOR_EACH (node, all_dbs) {
struct db *db = node->data;
replication_add_db(db->db->schema->name, db->db);
replication_add_local_db(db->db->schema->name, db->db);
}
}
@ -188,7 +188,11 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
ovsdb_jsonrpc_server_run(jsonrpc);
if (is_backup_server) {
replication_run();
replication_run();
if (!replication_is_alive()) {
int retval = replication_get_last_error();
ovs_fatal(retval, "replication connection failed");
}
}
SHASH_FOR_EACH(node, all_dbs) {
@ -212,6 +216,7 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
if (is_backup_server) {
replication_wait();
}
ovsdb_jsonrpc_server_wait(jsonrpc);
unixctl_server_wait(unixctl);
SHASH_FOR_EACH(node, all_dbs) {
@ -231,7 +236,6 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
}
}
disconnect_active_server();
free(remotes_error);
}
@ -1346,6 +1350,9 @@ ovsdb_server_add_database(struct unixctl_conn *conn, int argc OVS_UNUSED,
error = open_db(config, filename);
if (!error) {
save_config(config);
if (is_backup_server) {
ovsdb_replication_init(config->all_dbs);
}
unixctl_command_reply(conn, NULL);
} else {
unixctl_command_reply_error(conn, error);
@ -1376,6 +1383,9 @@ ovsdb_server_remove_database(struct unixctl_conn *conn, int argc OVS_UNUSED,
shash_delete(config->all_dbs, node);
save_config(config);
if (is_backup_server) {
ovsdb_replication_init(config->all_dbs);
}
unixctl_command_reply(conn, NULL);
}

View File

@ -1,6 +1,6 @@
/*
* (c) Copyright 2016 Hewlett Packard Enterprise Development LP
* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -38,27 +38,16 @@
VLOG_DEFINE_THIS_MODULE(replication);
static char *active_ovsdb_server;
static struct jsonrpc *rpc;
static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables);
static bool reset_dbs = true;
static struct jsonrpc_session *session = NULL;
static unsigned int session_seqno = UINT_MAX;
static struct jsonrpc *open_jsonrpc(const char *server);
static struct ovsdb_error *check_jsonrpc_error(int error,
struct jsonrpc_msg **reply_);
static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
const char *database);
static void send_monitor_requests(void);
static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
static void add_monitored_table(struct ovsdb_table_schema *table,
struct json *monitor_requests);
static void get_initial_db_state(struct ovsdb *db);
static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
static struct ovsdb_error *reset_databases(void);
static struct ovsdb_error *reset_database(struct ovsdb *db);
static void check_for_notifications(void);
static void process_notification(struct json *table_updates, struct ovsdb *db);
static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
static struct ovsdb_error *process_table_update(struct json *table_update,
const char *table_name,
struct ovsdb *database,
@ -97,11 +86,23 @@ bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
static void request_ids_destroy(void);
void request_ids_clear(void);
enum ovsdb_replication_state {
RPL_S_DB_REQUESTED,
RPL_S_SCHEMA_REQUESTED,
RPL_S_MONITOR_REQUESTED,
RPL_S_REPLICATING,
RPL_S_ERR /* Error, no longer replicating. */
};
static enum ovsdb_replication_state state;
/* Currently replicating DBs.
* replication_dbs is an shash of 'struct ovsdb *'s that stores the
* replicating dbs. */
static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs);
/* All DBs known to ovsdb-server.  The DBs actually being replicated are
* stored in 'replication_dbs', which holds the subset of these DBs whose
* schema matches that of the corresponding remote DB. */
static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
static struct shash *replication_dbs = NULL;
static struct shash *replication_db_clone(struct shash *dbs);
/* Find 'struct ovsdb' by name within 'replication_dbs' */
static struct ovsdb* find_db(const char *db_name);
@ -109,56 +110,211 @@ static struct ovsdb* find_db(const char *db_name);
void
replication_init(void)
{
shash_clear(&replication_dbs);
if (rpc) {
disconnect_active_server();
shash_destroy(replication_dbs);
replication_dbs = NULL;
shash_clear(&local_dbs);
if (session) {
jsonrpc_session_close(session);
}
reset_dbs = true;
session = jsonrpc_session_open(active_ovsdb_server, true);
session_seqno = UINT_MAX;
}
void
replication_add_db(const char *database, struct ovsdb *db)
replication_add_local_db(const char *database, struct ovsdb *db)
{
struct shash_node *node = xmalloc(sizeof *node);
shash_add_assert(&replication_dbs, database, db);
shash_add_assert(&local_dbs, database, db);
}
void
replication_run(void)
{
if (sset_is_empty(&monitored_tables) && active_ovsdb_server) {
/* Reset local databases. */
if (reset_dbs) {
struct ovsdb_error *error = reset_databases();
if (error) {
/* In case reset DB fails, log the error before exiting. */
char *msg = ovsdb_error_to_string(error);
ovsdb_error_destroy(error);
VLOG_FATAL("Failed to reset DB (%s).", msg);
}
reset_dbs = false;
}
/* Open JSON-RPC. */
jsonrpc_close(rpc);
rpc = open_jsonrpc(active_ovsdb_server);
if (!rpc) {
return;
}
/* Send monitor requests. */
send_monitor_requests();
if (!session) {
return;
}
if (!sset_is_empty(&monitored_tables)) {
check_for_notifications();
jsonrpc_session_run(session);
for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
struct jsonrpc_msg *msg;
unsigned int seqno;
seqno = jsonrpc_session_get_seqno(session);
if (seqno != session_seqno) {
session_seqno = seqno;
request_ids_clear();
struct jsonrpc_msg *request;
request = jsonrpc_create_request("list_dbs",
json_array_create_empty(), NULL);
request_ids_add(request->id, NULL);
jsonrpc_session_send(session, request);
shash_destroy(replication_dbs);
replication_dbs = replication_db_clone(&local_dbs);
state = RPL_S_DB_REQUESTED;
}
msg = jsonrpc_session_recv(session);
if (!msg) {
continue;
}
if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
&& !strcmp(msg->method, "update")) {
if (msg->params->type == JSON_ARRAY
&& msg->params->u.array.n == 2
&& msg->params->u.array.elems[0]->type == JSON_STRING) {
char *db_name = msg->params->u.array.elems[0]->u.string;
struct ovsdb *db = find_db(db_name);
if (db) {
struct ovsdb_error *error;
error = process_notification(msg->params->u.array.elems[1],
db);
if (error) {
ovsdb_error_assert(error);
state = RPL_S_ERR;
}
}
}
} else if (msg->type == JSONRPC_REPLY) {
struct ovsdb *db;
if (!request_ids_lookup_and_free(msg->id, &db)) {
VLOG_WARN("received unexpected reply");
goto next;
}
switch (state) {
case RPL_S_DB_REQUESTED:
if (msg->result->type != JSON_ARRAY) {
struct ovsdb_error *error;
error = ovsdb_error("list-dbs failed",
"list_dbs response is not array");
ovsdb_error_assert(error);
state = RPL_S_ERR;
} else {
size_t i;
for (i = 0; i < msg->result->u.array.n; i++) {
const struct json *name = msg->result->u.array.elems[i];
if (name->type == JSON_STRING) {
/* Send one schema request for each remote DB. */
const char *db_name = json_string(name);
struct ovsdb *db = find_db(db_name);
if (db) {
struct jsonrpc_msg *request =
jsonrpc_create_request(
"get_schema",
json_array_create_1(
json_string_create(db_name)),
NULL);
request_ids_add(request->id, db);
jsonrpc_session_send(session, request);
}
}
}
state = RPL_S_SCHEMA_REQUESTED;
}
break;
case RPL_S_SCHEMA_REQUESTED: {
struct ovsdb_schema *schema;
struct ovsdb_error *error;
error = ovsdb_schema_from_json(msg->result, &schema);
if (error) {
ovsdb_error_assert(error);
state = RPL_S_ERR;
}
if (db != find_db(schema->name)) {
/* Unexpected schema. */
VLOG_WARN("unexpected schema %s", schema->name);
state = RPL_S_ERR;
} else if (!ovsdb_schema_equal(schema, db->schema)) {
/* Schema version mismatch. */
VLOG_INFO("Schema version mismatch, %s not replicated",
schema->name);
shash_find_and_delete(replication_dbs, schema->name);
}
ovsdb_schema_destroy(schema);
/* After receiving schemas, reset the local databases that
* will be monitored and send out monitor requests for them. */
if (hmap_is_empty(&request_ids)) {
struct shash_node *node, *next;
SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
db = node->data;
struct ovsdb_error *error = reset_database(db);
if (error) {
const char *db_name = db->schema->name;
shash_find_and_delete(replication_dbs, db_name);
ovsdb_error_assert(error);
VLOG_WARN("Failed to reset database, "
"%s not replicated.", db_name);
}
}
if (shash_is_empty(replication_dbs)) {
VLOG_WARN("Nothing to replicate.");
state = RPL_S_ERR;
} else {
SHASH_FOR_EACH (node, replication_dbs) {
db = node->data;
struct ovsdb *db = node->data;
struct jsonrpc_msg *request =
create_monitor_request(db);
request_ids_add(request->id, db);
jsonrpc_session_send(session, request);
state = RPL_S_MONITOR_REQUESTED;
}
}
}
break;
}
case RPL_S_MONITOR_REQUESTED: {
/* Reply to monitor requests. */
struct ovsdb_error *error;
error = process_notification(msg->result, db);
if (error) {
ovsdb_error_assert(error);
state = RPL_S_ERR;
} else {
/* Transition to replicating state after receiving
* all replies of "monitor" requests. */
if (hmap_is_empty(&request_ids)) {
state = RPL_S_REPLICATING;
}
}
break;
}
case RPL_S_ERR:
/* Ignore all messages */
break;
case RPL_S_REPLICATING:
default:
OVS_NOT_REACHED();
}
}
next:
jsonrpc_msg_destroy(msg);
}
}
void
replication_wait(void)
{
if (rpc) {
jsonrpc_wait(rpc);
if (session) {
jsonrpc_session_wait(session);
jsonrpc_session_recv_wait(session);
}
}
@ -291,17 +447,13 @@ blacklist_tables_find(const char *database, const char *table)
void
disconnect_active_server(void)
{
jsonrpc_close(rpc);
rpc = NULL;
sset_clear(&monitored_tables);
shash_clear(&replication_dbs);
jsonrpc_session_close(session);
session = NULL;
}
void
replication_destroy(void)
{
disconnect_active_server();
sset_destroy(&monitored_tables);
blacklist_tables_clear();
shash_destroy(&blacklist_tables);
@ -311,34 +463,22 @@ replication_destroy(void)
}
request_ids_destroy();
shash_destroy(&replication_dbs);
shash_destroy(replication_dbs);
replication_dbs = NULL;
shash_destroy(&local_dbs);
}
static struct ovsdb *
find_db(const char *db_name)
{
return shash_find_data(&replication_dbs, db_name);
return shash_find_data(replication_dbs, db_name);
}
static struct ovsdb_error *
reset_databases(void)
{
struct shash_node *db_node;
struct ovsdb_error *error = NULL;
SHASH_FOR_EACH (db_node, &replication_dbs) {
struct ovsdb *db = db_node->data;
struct ovsdb_txn *txn = ovsdb_txn_create(db);
reset_database(db, txn);
error = ovsdb_txn_commit(txn, false);
}
return error;
}
static void
reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
reset_database(struct ovsdb *db)
{
struct ovsdb_txn *txn = ovsdb_txn_create(db);
struct shash_node *table_node;
SHASH_FOR_EACH (table_node, &db->tables) {
@ -351,169 +491,45 @@ reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
}
}
}
return ovsdb_txn_commit(txn, false);
}
static struct jsonrpc *
open_jsonrpc(const char *server)
/* Create a monitor request for 'db'. The monitor request will not include
* any tables listed in 'blacklist_tables'.
*
* Caller is responsible for disposing of the returned request.
*/
static struct jsonrpc_msg *
create_monitor_request(struct ovsdb *db)
{
struct stream *stream;
int error;
struct jsonrpc_msg *request;
struct json *monitor;
struct ovsdb_schema *schema = db->schema;
const char *db_name = schema->name;
error = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT);
struct json *monitor_request = json_object_create();
size_t n = shash_count(&schema->tables);
const struct shash_node **nodes = shash_sort(&schema->tables);
return error ? NULL : jsonrpc_open(stream);
}
for (int j = 0; j < n; j++) {
struct ovsdb_table_schema *table = nodes[j]->data;
static struct ovsdb_error *
check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
{
struct jsonrpc_msg *reply = *reply_;
if (error) {
return ovsdb_error("transaction failed",
"transaction returned error %d",
error);
}
if (reply->error) {
return ovsdb_error("transaction failed",
"transaction returned error: %s",
json_to_string(reply->error, 0));
}
return NULL;
}
static void
fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
{
struct jsonrpc_msg *request, *reply;
struct ovsdb_error *error;
size_t i;
request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
NULL);
error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
&reply);
if (error) {
ovsdb_error_assert(error);
return;
}
if (reply->result->type != JSON_ARRAY) {
ovsdb_error_assert(ovsdb_error("list-dbs failed",
"list_dbs response is not array"));
return;
}
for (i = 0; i < reply->result->u.array.n; i++) {
const struct json *name = reply->result->u.array.elems[i];
if (name->type != JSON_STRING) {
ovsdb_error_assert(ovsdb_error(
"list_dbs failed",
"list_dbs response %"PRIuSIZE" is not string",
i));
}
svec_add(dbs, name->u.string);
}
jsonrpc_msg_destroy(reply);
svec_sort(dbs);
}
static struct ovsdb_schema *
fetch_schema(struct jsonrpc *rpc, const char *database)
{
struct jsonrpc_msg *request, *reply;
struct ovsdb_schema *schema;
struct ovsdb_error *error;
request = jsonrpc_create_request("get_schema",
json_array_create_1(
json_string_create(database)),
NULL);
error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
&reply);
if (error) {
jsonrpc_msg_destroy(reply);
ovsdb_error_assert(error);
return NULL;
}
error = ovsdb_schema_from_json(reply->result, &schema);
if (error) {
jsonrpc_msg_destroy(reply);
ovsdb_error_assert(error);
return NULL;
}
jsonrpc_msg_destroy(reply);
return schema;
}
static void
send_monitor_requests(void)
{
const char *db_name;
struct svec dbs;
size_t i;
svec_init(&dbs);
fetch_dbs(rpc, &dbs);
SVEC_FOR_EACH (i, db_name, &dbs) {
struct ovsdb *db = find_db(db_name);
if (db) {
struct ovsdb_schema *local_schema, *remote_schema;
local_schema = db->schema;
remote_schema = fetch_schema(rpc, db_name);
if (ovsdb_schema_equal(local_schema, remote_schema)) {
struct jsonrpc_msg *request;
struct json *monitor, *monitor_request;
monitor_request = json_object_create();
size_t n = shash_count(&local_schema->tables);
const struct shash_node **nodes = shash_sort(
&local_schema->tables);
for (int j = 0; j < n; j++) {
struct ovsdb_table_schema *table = nodes[j]->data;
/* Monitor all tables not blacklisted. */
if (!blacklist_tables_find(db_name, table->name)) {
add_monitored_table(table, monitor_request);
}
}
free(nodes);
/* Send monitor request. */
monitor = json_array_create_3(
json_string_create(db_name),
json_string_create(db_name),
monitor_request);
request = jsonrpc_create_request("monitor", monitor, NULL);
jsonrpc_send(rpc, request);
get_initial_db_state(db);
}
ovsdb_schema_destroy(remote_schema);
/* Monitor all tables not blacklisted. */
if (!blacklist_tables_find(db_name, table->name)) {
add_monitored_table(table, monitor_request);
}
}
svec_destroy(&dbs);
}
free(nodes);
static void
get_initial_db_state(struct ovsdb *db)
{
struct jsonrpc_msg *msg;
/* Create a monitor request. */
monitor = json_array_create_3(
json_string_create(db_name),
json_string_create(db_name),
monitor_request);
request = jsonrpc_create_request("monitor", monitor, NULL);
jsonrpc_recv_block(rpc, &msg);
if (msg->type == JSONRPC_REPLY) {
process_notification(msg->result, db);
}
jsonrpc_msg_destroy(msg);
return request;
}
static void
@ -522,91 +538,44 @@ add_monitored_table(struct ovsdb_table_schema *table,
{
struct json *monitor_request_array;
sset_add(&monitored_tables, table->name);
monitor_request_array = json_array_create_empty();
json_array_add(monitor_request_array, json_object_create());
json_object_put(monitor_request, table->name, monitor_request_array);
}
static void
check_for_notifications(void)
{
struct jsonrpc_msg *msg;
int error;
error = jsonrpc_recv(rpc, &msg);
if (error == EAGAIN) {
return;
} else if (error) {
jsonrpc_close(rpc);
rpc = open_jsonrpc(active_ovsdb_server);
if (!rpc) {
/* Active server went down. */
disconnect_active_server();
}
jsonrpc_msg_destroy(msg);
return;
}
if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
msg->id));
} else if (msg->type == JSONRPC_NOTIFY
&& !strcmp(msg->method, "update")) {
struct json *params = msg->params;
if (params->type == JSON_ARRAY
&& params->u.array.n == 2) {
char *db_name = params->u.array.elems[0]->u.string;
struct ovsdb *db = find_db(db_name);
if (db) {
process_notification(params->u.array.elems[1], db);
}
}
}
jsonrpc_msg_destroy(msg);
jsonrpc_run(rpc);
}
static void
static struct ovsdb_error *
process_notification(struct json *table_updates, struct ovsdb *db)
{
struct ovsdb_error *error;
struct ovsdb_error *error = NULL;
struct ovsdb_txn *txn;
if (table_updates->type != JSON_OBJECT) {
sset_clear(&monitored_tables);
return;
}
if (table_updates->type == JSON_OBJECT) {
txn = ovsdb_txn_create(db);
txn = ovsdb_txn_create(db);
error = NULL;
/* Process each table update. */
struct shash_node *node;
SHASH_FOR_EACH (node, json_object(table_updates)) {
struct json *table_update = node->data;
if (table_update) {
error = process_table_update(table_update, node->name, db, txn);
if (error) {
break;
/* Process each table update. */
struct shash_node *node;
SHASH_FOR_EACH (node, json_object(table_updates)) {
struct json *table_update = node->data;
if (table_update) {
error = process_table_update(table_update, node->name, db, txn);
if (error) {
break;
}
}
}
if (error) {
ovsdb_txn_abort(txn);
return error;
} else {
/* Commit transaction. */
error = ovsdb_txn_commit(txn, false);
}
}
if (error) {
ovsdb_txn_abort(txn);
goto error;
}
/* Commit transaction. */
error = ovsdb_txn_commit(txn, false);
error:
if (error) {
ovsdb_error_assert(error);
disconnect_active_server();
}
return error;
}
static struct ovsdb_error *
@ -839,6 +808,53 @@ request_ids_clear(void)
hmap_init(&request_ids);
}
/* Returns a newly allocated shash that contains one entry for every entry
 * in 'dbs', using the same name and the same data pointer.  The data
 * pointers are shared with 'dbs', not copied.
 *
 * The caller owns the returned shash. */
static struct shash *
replication_db_clone(struct shash *dbs)
{
    struct shash *clone = xmalloc(sizeof *clone);
    struct shash_node *entry;

    shash_init(clone);
    SHASH_FOR_EACH (entry, dbs) {
        shash_add(clone, entry->name, entry->data);
    }

    return clone;
}
/* Returns true only if a replication session exists, the session reports
 * itself alive, and the replication state machine has not entered
 * RPL_S_ERR.
 *
 * Returns false if there is no session, if the session is no longer alive,
 * or if replication stopped because of an error. */
bool
replication_is_alive(void)
{
    if (!session) {
        return false;
    }

    return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
}
/* Returns the last error associated with replication.
 *
 * Returns 0 if there is no replication session, or if the session has
 * reported no error and the state machine is not in RPL_S_ERR.
 *
 * Otherwise returns the nonzero value reported by
 * jsonrpc_session_get_last_error() for 'session'.  If the session reports
 * no error but the state machine is in RPL_S_ERR, returns ENOENT. */
int
replication_get_last_error(void)
{
    if (!session) {
        return 0;
    }

    int session_err = jsonrpc_session_get_last_error(session);
    if (session_err) {
        return session_err;
    }

    /* The connection itself is fine; report a state machine failure. */
    return state == RPL_S_ERR ? ENOENT : 0;
}
void
replication_usage(void)
{

View File

@ -26,7 +26,9 @@ void replication_run(void);
void replication_wait(void);
void replication_destroy(void);
void replication_usage(void);
void replication_add_db(const char *databse, struct ovsdb *db);
void replication_add_local_db(const char *databse, struct ovsdb *db);
bool replication_is_alive(void);
int replication_get_last_error(void);
void set_active_ovsdb_server(const char *remote_server);
const char *get_active_ovsdb_server(void);