2
0
mirror of https://github.com/openvswitch/ovs synced 2025-08-31 14:25:26 +00:00

ovsdb-idl: Tolerate missing tables and columns.

Until now, if ovs-vsctl (or another client of the C ovsdb-idl library) was
compiled against a schema that had a column or table that was not in the
database actually being used (e.g. during an upgrade), and the column or
table was selected for monitoring, then ovsdb-idl would fail to get any
data at all because ovsdb-server would report an error due to a request
about a column or a table it didn't know about.

This commit fixes the problem by making ovsdb-idl retrieve the database
schema from the database server and omit any tables or columns that don't
exist from its monitoring request.  This works OK for the kinds of upgrades
that OVSDB otherwise supports gracefully because it will simply make the
missing columns or tables appear empty, which clients of the ovsdb-idl
library already have to tolerate.

VMware-BZ: #1413562
Reported-by: Alex Wang <alexw@nicira.com>
Signed-off-by: Ben Pfaff <blp@nicira.com>
Acked-by: Alex Wang <alexw@nicira.com>
This commit is contained in:
Ben Pfaff
2015-03-19 23:45:42 -07:00
parent da1e25d5af
commit d18e52e3d9
6 changed files with 357 additions and 31 deletions

View File

@@ -28,11 +28,15 @@
#include "fatal-signal.h"
#include "json.h"
#include "jsonrpc.h"
#include "ovsdb/ovsdb.h"
#include "ovsdb/table.h"
#include "ovsdb-data.h"
#include "ovsdb-error.h"
#include "ovsdb-idl-provider.h"
#include "ovsdb-parser.h"
#include "poll-loop.h"
#include "shash.h"
#include "sset.h"
#include "util.h"
#include "openvswitch/vlog.h"
@@ -71,16 +75,25 @@ struct ovsdb_idl_arc {
struct ovsdb_idl_row *dst; /* Destination row. */
};
enum ovsdb_idl_state {
IDL_S_SCHEMA_REQUESTED,
IDL_S_MONITOR_REQUESTED,
IDL_S_MONITORING
};
struct ovsdb_idl {
const struct ovsdb_idl_class *class;
struct jsonrpc_session *session;
struct shash table_by_name;
struct ovsdb_idl_table *tables; /* Contains "struct ovsdb_idl_table *"s.*/
struct json *monitor_request_id;
unsigned int last_monitor_request_seqno;
unsigned int change_seqno;
bool verify_write_only;
/* Session state. */
unsigned int state_seqno;
enum ovsdb_idl_state state;
struct json *request_id;
/* Database locking. */
char *lock_name; /* Name of lock we need, NULL if none. */
bool has_lock; /* Has db server told us we have the lock? */
@@ -124,7 +137,9 @@ static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
static struct vlog_rate_limit semantic_rl = VLOG_RATE_LIMIT_INIT(1, 5);
static void ovsdb_idl_clear(struct ovsdb_idl *);
static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *);
static void ovsdb_idl_send_schema_request(struct ovsdb_idl *);
static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *,
const struct json *schema_json);
static void ovsdb_idl_parse_update(struct ovsdb_idl *, const struct json *);
static struct ovsdb_error *ovsdb_idl_parse_update__(struct ovsdb_idl *,
const struct json *);
@@ -214,7 +229,10 @@ ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
hmap_init(&table->rows);
table->idl = idl;
}
idl->last_monitor_request_seqno = UINT_MAX;
idl->state_seqno = UINT_MAX;
idl->request_id = NULL;
hmap_init(&idl->outstanding_txns);
return idl;
@@ -239,7 +257,7 @@ ovsdb_idl_destroy(struct ovsdb_idl *idl)
}
shash_destroy(&idl->table_by_name);
free(idl->tables);
json_destroy(idl->monitor_request_id);
json_destroy(idl->request_id);
free(idl->lock_name);
json_destroy(idl->lock_request_id);
hmap_destroy(&idl->outstanding_txns);
@@ -298,14 +316,17 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
unsigned int seqno;
seqno = jsonrpc_session_get_seqno(idl->session);
if (idl->last_monitor_request_seqno != seqno) {
idl->last_monitor_request_seqno = seqno;
if (idl->state_seqno != seqno) {
idl->state_seqno = seqno;
json_destroy(idl->request_id);
idl->request_id = NULL;
ovsdb_idl_txn_abort_all(idl);
ovsdb_idl_send_monitor_request(idl);
ovsdb_idl_send_schema_request(idl);
idl->state = IDL_S_SCHEMA_REQUESTED;
if (idl->lock_name) {
ovsdb_idl_send_lock_request(idl);
}
break;
}
msg = jsonrpc_session_recv(idl->session);
@@ -321,14 +342,31 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
/* Database contents changed. */
ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
} else if (msg->type == JSONRPC_REPLY
&& idl->monitor_request_id
&& json_equal(idl->monitor_request_id, msg->id)) {
/* Reply to our "monitor" request. */
idl->change_seqno++;
json_destroy(idl->monitor_request_id);
idl->monitor_request_id = NULL;
ovsdb_idl_clear(idl);
ovsdb_idl_parse_update(idl, msg->result);
&& idl->request_id
&& json_equal(idl->request_id, msg->id)) {
switch (idl->state) {
case IDL_S_SCHEMA_REQUESTED:
/* Reply to our "get_schema" request. */
json_destroy(idl->request_id);
idl->request_id = NULL;
ovsdb_idl_send_monitor_request(idl, msg->result);
idl->state = IDL_S_MONITOR_REQUESTED;
break;
case IDL_S_MONITOR_REQUESTED:
/* Reply to our "monitor" request. */
idl->change_seqno++;
json_destroy(idl->request_id);
idl->request_id = NULL;
idl->state = IDL_S_MONITORING;
ovsdb_idl_clear(idl);
ovsdb_idl_parse_update(idl, msg->result);
break;
case IDL_S_MONITORING:
default:
OVS_NOT_REACHED();
}
} else if (msg->type == JSONRPC_REPLY
&& idl->lock_request_id
&& json_equal(idl->lock_request_id, msg->id)) {
@@ -555,8 +593,107 @@ ovsdb_idl_omit(struct ovsdb_idl *idl, const struct ovsdb_idl_column *column)
}
static void
ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
ovsdb_idl_send_schema_request(struct ovsdb_idl *idl)
{
struct jsonrpc_msg *msg;
json_destroy(idl->request_id);
msg = jsonrpc_create_request(
"get_schema",
json_array_create_1(json_string_create(idl->class->database)),
&idl->request_id);
jsonrpc_session_send(idl->session, msg);
}
static void
log_error(struct ovsdb_error *error)
{
char *s = ovsdb_error_to_string(error);
VLOG_WARN("error parsing database schema: %s", s);
free(s);
ovsdb_error_destroy(error);
}
/* Frees 'schema', which is in the format returned by parse_schema(). */
static void
free_schema(struct shash *schema)
{
if (schema) {
struct shash_node *node, *next;
SHASH_FOR_EACH_SAFE (node, next, schema) {
struct sset *sset = node->data;
sset_destroy(sset);
free(sset);
shash_delete(schema, node);
}
shash_destroy(schema);
free(schema);
}
}
/* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
* 7047, to obtain the names of its rows and columns. If successful, returns
* an shash whose keys are table names and whose values are ssets, where each
* sset contains the names of its table's columns. On failure (due to a parse
* error), returns NULL.
*
* It would also be possible to use the general-purpose OVSDB schema parser in
* ovsdb-server, but that's overkill, possibly too strict for the current use
* case, and would require restructuring ovsdb-server to separate the schema
* code from the rest. */
static struct shash *
parse_schema(const struct json *schema_json)
{
struct ovsdb_parser parser;
const struct json *tables_json;
struct ovsdb_error *error;
struct shash_node *node;
struct shash *schema;
ovsdb_parser_init(&parser, schema_json, "database schema");
tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
error = ovsdb_parser_destroy(&parser);
if (error) {
log_error(error);
return NULL;
}
schema = xmalloc(sizeof *schema);
shash_init(schema);
SHASH_FOR_EACH (node, json_object(tables_json)) {
const char *table_name = node->name;
const struct json *json = node->data;
const struct json *columns_json;
ovsdb_parser_init(&parser, json, "table schema for table %s",
table_name);
columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
error = ovsdb_parser_destroy(&parser);
if (error) {
log_error(error);
free_schema(schema);
return NULL;
}
struct sset *columns = xmalloc(sizeof *columns);
sset_init(columns);
struct shash_node *node2;
SHASH_FOR_EACH (node2, json_object(columns_json)) {
const char *column_name = node2->name;
sset_add(columns, column_name);
}
shash_add(schema, table_name, columns);
}
return schema;
}
static void
ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl,
const struct json *schema_json)
{
struct shash *schema = parse_schema(schema_json);
struct json *monitor_requests;
struct jsonrpc_msg *msg;
size_t i;
@@ -566,12 +703,25 @@ ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
const struct ovsdb_idl_table *table = &idl->tables[i];
const struct ovsdb_idl_table_class *tc = table->class;
struct json *monitor_request, *columns;
const struct sset *table_schema;
size_t j;
table_schema = (schema
? shash_find_data(schema, table->class->name)
: NULL);
columns = table->need_table ? json_array_create_empty() : NULL;
for (j = 0; j < tc->n_columns; j++) {
const struct ovsdb_idl_column *column = &tc->columns[j];
if (table->modes[j] & OVSDB_IDL_MONITOR) {
if (table_schema
&& !sset_contains(table_schema, column->name)) {
VLOG_WARN("%s table in %s database lacks %s column "
"(database needs upgrade?)",
table->class->name, idl->class->database,
column->name);
continue;
}
if (!columns) {
columns = json_array_create_empty();
}
@@ -580,18 +730,27 @@ ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl)
}
if (columns) {
if (schema && !table_schema) {
VLOG_WARN("%s database lacks %s table "
"(database needs upgrade?)",
idl->class->database, table->class->name);
json_destroy(columns);
continue;
}
monitor_request = json_object_create();
json_object_put(monitor_request, "columns", columns);
json_object_put(monitor_requests, tc->name, monitor_request);
}
}
free_schema(schema);
json_destroy(idl->monitor_request_id);
json_destroy(idl->request_id);
msg = jsonrpc_create_request(
"monitor",
json_array_create_3(json_string_create(idl->class->database),
json_null_create(), monitor_requests),
&idl->monitor_request_id);
&idl->request_id);
jsonrpc_session_send(idl->session, msg);
}
@@ -2389,11 +2548,11 @@ static void
ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
{
if (new_has_lock && !idl->has_lock) {
if (!idl->monitor_request_id) {
if (idl->state == IDL_S_MONITORING) {
idl->change_seqno++;
} else {
/* We're waiting for a monitor reply, so don't signal that the
* database changed. The monitor reply will increment change_seqno
/* We're setting up a session, so don't signal that the database
* changed. Finalizing the session will increment change_seqno
* anyhow. */
}
idl->is_lock_contended = false;