mirror of
https://github.com/openvswitch/ovs
synced 2025-10-21 14:49:41 +00:00
log: Add async commit support.
The OVSDB log code has always had the ability to commit the log to disk and wait for the commit to finish. This patch introduces a new feature that allows the client to start a commit in the background and then to determine asynchronously that the commit has completed. This will be especially useful later for the distributed database feature. Signed-off-by: Ben Pfaff <blp@ovn.org> Reviewed-by: Yifeng Sun <pkusunyifeng@gmail.com>
This commit is contained in:
@@ -670,7 +670,7 @@ ovsdb_file_compact(struct ovsdb_file *file)
|
||||
|
||||
/* Commit the old version, so that we can be assured that we'll eventually
|
||||
* have either the old or the new version. */
|
||||
error = ovsdb_log_commit(file->log);
|
||||
error = ovsdb_log_commit_block(file->log);
|
||||
if (error) {
|
||||
goto exit;
|
||||
}
|
||||
@@ -866,7 +866,7 @@ ovsdb_file_txn_commit(struct json *json, const char *comment,
|
||||
}
|
||||
|
||||
if (durable) {
|
||||
error = ovsdb_log_commit(log);
|
||||
error = ovsdb_log_commit_block(log);
|
||||
if (error) {
|
||||
return ovsdb_wrap_error(error, "committing transaction failed");
|
||||
}
|
||||
|
152
ovsdb/log.c
152
ovsdb/log.c
@@ -24,12 +24,17 @@
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "lockfile.h"
|
||||
#include "openvswitch/dynamic-string.h"
|
||||
#include "openvswitch/json.h"
|
||||
#include "openvswitch/vlog.h"
|
||||
#include "lockfile.h"
|
||||
#include "ovsdb.h"
|
||||
#include "ovs-atomic.h"
|
||||
#include "ovs-rcu.h"
|
||||
#include "ovs-thread.h"
|
||||
#include "ovsdb-error.h"
|
||||
#include "ovsdb.h"
|
||||
#include "openvswitch/poll-loop.h"
|
||||
#include "seq.h"
|
||||
#include "sha1.h"
|
||||
#include "socket-util.h"
|
||||
#include "transaction.h"
|
||||
@@ -78,6 +83,7 @@ struct ovsdb_log {
|
||||
struct lockfile *lockfile;
|
||||
FILE *stream;
|
||||
off_t base;
|
||||
struct afsync *afsync;
|
||||
};
|
||||
|
||||
/* Whether the OS supports renaming open files.
|
||||
@@ -95,6 +101,9 @@ static bool parse_header(char *header, const char **magicp,
|
||||
uint8_t sha1[SHA1_DIGEST_SIZE]);
|
||||
static bool is_magic_ok(const char *needle, const char *haystack);
|
||||
|
||||
static struct afsync *afsync_create(int fd, uint64_t initial_ticket);
|
||||
static uint64_t afsync_destroy(struct afsync *);
|
||||
|
||||
/* Attempts to open 'name' with the specified 'open_mode'. On success, stores
|
||||
* the new log into '*filep' and returns NULL; otherwise returns the error and
|
||||
* stores NULL into '*filep'.
|
||||
@@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic,
|
||||
file->prev_offset = 0;
|
||||
file->offset = 0;
|
||||
file->base = 0;
|
||||
file->afsync = NULL;
|
||||
*filep = file;
|
||||
return NULL;
|
||||
|
||||
@@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file)
|
||||
{
|
||||
if (file) {
|
||||
ovsdb_error_destroy(file->error);
|
||||
afsync_destroy(file->afsync);
|
||||
free(file->name);
|
||||
free(file->display_name);
|
||||
free(file->magic);
|
||||
@@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct json *json)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Attempts to commit 'file' to disk. Waits for the commit to succeed or fail.
|
||||
* Returns NULL if successful, otherwise the error that occurred. */
|
||||
struct ovsdb_error *
|
||||
ovsdb_log_commit(struct ovsdb_log *file)
|
||||
ovsdb_log_commit_block(struct ovsdb_log *file)
|
||||
{
|
||||
if (file->stream && fsync(fileno(file->stream))) {
|
||||
return ovsdb_io_error(errno, "%s: fsync failed", file->display_name);
|
||||
@@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new)
|
||||
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
|
||||
ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
|
||||
{
|
||||
struct ovsdb_error *error = ovsdb_log_commit(new);
|
||||
struct ovsdb_error *error = ovsdb_log_commit_block(new);
|
||||
if (error) {
|
||||
ovsdb_log_replace_abort(new);
|
||||
return error;
|
||||
@@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
|
||||
ovsdb_error_destroy(old->error);
|
||||
old->error = NULL;
|
||||
/* prev_offset only matters for OVSDB_LOG_READ. */
|
||||
if (old->afsync) {
|
||||
uint64_t ticket = afsync_destroy(old->afsync);
|
||||
old->afsync = afsync_create(fileno(old->stream), ticket + 1);
|
||||
}
|
||||
old->offset = new->offset;
|
||||
/* Keep old->name. */
|
||||
free(old->magic);
|
||||
@@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void)
|
||||
{
|
||||
rename_open_files = false;
|
||||
}
|
||||
|
||||
struct afsync {
|
||||
pthread_t thread;
|
||||
atomic_uint64_t cur, next;
|
||||
struct seq *request, *complete;
|
||||
int fd;
|
||||
};
|
||||
|
||||
static void *
|
||||
afsync_thread(void *afsync_)
|
||||
{
|
||||
struct afsync *afsync = afsync_;
|
||||
uint64_t cur = 0;
|
||||
for (;;) {
|
||||
ovsrcu_quiesce_start();
|
||||
|
||||
uint64_t request_seq = seq_read(afsync->request);
|
||||
|
||||
uint64_t next;
|
||||
atomic_read_explicit(&afsync->next, &next, memory_order_acquire);
|
||||
if (next == UINT64_MAX) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (cur != next && afsync->fd != -1) {
|
||||
int error = fsync(afsync->fd) ? errno : 0;
|
||||
if (!error) {
|
||||
cur = next;
|
||||
atomic_store_explicit(&afsync->cur, cur, memory_order_release);
|
||||
seq_change(afsync->complete);
|
||||
} else {
|
||||
VLOG_WARN("fsync failed (%s)", ovs_strerror(error));
|
||||
}
|
||||
}
|
||||
|
||||
seq_wait(afsync->request, request_seq);
|
||||
poll_block();
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static struct afsync *
|
||||
afsync_create(int fd, uint64_t initial_ticket)
|
||||
{
|
||||
struct afsync *afsync = xzalloc(sizeof *afsync);
|
||||
atomic_init(&afsync->cur, initial_ticket);
|
||||
atomic_init(&afsync->next, initial_ticket);
|
||||
afsync->request = seq_create();
|
||||
afsync->complete = seq_create();
|
||||
afsync->thread = ovs_thread_create("log_fsync", afsync_thread, afsync);
|
||||
afsync->fd = fd;
|
||||
return afsync;
|
||||
}
|
||||
|
||||
static uint64_t
|
||||
afsync_destroy(struct afsync *afsync)
|
||||
{
|
||||
if (!afsync) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint64_t next;
|
||||
atomic_read(&afsync->next, &next);
|
||||
atomic_store(&afsync->next, UINT64_MAX);
|
||||
seq_change(afsync->request);
|
||||
xpthread_join(afsync->thread, NULL);
|
||||
|
||||
seq_destroy(afsync->request);
|
||||
seq_destroy(afsync->complete);
|
||||
|
||||
free(afsync);
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
static struct afsync *
|
||||
ovsdb_log_get_afsync(struct ovsdb_log *log)
|
||||
{
|
||||
if (!log->afsync) {
|
||||
log->afsync = afsync_create(log->stream ? fileno(log->stream) : -1, 0);
|
||||
}
|
||||
return log->afsync;
|
||||
}
|
||||
|
||||
/* Starts committing 'log' to disk. Returns a ticket that can be passed to
|
||||
* ovsdb_log_commit_wait() or compared against the return value of
|
||||
* ovsdb_log_commit_progress() later. */
|
||||
uint64_t
|
||||
ovsdb_log_commit_start(struct ovsdb_log *log)
|
||||
{
|
||||
struct afsync *afsync = ovsdb_log_get_afsync(log);
|
||||
|
||||
uint64_t orig;
|
||||
atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel);
|
||||
|
||||
seq_change(afsync->request);
|
||||
|
||||
return orig + 1;
|
||||
}
|
||||
|
||||
/* Returns a ticket value that represents the current progress of commits to
|
||||
* 'log'. Suppose that some call to ovsdb_log_commit_start() returns X and any
|
||||
* call ovsdb_log_commit_progress() returns Y, for the same 'log'. Then commit
|
||||
* X is complete if and only if X <= Y. */
|
||||
uint64_t
|
||||
ovsdb_log_commit_progress(struct ovsdb_log *log)
|
||||
{
|
||||
struct afsync *afsync = ovsdb_log_get_afsync(log);
|
||||
uint64_t cur;
|
||||
atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire);
|
||||
return cur;
|
||||
}
|
||||
|
||||
/* Causes poll_block() to wake up if and when ovsdb_log_commit_progress(log)
|
||||
* would return at least 'goal'. */
|
||||
void
|
||||
ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal)
|
||||
{
|
||||
struct afsync *afsync = ovsdb_log_get_afsync(log);
|
||||
uint64_t complete = seq_read(afsync->complete);
|
||||
uint64_t cur = ovsdb_log_commit_progress(log);
|
||||
if (cur < goal) {
|
||||
seq_wait(afsync->complete, complete);
|
||||
} else {
|
||||
poll_immediate_wake();
|
||||
}
|
||||
}
|
||||
|
@@ -35,6 +35,7 @@
|
||||
* that compacting is advised.
|
||||
*/
|
||||
|
||||
#include <stdint.h>
|
||||
#include <sys/types.h>
|
||||
#include "compiler.h"
|
||||
|
||||
@@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *, const char *magic,
|
||||
|
||||
struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json *)
|
||||
OVS_WARN_UNUSED_RESULT;
|
||||
struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *)
|
||||
|
||||
uint64_t ovsdb_log_commit_start(struct ovsdb_log *);
|
||||
uint64_t ovsdb_log_commit_progress(struct ovsdb_log *);
|
||||
void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t);
|
||||
struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *)
|
||||
OVS_WARN_UNUSED_RESULT;
|
||||
|
||||
void ovsdb_log_mark_base(struct ovsdb_log *);
|
||||
|
@@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx)
|
||||
check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC,
|
||||
OVSDB_LOG_CREATE_EXCL, -1, &log));
|
||||
check_ovsdb_error(ovsdb_log_write(log, json));
|
||||
check_ovsdb_error(ovsdb_log_commit(log));
|
||||
check_ovsdb_error(ovsdb_log_commit_block(log));
|
||||
ovsdb_log_close(log);
|
||||
|
||||
json_destroy(json);
|
||||
|
@@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx)
|
||||
error = ovsdb_log_write(target, json);
|
||||
json_destroy(json);
|
||||
} else if (!strcmp(command, "commit")) {
|
||||
error = ovsdb_log_commit(target);
|
||||
error = ovsdb_log_commit_block(target);
|
||||
} else if (!strcmp(command, "replace_start")) {
|
||||
ovs_assert(!replacement);
|
||||
error = ovsdb_log_replace_start(log, &replacement);
|
||||
|
Reference in New Issue
Block a user