/*
 * Copyright (c) 2017, 2018 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "raft.h"
#include "raft-private.h"

#include <errno.h>
#include <unistd.h>

#include "cooperative-multitasking.h"
#include "hash.h"
#include "jsonrpc.h"
#include "lockfile.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/hmap.h"
#include "openvswitch/json.h"
#include "openvswitch/list.h"
#include "openvswitch/poll-loop.h"
#include "openvswitch/vlog.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb/log.h"
#include "raft-rpc.h"
#include "random.h"
#include "simap.h"
#include "socket-util.h"
#include "stream.h"
#include "timeval.h"
#include "unicode.h"
#include "unixctl.h"
#include "util.h"
#include "uuid.h"

VLOG_DEFINE_THIS_MODULE(raft);

/* Roles for a Raft server:
 *
 *    - Followers: Servers in touch with the current leader.
 *
 *    - Candidate: Servers unaware of a current leader and seeking election
 *      to leader.
 *
 *    - Leader: Handles all client requests.  At most one at a time.
 *
 * In normal operation there is exactly one leader and all of the other
 * servers are followers. */
enum raft_role {
    RAFT_FOLLOWER,
    RAFT_CANDIDATE,
    RAFT_LEADER
};

static const char *raft_role_to_string(enum raft_role);

/* Flags for unit tests. */
enum raft_failure_test {
    FT_NO_TEST,
    FT_CRASH_BEFORE_SEND_APPEND_REQ,
    FT_CRASH_AFTER_SEND_APPEND_REQ,
    FT_CRASH_BEFORE_SEND_EXEC_REP,
    FT_CRASH_AFTER_SEND_EXEC_REP,
    FT_CRASH_BEFORE_SEND_EXEC_REQ,
    FT_CRASH_AFTER_SEND_EXEC_REQ,
    FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE,
    FT_CRASH_BEFORE_SEND_SNAPSHOT_REP,
    FT_DELAY_ELECTION,
    FT_DONT_SEND_VOTE_REQUEST,
    FT_STOP_RAFT_RPC,
    FT_TRANSFER_LEADERSHIP,
    FT_TRANSFER_LEADERSHIP_AFTER_SEND_APPEND_REQ,
    FT_TRANSFER_LEADERSHIP_AFTER_STARTING_TO_ADD,
};
static enum raft_failure_test failure_test;

/* A connection between this Raft server and another one. */
struct raft_conn {
    struct ovs_list list_node;  /* In struct raft's 'conns' list. */
    struct jsonrpc_session *js; /* JSON-RPC connection. */
    struct uuid sid;            /* This server's unique ID. */
    char *nickname;             /* Short name for use in log messages. */
    bool incoming;              /* True if incoming, false if outgoing. */
    unsigned int js_seqno;      /* Seqno for noticing (re)connections. */
};

static void raft_conn_close(struct raft_conn *);

/* A "command", that is, a request to append an entry to the log.
 *
 * The Raft specification only allows clients to issue commands to the
 * leader.  With this implementation, clients may issue a command on any
 * server, which then relays the command to the leader if necessary.
 *
 * This structure is thus used in three cases:
 *
 *     1. We are the leader and the command was issued to us directly.
 *
 *     2. We are a follower and relayed the command to the leader.
 *
 *     3. We are the leader and a follower relayed the command to us.
 */
struct raft_command {
    /* All cases. */
    struct hmap_node hmap_node; /* In struct raft's 'commands' hmap. */
    unsigned int n_refs;        /* Reference count. */
    enum raft_command_status status; /* Execution status. */
    struct uuid eid;            /* Entry ID of result. */

    /* Case 1 only. */
    uint64_t index;             /* Index in log (0 if being relayed). */

    /* Case 2 only. */
    long long int timestamp;    /* Issue or last ping time, for expiration. */

    /* Case 3 only. */
    struct uuid sid;            /* The follower (otherwise UUID_ZERO). */
};

static void raft_command_complete(struct raft *, struct raft_command *,
                                  enum raft_command_status);

static void raft_complete_all_commands(struct raft *,
                                       enum raft_command_status);

/* Type of deferred action, see struct raft_waiter. */
enum raft_waiter_type {
    RAFT_W_ENTRY,
    RAFT_W_TERM,
    RAFT_W_RPC,
};

/* An action deferred until a log write commits to disk. */
struct raft_waiter {
    struct ovs_list list_node;
    uint64_t commit_ticket;

    enum raft_waiter_type type;
    union {
        /* RAFT_W_ENTRY.
         *
         * Waits for a RAFT_REC_ENTRY write to our local log to commit.
         * Upon completion, updates 'log_synced' to indicate that the new
         * log entry or entries are committed and, if we are leader, also
         * updates our local 'match_index'. */
        struct {
            uint64_t index;
        } entry;

        /* RAFT_W_TERM.
         *
         * Waits for a RAFT_REC_TERM or RAFT_REC_VOTE record write to
         * commit.  Upon completion, updates 'synced_term' and
         * 'synced_vote', which triggers sending RPCs deferred by the
         * uncommitted 'term' and 'vote'. */
        struct {
            uint64_t term;
            struct uuid vote;
        } term;

        /* RAFT_W_RPC.
         *
         * Sometimes, sending an RPC to a peer must be delayed until an
         * entry, a term, or a vote mentioned in the RPC is synced to disk.
         * This waiter keeps a copy of such an RPC until the previous
         * waiters have committed. */
        union raft_rpc *rpc;
    };
};

static struct raft_waiter *raft_waiter_create(struct raft *,
                                              enum raft_waiter_type,
                                              bool start_commit);
static void raft_waiters_destroy(struct raft *);

/* The Raft state machine. */
struct raft {
    struct hmap_node hmap_node; /* In 'all_rafts'. */
    struct ovsdb_log *log;

    /* Persistent derived state.
     *
     * This must be updated on stable storage before responding to RPCs.
     * It can be derived from the header, snapshot, and log in 'log'. */

    struct uuid cid;            /* Cluster ID (immutable for the cluster). */
    struct uuid sid;            /* Server ID (immutable for the server). */
    char *local_address;        /* Local address (immutable for server). */
    char *local_nickname;       /* Used for local server in log messages. */
    char *name;                 /* Schema name (immutable for cluster). */

    /* Contains "struct raft_server"s and represents the server
     * configuration most recently added to 'log'. */
    struct hmap servers;

#define ELECTION_BASE_MSEC 1000
#define ELECTION_RANGE_MSEC 1000
#define ELECTION_MIN_MSEC 100
#define ELECTION_MAX_MSEC 600000
    /* The election timeout base value for leader election, in
     * milliseconds.  It can be set by unixctl
     * cluster/change-election-timer.  Default value is
     * ELECTION_BASE_MSEC. */
    uint64_t election_timer;
    /* If not 0, it is the new value of election_timer being proposed. */
    uint64_t election_timer_new;

    /* Persistent state on all servers.
     *
     * Must be updated on stable storage before responding to RPCs. */

    /* Current term and the vote for that term.  These might be on the way
     * to disk now. */
    uint64_t term;              /* Initialized to 0 and only increases. */
    struct uuid vote;           /* All-zeros if no vote yet in 'term'. */

    /* The term and vote that have been synced to disk. */
    uint64_t synced_term;
    struct uuid synced_vote;

    /* The log.
     *
     * A log entry with index 1 never really exists; the initial snapshot
     * for a Raft is considered to include this index.  The first real log
     * entry has index 2.
     *
     * A new Raft instance contains an empty log:  log_start=2, log_end=2.
     * Over time, the log grows:                   log_start=2, log_end=N.
     * At some point, the server takes a snapshot: log_start=N, log_end=N.
     * The log continues to grow:                  log_start=N, log_end=N+1...
     *
     * Must be updated on stable storage before responding to RPCs. */
    struct raft_entry *entries; /* Log entry i is in log[i - log_start]. */
    uint64_t log_start;         /* Index of first entry in log. */
    uint64_t log_end;           /* Index of last entry in log, plus 1. */
    uint64_t log_synced;        /* Index of last synced entry. */
    size_t allocated_log;       /* Allocated entries in 'log'. */

    /* Snapshot state (see Figure 5.1)
     *
     * This is the state of the cluster as of the last discarded log entry,
     * that is, at log index 'log_start - 1' (called prevIndex in Figure
     * 5.1).  Only committed log entries can be included in a snapshot. */
    struct raft_entry snap;

    /* Volatile state.
     *
     * The snapshot is always committed, but the rest of the log might not
     * be yet.  'last_applied' tracks what entries have been passed to the
     * client.  If the client hasn't yet read the latest snapshot, then
     * even the snapshot isn't applied yet.  Thus, the invariants are
     * different for these members:
     *
     *     log_start - 2 <= last_applied <= commit_index < log_end.
     *     log_start - 1 <= commit_index < log_end.
     */
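
    /* Worked example (an illustrative sketch consistent with
     * raft_read_header() below, not a trace of a real run): immediately
     * after reading a header whose snapshot index is 10, log_start ==
     * log_end == 11, commit_index == 10, and last_applied == 9, i.e. the
     * snapshot is committed but the client has not yet read it. */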

    enum raft_role role;        /* Current role. */
    uint64_t commit_index;      /* Max log index known to be committed. */
    uint64_t last_applied;      /* Max log index applied to state machine. */
    struct uuid leader_sid;     /* Server ID of leader (zero, if unknown). */

    long long int election_base;    /* Time of last heartbeat from leader. */
    long long int election_timeout; /* Time at which we start an election. */

    long long int election_start;   /* Start election time. */
    long long int election_won;     /* Time of election completion. */
    bool leadership_transfer;       /* Was the leadership transferred? */

    unsigned int n_disconnections;

    /* Used for joining a cluster. */
    bool joining;                 /* Attempting to join the cluster? */
    struct sset remote_addresses; /* Addresses to try to find other servers. */
#define RAFT_JOIN_TIMEOUT_MS 1000
    long long int join_timeout;   /* Time to re-send add server request. */

    /* Used for leaving a cluster. */
    bool leaving;               /* True if we are leaving the cluster. */
    bool left;                  /* True if we have finished leaving. */
    long long int leave_timeout; /* Time to re-send remove server request. */

    /* Failure. */
    bool failed;                /* True if unrecoverable error occurred. */

    /* File synchronization. */
    struct ovs_list waiters;    /* Contains "struct raft_waiter"s. */

    /* Network connections. */
    struct pstream *listener;   /* For connections from other Raft servers. */
    long long int listen_backoff; /* For retrying creating 'listener'. */
    struct ovs_list conns;      /* Contains struct raft_conns. */

    /* Leaders only.  Reinitialized after becoming leader. */
    struct hmap add_servers;    /* Contains "struct raft_server"s to add. */
    struct raft_server *remove_server; /* Server being removed. */
    struct hmap commands;       /* Contains "struct raft_command"s. */
    long long int ping_timeout; /* Time at which to send a heartbeat. */

    /* Candidates only.  Reinitialized at start of election. */
    int n_votes;                /* Number of votes for me. */
    bool prevote_passed;        /* Whether the pre-vote phase has passed.
                                 * The pre-vote mechanism is introduced in
                                 * Raft paper section 9.6.  We implement it
                                 * as a sub-state of candidate to minimize
                                 * the change and keep backward
                                 * compatibility. */

    /* Followers and candidates only. */
    bool candidate_retrying;    /* The earlier election timed out and we
                                   are now retrying. */
    bool had_leader;            /* A leader has been elected since the last
                                   election was initiated.  This helps set
                                   'candidate_retrying'. */

    /* For all. */
    bool ever_had_leader;       /* A leader has been elected since this
                                   raft was initialized, meaning it has
                                   ever been connected. */

    /* Connection backlog limits. */
#define DEFAULT_MAX_BACKLOG_N_MSGS 500
#define DEFAULT_MAX_BACKLOG_N_BYTES UINT32_MAX
    size_t conn_backlog_max_n_msgs;  /* Number of messages. */
    size_t conn_backlog_max_n_bytes; /* Number of bytes. */
};

/* All Raft structures. */
static struct hmap all_rafts = HMAP_INITIALIZER(&all_rafts);

static void raft_init(void);

static struct ovsdb_error *raft_read_header(struct raft *)
    OVS_WARN_UNUSED_RESULT;

static void raft_send_execute_command_reply(struct raft *,
                                            const struct uuid *sid,
                                            const struct uuid *eid,
                                            enum raft_command_status,
                                            uint64_t commit_index);

static void raft_update_our_match_index(struct raft *, uint64_t min_index);

static void raft_send_remove_server_reply__(
    struct raft *, const struct uuid *target_sid,
    const struct uuid *requester_sid, struct unixctl_conn *requester_conn,
    bool success, const char *comment);
static void raft_finished_leaving_cluster(struct raft *);

static void raft_server_init_leader(struct raft *, struct raft_server *);

static bool raft_rpc_is_heartbeat(const union raft_rpc *);
static bool raft_is_rpc_synced(const struct raft *, const union raft_rpc *);

static void raft_handle_rpc(struct raft *, const union raft_rpc *);

static bool raft_send_at(struct raft *, const union raft_rpc *,
                         int line_number);
#define raft_send(raft, rpc) raft_send_at(raft, rpc, __LINE__)

static bool raft_send_to_conn_at(struct raft *, const union raft_rpc *,
                                 struct raft_conn *, int line_number);
#define raft_send_to_conn(raft, rpc, conn) \
    raft_send_to_conn_at(raft, rpc, conn, __LINE__)

static void raft_send_append_request(struct raft *,
                                     struct raft_server *, unsigned int n,
                                     const char *comment);

static void raft_role_transition_at(struct raft *, enum raft_role,
                                    const char *func, const char *source);
#define raft_role_transition(raft, role) \
    raft_role_transition_at(raft, role, __func__, OVS_SOURCE_LOCATOR)

static void raft_become_leader(struct raft *);
static void raft_become_follower(struct raft *);
static void raft_reset_election_timer(struct raft *);
static void raft_reset_ping_timer(struct raft *);
static void raft_send_heartbeats(struct raft *);
static void raft_start_election(struct raft *, bool is_prevote,
                                bool leadership_transfer);
static bool raft_truncate(struct raft *, uint64_t new_end);
static void raft_get_servers_from_log(struct raft *, enum vlog_level);
static void raft_get_election_timer_from_log(struct raft *);

static bool raft_handle_write_error(struct raft *, struct ovsdb_error *);
static bool raft_has_uncommitted_configuration(const struct raft *);

static void raft_run_reconfigure(struct raft *);

static void raft_set_leader(struct raft *, const struct uuid *sid);

static struct raft_server *
raft_find_server(const struct raft *raft, const struct uuid *sid)
{
    return raft_server_find(&raft->servers, sid);
}

static struct raft_server *
raft_find_new_server(struct raft *raft, const struct uuid *uuid)
{
    return raft_server_find(&raft->add_servers, uuid);
}

static char *
raft_make_address_passive(const char *address_)
{
    if (!strncmp(address_, "unix:", 5)) {
        return xasprintf("p%s", address_);
    } else {
        char *address = xstrdup(address_);
        char *host, *port;
        inet_parse_host_port_tokens(strchr(address, ':') + 1, &host, &port);

        struct ds paddr = DS_EMPTY_INITIALIZER;
        ds_put_format(&paddr, "p%.3s:%s:", address, port);
        if (strchr(host, ':')) {
            ds_put_format(&paddr, "[%s]", host);
        } else {
            ds_put_cstr(&paddr, host);
        }
        free(address);
        return ds_steal_cstr(&paddr);
    }
}
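
/* Illustrative examples for raft_make_address_passive() (assumed inputs,
 * not taken from the original sources): "unix:db.sock" becomes
 * "punix:db.sock", "tcp:10.0.0.1:6641" becomes "ptcp:6641:10.0.0.1", and
 * an IPv6 address such as "ssl:[::1]:6641" becomes "pssl:6641:[::1]",
 * since a host that contains ':' is wrapped in brackets. */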

static struct raft *
raft_alloc(void)
{
    raft_init();

    struct raft *raft = xzalloc(sizeof *raft);
    hmap_node_nullify(&raft->hmap_node);
    hmap_init(&raft->servers);
    raft->log_start = raft->log_end = 1;
    raft_role_transition(raft, RAFT_FOLLOWER);
    sset_init(&raft->remote_addresses);
    raft->join_timeout = LLONG_MAX;
    ovs_list_init(&raft->waiters);
    raft->listen_backoff = LLONG_MIN;
    ovs_list_init(&raft->conns);
    hmap_init(&raft->add_servers);
    hmap_init(&raft->commands);

    raft->election_timer = ELECTION_BASE_MSEC;

    raft->conn_backlog_max_n_msgs = DEFAULT_MAX_BACKLOG_N_MSGS;
    raft->conn_backlog_max_n_bytes = DEFAULT_MAX_BACKLOG_N_BYTES;

    return raft;
}

/* Creates an on-disk file that represents a new Raft cluster and
 * initializes it to consist of a single server, the one on which this
 * function is called.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must
 * not already exist.  Gives it the name 'name', which should be the
 * database schema name and which is used only to match up this database
 * with the server added to the cluster later if the cluster ID is
 * unavailable.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * This only creates the on-disk file.  Use raft_open() to start operating
 * the new server.
 *
 * The optional election_timer argument, when greater than zero, sets the
 * given leader election timer for the new cluster, in milliseconds.  If
 * non-zero, it must be between 100 and 600000 inclusive.
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_create_cluster(const char *file_name, const char *name,
                    const char *local_address, const struct json *data,
                    const uint64_t election_timer)
{
    /* Parse and verify validity of the local address. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }

    /* Validate optional election timer. */
    if (election_timer > 0) {
        error = raft_validate_election_timer(election_timer);
        if (error) {
            return error;
        }
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    const uint64_t term = 1;
    uint64_t index = 1;
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = uuid_random(),
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = false,
        .remote_addresses = SSET_INITIALIZER(&h.remote_addresses),
        .snap_index = index++,
        .snap = {
            .term = term,
            .eid = uuid_random(),
            .servers = json_object_create(),
        },
    };
    raft_entry_set_parsed_data(&h.snap, data);
    shash_add_nocopy(json_object(h.snap.servers),
                     uuid_to_string(&h.sid),
                     json_string_create(local_address));
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (error) {
        goto error;
    }

    if (election_timer > 0) {
        struct raft_record r = {
            .type = RAFT_REC_ENTRY,
            .term = term,
            .entry = {
                .index = index,
                .data = NULL,
                .servers = NULL,
                .election_timer = election_timer,
                .eid = UUID_ZERO,
            },
        };
        error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
        raft_record_uninit(&r);
        if (error) {
            goto error;
        }
    }

    error = ovsdb_log_commit_block(log);

error:
    ovsdb_log_close(log);
    return error;
}
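
/* Minimal usage sketch for raft_create_cluster() (hypothetical caller and
 * values, not from this file; error handling abbreviated):
 *
 *     struct json *data = json_object_create();
 *     struct ovsdb_error *err = raft_create_cluster(
 *         "/etc/ovn/ovnsb.db", "OVN_Southbound",
 *         "tcp:10.0.0.1:6644", data, 0);
 *     json_destroy(data);
 *     if (err) {
 *         char *msg = ovsdb_error_to_string_free(err);
 *         VLOG_ERR("%s", msg);
 *         free(msg);
 *     }
 */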

/* Creates a database file that represents a new server in an existing Raft
 * cluster.
 *
 * Creates the local copy of the cluster's log in 'file_name', which must
 * not already exist.  Gives it the name 'name', which must be the same
 * name passed in to raft_create_cluster() earlier.
 *
 * 'cid' is optional.  If specified, the new server will join only the
 * cluster with the given cluster ID.
 *
 * The new server is located at 'local_address', which must take one of the
 * forms "tcp:IP:PORT" or "ssl:IP:PORT", where IP is an IPv4 address or a
 * square bracket enclosed IPv6 address and PORT is a TCP port number.
 *
 * Joining the cluster requires contacting it.  Thus, 'remote_addresses'
 * specifies the addresses of existing servers in the cluster.  One server
 * out of the existing cluster is sufficient, as long as that server is
 * reachable and not partitioned from the current cluster leader.  If
 * multiple servers from the cluster are specified, then it is sufficient
 * for any of them to meet this criterion.
 *
 * This only creates the on-disk file and does no network access.  Use
 * raft_open() to start operating the new server.  (Until this happens, the
 * new server has not joined the cluster.)
 *
 * Returns null if successful, otherwise an ovsdb_error describing the
 * problem. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_join_cluster(const char *file_name,
                  const char *name, const char *local_address,
                  const struct sset *remote_addresses,
                  const struct uuid *cid)
{
    ovs_assert(!sset_is_empty(remote_addresses));

    /* Parse and verify validity of the addresses. */
    struct ovsdb_error *error = raft_address_validate(local_address);
    if (error) {
        return error;
    }
    const char *addr;
    SSET_FOR_EACH (addr, remote_addresses) {
        error = raft_address_validate(addr);
        if (error) {
            return error;
        }
        if (!strcmp(addr, local_address)) {
            return ovsdb_error(NULL, "remote addresses cannot be the same "
                               "as the local address");
        }
    }

    /* Verify validity of the cluster ID (if provided). */
    if (cid && uuid_is_zero(cid)) {
        return ovsdb_error(NULL, "all-zero UUID is not valid cluster ID");
    }

    /* Create log file. */
    struct ovsdb_log *log;
    error = ovsdb_log_open(file_name, RAFT_MAGIC, OVSDB_LOG_CREATE_EXCL,
                           -1, &log);
    if (error) {
        return error;
    }

    /* Write log file. */
    struct raft_header h = {
        .sid = uuid_random(),
        .cid = cid ? *cid : UUID_ZERO,
        .name = xstrdup(name),
        .local_address = xstrdup(local_address),
        .joining = true,
        /* No snapshot yet. */
    };
    sset_clone(&h.remote_addresses, remote_addresses);
    error = ovsdb_log_write_and_free(log, raft_header_to_json(&h));
    raft_header_uninit(&h);
    if (!error) {
        error = ovsdb_log_commit_block(log);
    }
    ovsdb_log_close(log);

    return error;
}

/* Reads the initial header record from 'log', which must be a Raft
 * clustered database log, and populates '*md' with the information read
 * from it.  The caller must eventually destroy 'md' with
 * raft_metadata_destroy().
 *
 * On success, returns NULL.  On failure, returns an error that the caller
 * must eventually destroy and zeros '*md'. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_metadata(struct ovsdb_log *log, struct raft_metadata *md)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (!error) {
        md->sid = raft->sid;
        md->name = xstrdup(raft->name);
        md->local = xstrdup(raft->local_address);
        md->cid = raft->cid;
    } else {
        memset(md, 0, sizeof *md);
    }

    raft->log = NULL;
    raft_close(raft);
    return error;
}

/* Frees the metadata in 'md'. */
void
raft_metadata_destroy(struct raft_metadata *md)
{
    if (md) {
        free(md->name);
        free(md->local);
    }
}

static const struct raft_entry *
raft_get_entry(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start);
    ovs_assert(index < raft->log_end);
    return &raft->entries[index - raft->log_start];
}

static uint64_t
raft_get_term(const struct raft *raft, uint64_t index)
{
    return (index == raft->log_start - 1
            ? raft->snap.term
            : raft_get_entry(raft, index)->term);
}
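
/* Example (a sketch): with log_start == 11, raft_get_term(raft, 10) falls
 * back to the snapshot's term, while any index in [11, log_end) reads the
 * term stored on the corresponding log entry. */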

static const struct json *
raft_servers_for_index(const struct raft *raft, uint64_t index)
{
    ovs_assert(index >= raft->log_start - 1);
    ovs_assert(index < raft->log_end);

    const struct json *servers = raft->snap.servers;
    for (uint64_t i = raft->log_start; i <= index; i++) {
        const struct raft_entry *e = raft_get_entry(raft, i);
        if (e->servers) {
            servers = e->servers;
        }
    }
    return servers;
}
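
/* Example (a sketch): if the snapshot's configuration is {A, B} and log
 * entry 12 carries a new configuration {A, B, C}, then
 * raft_servers_for_index() returns {A, B, C} for any index >= 12 and
 * {A, B} for smaller indexes. */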

static void
raft_set_servers(struct raft *raft, const struct hmap *new_servers,
                 enum vlog_level level)
{
    struct raft_server *s;
    HMAP_FOR_EACH_SAFE (s, hmap_node, &raft->servers) {
        if (!raft_server_find(new_servers, &s->sid)) {
            ovs_assert(s != raft->remove_server);

            hmap_remove(&raft->servers, &s->hmap_node);
            VLOG(level, "server %s removed from configuration", s->nickname);
            raft_server_destroy(s);
        }
    }

    HMAP_FOR_EACH_SAFE (s, hmap_node, new_servers) {
        if (!raft_find_server(raft, &s->sid)) {
            VLOG(level, "server %s added to configuration", s->nickname);

            struct raft_server *new
                = raft_server_add(&raft->servers, &s->sid, s->address);
            raft_server_init_leader(raft, new);
        }
    }
}

static uint64_t
raft_add_entry(struct raft *raft,
               uint64_t term, struct json *data, const struct uuid *eid,
               struct json *servers, uint64_t election_timer)
{
    if (raft->log_end - raft->log_start >= raft->allocated_log) {
        raft->entries = x2nrealloc(raft->entries, &raft->allocated_log,
                                   sizeof *raft->entries);
    }

    uint64_t index = raft->log_end++;
    struct raft_entry *entry = &raft->entries[index - raft->log_start];
    entry->term = term;
    entry->eid = eid ? *eid : UUID_ZERO;
    entry->servers = servers;
    entry->election_timer = election_timer;
    raft_entry_set_parsed_data_nocopy(entry, data);
    return index;
}

/* Writes a RAFT_REC_ENTRY record for 'term', 'data', 'eid', 'servers',
 * 'election_timer' to 'raft''s log and returns an error indication. */
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
                 const struct uuid *eid, struct json *servers,
                 uint64_t election_timer)
{
    uint64_t index = raft_add_entry(raft, term, data, eid, servers,
                                    election_timer);
    const struct json *entry_data = raft_entry_get_serialized_data(
        &raft->entries[index - raft->log_start]);
    struct raft_record r = {
        .type = RAFT_REC_ENTRY,
        .term = term,
        .entry = {
            .index = index,
            .data = CONST_CAST(struct json *, entry_data),
            .servers = servers,
            .election_timer = election_timer,
            .eid = eid ? *eid : UUID_ZERO,
        },
    };
    return ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r));
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_state(struct ovsdb_log *log,
                 uint64_t term, const struct uuid *vote)
{
    struct raft_record r = { .term = term };
    if (vote && !uuid_is_zero(vote)) {
        r.type = RAFT_REC_VOTE;
        r.sid = *vote;
    } else {
        r.type = RAFT_REC_TERM;
    }
    return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
}
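
/* In other words (a sketch of the on-disk behavior): raft_write_state(log,
 * 5, NULL) appends a RAFT_REC_TERM record for term 5, while passing a
 * non-zero vote UUID appends a RAFT_REC_VOTE record that carries both the
 * term and the voted-for server ID. */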

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_apply_record(struct raft *raft, unsigned long long int rec_idx,
                  const struct raft_record *r)
{
    /* Apply "term", which is present in most kinds of records (and
     * otherwise 0).
     *
     * A Raft leader can replicate entries from previous terms to the other
     * servers in the cluster, retaining the original terms on those
     * entries (see section 3.6.2 "Committing entries from previous terms"
     * for more information), so it's OK for the term in a log record to
     * precede the current term. */
    if (r->term > raft->term) {
        raft->term = raft->synced_term = r->term;
        raft->vote = raft->synced_vote = UUID_ZERO;
    }

    switch (r->type) {
    case RAFT_REC_ENTRY:
        if (r->entry.index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu attempts to truncate log "
                               "from %"PRIu64" to %"PRIu64" entries, but "
                               "commit index is already %"PRIu64,
                               rec_idx, raft->log_end, r->entry.index,
                               raft->commit_index);
        } else if (r->entry.index > raft->log_end) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" skips "
                               "past expected index %"PRIu64,
                               rec_idx, r->entry.index, raft->log_end);
        }

        if (r->entry.index < raft->log_end) {
            /* This can happen, but it is notable. */
            VLOG_DBG("record %llu truncates log from %"PRIu64" to %"PRIu64
                     " entries", rec_idx, raft->log_end, r->entry.index);
            raft_truncate(raft, r->entry.index);
        }

        uint64_t prev_term = (raft->log_end > raft->log_start
                              ? raft->entries[raft->log_end
                                              - raft->log_start - 1].term
                              : raft->snap.term);
        if (r->term < prev_term) {
            return ovsdb_error(NULL, "record %llu with index %"PRIu64" term "
                               "%"PRIu64" precedes previous entry's term "
                               "%"PRIu64,
                               rec_idx, r->entry.index, r->term, prev_term);
        }

        raft->log_synced = raft_add_entry(
            raft, r->term,
            json_nullable_clone(r->entry.data), &r->entry.eid,
            json_nullable_clone(r->entry.servers),
            r->entry.election_timer);
        return NULL;

    case RAFT_REC_TERM:
        return NULL;

    case RAFT_REC_VOTE:
        if (r->term < raft->term) {
            return ovsdb_error(NULL, "record %llu votes for term %"PRIu64" "
                               "but current term is %"PRIu64,
                               rec_idx, r->term, raft->term);
        } else if (!uuid_is_zero(&raft->vote)
                   && !uuid_equals(&raft->vote, &r->sid)) {
            return ovsdb_error(NULL, "record %llu votes for "SID_FMT" in "
                               "term %"PRIu64" but a previous record for "
                               "the same term voted for "SID_FMT, rec_idx,
                               SID_ARGS(&raft->vote), r->term,
                               SID_ARGS(&r->sid));
        } else {
            raft->vote = raft->synced_vote = r->sid;
            return NULL;
        }
        break;

    case RAFT_REC_NOTE:
        if (!strcmp(r->note, "left")) {
            return ovsdb_error(NULL, "record %llu indicates server has left "
                               "the cluster; it cannot be added back (use "
                               "\"ovsdb-tool join-cluster\" to add a new "
                               "server)", rec_idx);
        }
        return NULL;

    case RAFT_REC_COMMIT_INDEX:
        if (r->commit_index < raft->commit_index) {
            return ovsdb_error(NULL, "record %llu regresses commit index "
                               "from %"PRIu64 " to %"PRIu64,
                               rec_idx, raft->commit_index, r->commit_index);
        } else if (r->commit_index >= raft->log_end) {
            return ovsdb_error(NULL, "record %llu advances commit index to "
                               "%"PRIu64 " but last log index is %"PRIu64,
                               rec_idx, r->commit_index, raft->log_end - 1);
        } else {
            raft->commit_index = r->commit_index;
            return NULL;
        }
        break;

    case RAFT_REC_LEADER:
        /* XXX we could use this to take back leadership for quick restart */
        return NULL;

    default:
        OVS_NOT_REACHED();
    }
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_header(struct raft *raft)
{
    /* Read header record. */
    struct json *json;
    struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
    if (error || !json) {
        /* Report error or end-of-file. */
        return error;
    }
    ovsdb_log_mark_base(raft->log);

    struct raft_header h;
    error = raft_header_from_json(&h, json);
    json_destroy(json);
    if (error) {
        return error;
    }

    raft->sid = h.sid;
    raft->cid = h.cid;
    raft->name = xstrdup(h.name);
    raft->local_address = xstrdup(h.local_address);
    raft->local_nickname = raft_address_to_nickname(h.local_address, &h.sid);
    raft->joining = h.joining;

    if (h.joining) {
        sset_clone(&raft->remote_addresses, &h.remote_addresses);
    } else {
        raft_entry_clone(&raft->snap, &h.snap);
        raft->log_start = raft->log_end = h.snap_index + 1;
        raft->log_synced = raft->commit_index = h.snap_index;
        raft->last_applied = h.snap_index - 1;
    }

    raft_header_uninit(&h);

    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_read_log(struct raft *raft)
{
    for (unsigned long long int i = 1; ; i++) {
        struct json *json;
        struct ovsdb_error *error = ovsdb_log_read(raft->log, &json);
        if (!json) {
            if (error) {
                /* We assume that the error is due to a partial write while
                 * appending to the file before a crash, so log it and
                 * continue. */
                char *error_string = ovsdb_error_to_string_free(error);
                VLOG_WARN("%s", error_string);
                free(error_string);
                error = NULL;
            }
            break;
        }

        struct raft_record r;
        error = raft_record_from_json(&r, json);
        if (!error) {
            error = raft_apply_record(raft, i, &r);
            raft_record_uninit(&r);
        }
        json_destroy(json);
        if (error) {
            return ovsdb_wrap_error(error, "error reading record %llu from "
                                    "%s log", i, raft->name);
        }
    }

    /* Set the most recent servers. */
    raft_get_servers_from_log(raft, VLL_DBG);

    /* Set the most recent election_timer. */
    raft_get_election_timer_from_log(raft);

    return NULL;
}

static void
raft_reset_election_timer(struct raft *raft)
{
    unsigned int duration = (raft->election_timer
                             + random_range(ELECTION_RANGE_MSEC));
    raft->election_base = time_msec();
    if (failure_test == FT_DELAY_ELECTION) {
        /* Slow down this node so that it won't win the next election. */
        duration += raft->election_timer;
    }
    raft->election_timeout = raft->election_base + duration;
}
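
/* With the default 1000 ms election_timer, the next timeout therefore
 * lands uniformly in [1000, 2000) ms after 'election_base'; the
 * FT_DELAY_ELECTION failure-test hook pushes it out by one extra
 * election_timer period so that this node loses the race. */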

#define RAFT_TIMER_THRESHOLD(t) (t / 3)

static void
raft_reset_ping_timer(struct raft *raft)
{
    raft->ping_timeout =
        time_msec() + RAFT_TIMER_THRESHOLD(raft->election_timer);
}
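
/* Heartbeats are thus scheduled at one third of the election timeout,
 * e.g. every 333 ms for the default 1000 ms timer, giving a follower
 * roughly three chances to hear from the leader before it starts an
 * election. */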

static void
raft_conn_update_probe_interval(struct raft *raft, struct raft_conn *r_conn)
{
    /* An inactivity probe is sent if the connection remains idle for the
     * time of an election timeout, and the connection is dropped if the
     * inactivity lasts twice that time.
     *
     * Heartbeats alone are not enough if the connection stays established
     * but no packets are received from the other side.  Without an
     * inactivity probe, a follower would just keep trying to initiate an
     * election indefinitely, staying in the 'candidate' role, and the
     * leader would continue to send heartbeats to the dead connection,
     * thinking that the remote server is still part of the cluster.
     *
     * While joining, the real value of the election timeout is not known
     * to this server, so use the maximum. */
    int probe_interval = (raft->joining ? ELECTION_MAX_MSEC
                                        : raft->election_timer)
                         + ELECTION_RANGE_MSEC;

    jsonrpc_session_set_probe_interval(r_conn->js, probe_interval);
}

static void
raft_update_probe_intervals(struct raft *raft)
{
    struct raft_conn *r_conn;

    LIST_FOR_EACH (r_conn, list_node, &raft->conns) {
        raft_conn_update_probe_interval(raft, r_conn);
    }
}

static void
raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
              const struct uuid *sid, bool incoming)
{
    struct raft_conn *conn = xzalloc(sizeof *conn);
    ovs_list_push_back(&raft->conns, &conn->list_node);
    conn->js = js;
    if (sid) {
        conn->sid = *sid;
    }
    conn->nickname = raft_address_to_nickname(jsonrpc_session_get_name(js),
                                              &conn->sid);
    conn->incoming = incoming;
    conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
    raft_conn_update_probe_interval(raft, conn);
    jsonrpc_session_set_backlog_threshold(js, raft->conn_backlog_max_n_msgs,
                                          raft->conn_backlog_max_n_bytes);
}

/* Starts the local server in an existing Raft cluster, using the local
 * copy of the cluster's log in 'file_name'.  Takes ownership of 'log',
 * whether successful or not. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_open(struct ovsdb_log *log, struct raft **raftp)
{
    struct raft *raft = raft_alloc();
    raft->log = log;

    struct ovsdb_error *error = raft_read_header(raft);
    if (error) {
        goto error;
    }

    if (!raft->joining) {
        error = raft_read_log(raft);
        if (error) {
            goto error;
        }

        /* Find our own server. */
        if (!raft_find_server(raft, &raft->sid)) {
            error = ovsdb_error(NULL, "server does not belong to cluster");
            goto error;
        }

        /* If there's only one server, start an election right away so that
         * the cluster bootstraps quickly. */
        if (hmap_count(&raft->servers) == 1) {
            /* No pre-vote needed since we are the only one. */
            raft_start_election(raft, false, false);
        }
    } else {
        raft->join_timeout = time_msec() + RAFT_JOIN_TIMEOUT_MS;
    }

    raft_reset_ping_timer(raft);
    raft_reset_election_timer(raft);

    VLOG_INFO("local server ID is "SID_FMT, SID_ARGS(&raft->sid));

    *raftp = raft;
    hmap_insert(&all_rafts, &raft->hmap_node, hash_string(raft->name, 0));
    return NULL;

error:
    raft_close(raft);
    *raftp = NULL;
    return error;
}
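
/* Typical startup sketch for raft_open() (hypothetical caller; the exact
 * open mode and error handling here are assumptions, not taken from this
 * file):
 *
 *     struct ovsdb_log *log;
 *     struct ovsdb_error *err = ovsdb_log_open(path, RAFT_MAGIC,
 *                                              OVSDB_LOG_READ_WRITE,
 *                                              -1, &log);
 *     struct raft *raft = NULL;
 *     if (!err) {
 *         err = raft_open(log, &raft);
 *     }
 *
 * raft_open() takes ownership of 'log' whether it succeeds or fails. */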

/* Returns the name of 'raft', which in OVSDB is the database schema name. */
const char *
raft_get_name(const struct raft *raft)
{
    return raft->name;
}

/* Returns the cluster ID of 'raft'.  If 'raft' has not yet completed
 * joining its cluster, then 'cid' will be all-zeros (unless the
 * administrator specified a cluster ID running "ovsdb-tool join-cluster").
 *
 * Each cluster has a unique cluster ID. */
const struct uuid *
raft_get_cid(const struct raft *raft)
{
    return &raft->cid;
}

/* Returns the server ID of 'raft'.  Each server has a unique server ID. */
const struct uuid *
raft_get_sid(const struct raft *raft)
{
    return &raft->sid;
}

/* Adds memory consumption info to 'usage' for later use by
 * memory_report(). */
void
raft_get_memory_usage(const struct raft *raft, struct simap *usage)
{
    struct raft_conn *conn;
    uint64_t backlog = 0;
    int cnt = 0;

    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        backlog += jsonrpc_session_get_backlog(conn->js);
        cnt++;
    }
    simap_increase(usage, "raft-backlog-kB", backlog / 1000);
    simap_increase(usage, "raft-connections", cnt);
    simap_increase(usage, "raft-log", raft->log_end - raft->log_start);
}

/* Returns an error if the election timer (in milliseconds) is out of
 * bounds.  Values smaller than 100 ms or bigger than 10 minutes don't make
 * sense. */
struct ovsdb_error *
raft_validate_election_timer(const uint64_t ms)
{
    /* Validate optional election timer. */
    if (ms < ELECTION_MIN_MSEC || ms > ELECTION_MAX_MSEC) {
        return ovsdb_error(NULL, "election timer must be between %d and "
                           "%d, in msec.", ELECTION_MIN_MSEC,
                           ELECTION_MAX_MSEC);
    }
    return NULL;
}

/* Returns true if 'raft' has completed joining its cluster, has not left
 * or initiated leaving the cluster, does not have failed disk storage, and
 * is apparently connected to the leader in a healthy way (or is itself the
 * leader).
 *
 * If 'raft' is candidate:
 * a) if it is the first round of election, consider it as connected,
 *    hoping it will successfully elect a new leader soon.
 * b) if it is already retrying, consider it as disconnected (so that
 *    clients may decide to reconnect to other members). */
bool
raft_is_connected(const struct raft *raft)
{
    static bool last_state = false;
    bool ret = (!raft->candidate_retrying
                && !raft->joining
                && !raft->leaving
                && !raft->left
                && !raft->failed
                && raft->ever_had_leader);

    if (!ret) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_DBG_RL(&rl, "raft_is_connected: false");
    } else if (!last_state) {
        VLOG_DBG("raft_is_connected: true");
    }
    last_state = ret;

    return ret;
}

/* Returns true if 'raft' is the cluster leader. */
bool
raft_is_leader(const struct raft *raft)
{
    return raft->role == RAFT_LEADER;
}

/* Returns true if 'raft' is in the process of joining its cluster. */
bool
raft_is_joining(const struct raft *raft)
{
    return raft->joining;
}

/* Only returns *connected* connections. */
static struct raft_conn *
raft_find_conn_by_sid(struct raft *raft, const struct uuid *sid)
{
    if (!uuid_is_zero(sid)) {
        struct raft_conn *conn;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            if (uuid_equals(sid, &conn->sid)
                && jsonrpc_session_is_connected(conn->js)) {
                return conn;
            }
        }
    }
    return NULL;
}

static struct raft_conn *
raft_find_conn_by_address(struct raft *raft, const char *address)
{
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        if (!strcmp(jsonrpc_session_get_name(conn->js), address)) {
            return conn;
        }
    }
    return NULL;
}

static void OVS_PRINTF_FORMAT(3, 4)
raft_record_note(struct raft *raft, const char *note,
                 const char *comment_format, ...)
{
    va_list args;
    va_start(args, comment_format);
    char *comment = xvasprintf(comment_format, args);
    va_end(args);

    struct raft_record r = {
        .type = RAFT_REC_NOTE,
        .comment = comment,
        .note = CONST_CAST(char *, note),
    };
    ovsdb_error_destroy(
        ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    free(comment);
}

/* If we're leader, try to transfer leadership to another server, logging
 * 'reason' as the human-readable reason (it should be a phrase suitable
 * for following "because"). */
void
raft_transfer_leadership(struct raft *raft, const char *reason)
{
    if (raft->role != RAFT_LEADER) {
        return;
    }

    struct raft_server **servers, *s;
    uint64_t threshold = 0;
    size_t n = 0, start, i;

    servers = xmalloc(hmap_count(&raft->servers) * sizeof *servers);

    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (uuid_equals(&raft->sid, &s->sid)
            || s->phase != RAFT_PHASE_STABLE) {
            continue;
        }
        if (s->match_index > threshold) {
            threshold = s->match_index;
        }
        servers[n++] = s;
    }

    start = n ? random_range(n) : 0;

retry:
    for (i = 0; i < n; i++) {
        s = servers[(start + i) % n];

        if (s->match_index >= threshold) {
            struct raft_conn *conn = raft_find_conn_by_sid(raft, &s->sid);
            if (!conn) {
                continue;
            }

            union raft_rpc rpc = {
                .become_leader = {
                    .common = {
                        .comment = CONST_CAST(char *, reason),
                        .type = RAFT_RPC_BECOME_LEADER,
                        .sid = s->sid,
                    },
                    .term = raft->term,
                }
            };

            if (!raft_send_to_conn(raft, &rpc, conn)) {
                continue;
            }

            raft_record_note(raft, "transfer leadership",
                             "transferring leadership to %s because %s",
                             s->nickname, reason);
            break;
        }
    }

    if (n && i == n && threshold) {
        if (threshold > raft->commit_index) {
            /* Failed to transfer to servers with the highest 'match_index'.
             * Try other servers that are not behind the majority. */
            threshold = raft->commit_index;
        } else {
            /* Try any other server.  It is safe, because they either have
             * all the append requests queued up for them before the
             * leadership transfer message or their connection is broken
             * and we will not transfer anyway. */
            threshold = 0;
        }
        goto retry;
    }

    free(servers);
}

/* Send a RemoveServerRequest to the rest of the servers in the cluster.
 *
 * If we know which server is the leader, we can just send the request to
 * it.  However, we might not know which server is the leader, and we might
 * never find out if the remove request was actually previously committed
 * by a majority of the servers (because in that case the new leader will
 * not send AppendRequests or heartbeats to us).  Therefore, we instead
 * send RemoveRequests to every server.  This theoretically has the same
 * problem, if the current cluster leader was not previously a member of
 * the cluster, but it seems likely to be more robust in practice. */
static void
raft_send_remove_server_requests(struct raft *raft)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    VLOG_INFO_RL(&rl, "sending remove request (joining=%s, leaving=%s)",
                 raft->joining ? "true" : "false",
                 raft->leaving ? "true" : "false");
    const struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid)) {
            union raft_rpc rpc = (union raft_rpc) {
                .remove_server_request = {
                    .common = {
                        .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                        .sid = s->sid,
                    },
                    .sid = raft->sid,
                },
            };
            raft_send(raft, &rpc);
        }
    }
}

/* Sends requests required to leave the cluster and schedules the next time
 * this function should be called. */
static void
raft_send_leave_requests(struct raft *raft)
{
    long long int delay = raft->election_timer;

    if (raft->role == RAFT_LEADER) {
        raft_transfer_leadership(raft, "this server is leaving the cluster");
        raft_become_follower(raft);
        /* Not sending the RemoveServerRequest right away, because a new
         * leader has to be elected first for the request to be successful.
         * But setting a shorter delay to avoid waiting for too long when
         * the leader re-election is fast.  Randomized to avoid two servers
         * bouncing the leadership between each other and never actually
         * leaving. */
        delay = delay / 10 + random_range(delay / 10);
    } else {
        raft_send_remove_server_requests(raft);
    }
    raft->leave_timeout = time_msec() + delay;
}

/* Attempts to start 'raft' leaving its cluster.  The caller can check
 * progress using raft_is_leaving() and raft_left(). */
void
raft_leave(struct raft *raft)
{
    if (raft->joining || raft->failed || raft->leaving || raft->left) {
        return;
    }
    VLOG_INFO(SID_FMT": starting to leave cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));
    raft->leaving = true;
    raft_send_leave_requests(raft);
}

/* Returns true if 'raft' is currently attempting to leave its cluster. */
bool
raft_is_leaving(const struct raft *raft)
{
    return raft->leaving;
}

/* Returns true if 'raft' successfully left its cluster. */
bool
raft_left(const struct raft *raft)
{
    return raft->left;
}

/* Returns true if 'raft' has experienced a disk I/O failure.  When this
 * returns true, only closing and reopening 'raft' allows for recovery. */
bool
raft_failed(const struct raft *raft)
{
    return raft->failed;
}

/* Forces 'raft' to attempt to take leadership of the cluster by deposing
 * the current leader. */
void
raft_take_leadership(struct raft *raft)
{
    if (raft->role != RAFT_LEADER) {
        raft_start_election(raft, false, true);
    }
}

static void raft_run_cb(void *arg);

/* Closes everything owned by 'raft' that might be visible outside the
 * process: network connections, commands, etc.  This is part of closing
 * 'raft'; it is also used if 'raft' has failed in an unrecoverable way. */
static void
raft_close__(struct raft *raft)
{
    if (!hmap_node_is_null(&raft->hmap_node)) {
        hmap_remove(&all_rafts, &raft->hmap_node);
        hmap_node_nullify(&raft->hmap_node);
    }

    raft_complete_all_commands(raft, RAFT_CMD_SHUTDOWN);

    struct raft_server *rs = raft->remove_server;
    if (rs) {
        raft_send_remove_server_reply__(raft, &rs->sid, &rs->requester_sid,
                                        rs->requester_conn, false,
                                        RAFT_SERVER_SHUTDOWN);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    struct raft_conn *conn;
    LIST_FOR_EACH_SAFE (conn, list_node, &raft->conns) {
        raft_conn_close(conn);
    }

    cooperative_multitasking_remove(&raft_run_cb, raft);
}

/* Closes and frees 'raft'.
 *
 * A server's cluster membership is independent of whether the server is
 * actually running.  When a server that is a member of a cluster closes,
 * the cluster treats this as a server failure. */
void
raft_close(struct raft *raft)
{
    if (!raft) {
        return;
    }

    raft_transfer_leadership(raft, "this server is shutting down");

    raft_close__(raft);

    ovsdb_log_close(raft->log);

    raft_servers_destroy(&raft->servers);

    for (uint64_t index = raft->log_start; index < raft->log_end; index++) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        raft_entry_uninit(e);
    }
    free(raft->entries);

    raft_entry_uninit(&raft->snap);

    raft_waiters_destroy(raft);

    raft_servers_destroy(&raft->add_servers);

    hmap_destroy(&raft->commands);

    pstream_close(raft->listener);

    sset_destroy(&raft->remote_addresses);
    free(raft->local_address);
    free(raft->local_nickname);
    free(raft->name);

    free(raft);
}
|
||
|
||
static bool
|
||
raft_conn_receive(struct raft *raft, struct raft_conn *conn,
|
||
union raft_rpc *rpc)
|
||
{
|
||
struct jsonrpc_msg *msg = jsonrpc_session_recv(conn->js);
|
||
if (!msg) {
|
||
return false;
|
||
}
|
||
|
||
struct ovsdb_error *error = raft_rpc_from_jsonrpc(&raft->cid, &raft->sid,
|
||
msg, rpc);
|
||
jsonrpc_msg_destroy(msg);
|
||
if (error) {
|
||
char *s = ovsdb_error_to_string_free(error);
|
||
VLOG_INFO("%s: %s", jsonrpc_session_get_name(conn->js), s);
|
||
free(s);
|
||
return false;
|
||
}
|
||
|
||
if (uuid_is_zero(&conn->sid)) {
|
||
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
|
||
conn->sid = rpc->common.sid;
|
||
VLOG_INFO_RL(&rl, "%s: learned server ID "SID_FMT,
|
||
jsonrpc_session_get_name(conn->js), SID_ARGS(&conn->sid));
|
||
} else if (!uuid_equals(&conn->sid, &rpc->common.sid)) {
|
||
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
|
||
VLOG_WARN_RL(&rl, "%s: ignoring message with unexpected server ID "
|
||
SID_FMT" (expected "SID_FMT")",
|
||
jsonrpc_session_get_name(conn->js),
|
||
SID_ARGS(&rpc->common.sid), SID_ARGS(&conn->sid));
|
||
raft_rpc_uninit(rpc);
|
||
return false;
|
||
}
|
||
|
||
const char *address = (rpc->type == RAFT_RPC_HELLO_REQUEST
|
||
? rpc->hello_request.address
|
||
: rpc->type == RAFT_RPC_ADD_SERVER_REQUEST
|
||
? rpc->add_server_request.address
|
||
: NULL);
|
||
if (address) {
|
||
char *new_nickname = raft_address_to_nickname(address, &conn->sid);
|
||
if (strcmp(conn->nickname, new_nickname)) {
|
||
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(50, 50);
|
||
VLOG_INFO_RL(&rl, "%s: learned remote address %s",
|
||
jsonrpc_session_get_name(conn->js), address);
|
||
|
||
free(conn->nickname);
|
||
conn->nickname = new_nickname;
|
||
} else {
|
||
free(new_nickname);
|
||
}
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
static const char *
|
||
raft_get_nickname(const struct raft *raft, const struct uuid *sid,
|
||
char buf[SID_LEN + 1], size_t bufsize)
|
||
{
|
||
if (uuid_equals(sid, &raft->sid)) {
|
||
return raft->local_nickname;
|
||
}
|
||
|
||
const char *s = raft_servers_get_nickname__(&raft->servers, sid);
|
||
if (s) {
|
||
return s;
|
||
}
|
||
|
||
return raft_servers_get_nickname(&raft->add_servers, sid, buf, bufsize);
|
||
}
|
||
|
||
static void
|
||
log_rpc(const union raft_rpc *rpc, const char *direction,
|
||
const struct raft_conn *conn, int line_number)
|
||
{
|
||
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(600, 600);
|
||
if (!raft_rpc_is_heartbeat(rpc) && !VLOG_DROP_DBG(&rl)) {
|
||
struct ds s = DS_EMPTY_INITIALIZER;
|
||
if (line_number) {
|
||
ds_put_format(&s, "raft.c:%d ", line_number);
|
||
}
|
||
ds_put_format(&s, "%s%s ", direction, conn->nickname);
|
||
raft_rpc_format(rpc, &s);
|
||
VLOG_DBG("%s", ds_cstr(&s));
|
||
ds_destroy(&s);
|
||
}
|
||
}
|
||
|
||
static void
|
||
raft_send_add_server_request(struct raft *raft, struct raft_conn *conn)
|
||
{
|
||
union raft_rpc rq = {
|
||
.add_server_request = {
|
||
.common = {
|
||
.type = RAFT_RPC_ADD_SERVER_REQUEST,
|
||
.sid = UUID_ZERO,
|
||
.comment = NULL,
|
||
},
|
||
.address = raft->local_address,
|
||
},
|
||
};
|
||
raft_send_to_conn(raft, &rq, conn);
|
||
}
|
||
|
||
static void
|
||
raft_conn_run(struct raft *raft, struct raft_conn *conn)
|
||
{
|
||
if (failure_test == FT_STOP_RAFT_RPC) {
|
||
return;
|
||
}
|
||
|
||
jsonrpc_session_run(conn->js);
|
||
|
||
unsigned int new_seqno = jsonrpc_session_get_seqno(conn->js);
|
||
bool reconnected = new_seqno != conn->js_seqno;
|
||
bool just_connected = (reconnected
|
||
&& jsonrpc_session_is_connected(conn->js));
|
||
|
||
if (reconnected) {
|
||
/* Clear 'install_snapshot_request_in_progress' since it might not
|
||
* reach the destination or server was restarted. */
|
||
struct raft_server *server = raft_find_server(raft, &conn->sid);
|
||
if (server) {
|
||
server->install_snapshot_request_in_progress = false;
|
||
}
|
||
}
|
||
|
||
conn->js_seqno = new_seqno;
|
||
if (just_connected) {
|
||
if (raft->joining) {
|
||
raft_send_add_server_request(raft, conn);
|
||
} else if (raft->leaving) {
|
||
union raft_rpc rq = {
|
||
.remove_server_request = {
|
||
.common = {
|
||
.type = RAFT_RPC_REMOVE_SERVER_REQUEST,
|
||
.sid = conn->sid,
|
||
},
|
||
.sid = raft->sid,
|
||
},
|
||
};
|
||
raft_send_to_conn(raft, &rq, conn);
|
||
} else {
|
||
union raft_rpc rq = (union raft_rpc) {
|
||
.hello_request = {
|
||
.common = {
|
||
.type = RAFT_RPC_HELLO_REQUEST,
|
||
.sid = conn->sid,
|
||
},
|
||
.address = raft->local_address,
|
||
},
|
||
};
|
||
raft_send_to_conn(raft, &rq, conn);
|
||
}
|
||
}
|
||
|
||
for (size_t i = 0; i < 50; i++) {
|
||
union raft_rpc rpc;
|
||
if (!raft_conn_receive(raft, conn, &rpc)) {
|
||
break;
|
||
}
|
||
|
||
log_rpc(&rpc, "<--", conn, 0);
|
||
raft_handle_rpc(raft, &rpc);
|
||
raft_rpc_uninit(&rpc);
|
||
}
|
||
}
|
||
|
||
static void
raft_waiter_complete_rpc(struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    if (term && term < raft->term) {
        /* Drop the message because it's for an expired term. */
        return;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        /* This is a bug.  A reply message is deferred because some state in
         * the message, such as a term or index, has not been committed to
         * disk, and it should only be completed when that commit is done.
         * But this message is being completed before the commit is finished.
         * Complain, and hope that someone reports the bug. */
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        if (VLOG_DROP_ERR(&rl)) {
            return;
        }

        struct ds s = DS_EMPTY_INITIALIZER;

        if (term > raft->synced_term) {
            ds_put_format(&s, " because message term %"PRIu64" is "
                          "past synced term %"PRIu64,
                          term, raft->synced_term);
        }

        uint64_t index = raft_rpc_get_min_sync_index(rpc);
        if (index > raft->log_synced) {
            ds_put_format(&s, " %s message index %"PRIu64" is past last "
                          "synced index %"PRIu64,
                          s.length ? "and" : "because",
                          index, raft->log_synced);
        }

        const struct uuid *vote = raft_rpc_get_vote(rpc);
        if (vote && !uuid_equals(vote, &raft->synced_vote)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            ds_put_format(&s, " %s vote %s differs from synced vote %s",
                          s.length ? "and" : "because",
                          raft_get_nickname(raft, vote, buf1, sizeof buf1),
                          raft_get_nickname(raft, &raft->synced_vote,
                                            buf2, sizeof buf2));
        }

        char buf[SID_LEN + 1];
        ds_put_format(&s, ": %s ",
                      raft_get_nickname(raft, &rpc->common.sid,
                                        buf, sizeof buf));
        raft_rpc_format(rpc, &s);
        VLOG_ERR("internal error: deferred %s message completed "
                 "but not ready to send%s",
                 raft_rpc_type_to_string(rpc->type), ds_cstr(&s));
        ds_destroy(&s);

        return;
    }

    struct raft_conn *dst = raft_find_conn_by_sid(raft, &rpc->common.sid);
    if (dst) {
        raft_send_to_conn(raft, rpc, dst);
    }
}

static void
raft_waiter_complete(struct raft *raft, struct raft_waiter *w)
{
    switch (w->type) {
    case RAFT_W_ENTRY:
        if (raft->role == RAFT_LEADER) {
            raft_update_our_match_index(raft, w->entry.index);
        }
        raft->log_synced = w->entry.index;
        break;

    case RAFT_W_TERM:
        raft->synced_term = w->term.term;
        raft->synced_vote = w->term.vote;
        break;

    case RAFT_W_RPC:
        raft_waiter_complete_rpc(raft, w->rpc);
        break;
    }
}

static void
raft_waiter_destroy(struct raft_waiter *w)
{
    if (!w) {
        return;
    }

    ovs_list_remove(&w->list_node);

    switch (w->type) {
    case RAFT_W_ENTRY:
    case RAFT_W_TERM:
        break;

    case RAFT_W_RPC:
        raft_rpc_uninit(w->rpc);
        free(w->rpc);
        break;
    }
    free(w);
}

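/* Completes and destroys all waiters whose log commits have finished, in the
 * order they were created, stopping at the first waiter whose commit is still
 * in progress. */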
static void
raft_waiters_run(struct raft *raft)
{
    if (ovs_list_is_empty(&raft->waiters)) {
        return;
    }

    uint64_t cur = ovsdb_log_commit_progress(raft->log);
    struct raft_waiter *w;
    LIST_FOR_EACH_SAFE (w, list_node, &raft->waiters) {
        if (cur < w->commit_ticket) {
            break;
        }
        raft_waiter_complete(raft, w);
        raft_waiter_destroy(w);
    }
}

static void
raft_waiters_wait(struct raft *raft)
{
    struct raft_waiter *w;
    LIST_FOR_EACH (w, list_node, &raft->waiters) {
        ovsdb_log_commit_wait(raft->log, w->commit_ticket);
        break;
    }
}

static void
raft_waiters_destroy(struct raft *raft)
{
    struct raft_waiter *w;
    LIST_FOR_EACH_SAFE (w, list_node, &raft->waiters) {
        raft_waiter_destroy(w);
    }
}

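/* Persistently updates the current term to 'term' and the vote to 'vote'
 * (which may be NULL to mean "none").  The in-memory 'synced_term' and
 * 'synced_vote' are updated later, once the write commits, through a
 * RAFT_W_TERM waiter.  Returns false if the write failed. */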
static bool OVS_WARN_UNUSED_RESULT
raft_set_term(struct raft *raft, uint64_t term, const struct uuid *vote)
{
    struct ovsdb_error *error = raft_write_state(raft->log, term, vote);
    if (!raft_handle_write_error(raft, error)) {
        return false;
    }

    struct raft_waiter *w = raft_waiter_create(raft, RAFT_W_TERM, true);
    raft->term = w->term.term = term;
    raft->vote = w->term.vote = vote ? *vote : UUID_ZERO;
    return true;
}

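/* Records that server 's' voted for 'vote'.  Returns true if the vote is for
 * this server and completes a majority, i.e. the caller should now become
 * leader, false otherwise. */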
static bool
raft_accept_vote(struct raft *raft, struct raft_server *s,
                 const struct uuid *vote)
{
    if (uuid_equals(&s->vote, vote)) {
        return false;
    }
    if (!uuid_is_zero(&s->vote)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        char buf1[SID_LEN + 1];
        char buf2[SID_LEN + 1];
        VLOG_WARN_RL(&rl, "server %s changed its vote from %s to %s",
                     s->nickname,
                     raft_get_nickname(raft, &s->vote, buf1, sizeof buf1),
                     raft_get_nickname(raft, vote, buf2, sizeof buf2));
    }
    s->vote = *vote;
    if (uuid_equals(vote, &raft->sid)
        && ++raft->n_votes > hmap_count(&raft->servers) / 2) {
        return true;
    }
    return false;
}

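/* Starts an election, or a pre-vote round if 'is_prevote' is true.  A
 * leadership transfer bypasses pre-vote and sets the corresponding flag in
 * the outgoing vote requests. */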
static void
raft_start_election(struct raft *raft, bool is_prevote,
                    bool leadership_transfer)
{
    /* Leadership transfer doesn't use pre-vote. */
    ovs_assert(!is_prevote || !leadership_transfer);

    struct raft_server *me = raft_find_server(raft, &raft->sid);
    if (!me) {
        return;
    }

    if (!is_prevote && !raft_set_term(raft, raft->term + 1, &raft->sid)) {
        return;
    }

    ovs_assert(raft->role != RAFT_LEADER);

    raft->leader_sid = UUID_ZERO;
    raft->prevote_passed = !is_prevote;
    raft_role_transition(raft, RAFT_CANDIDATE);

    if (is_prevote || leadership_transfer) {
        /* If there was no leader elected since the last election, we know we
         * are retrying now. */
        raft->candidate_retrying = !raft->had_leader;
        raft->had_leader = false;

        raft->election_start = time_msec();
        raft->election_won = 0;
    }

    raft->n_votes = 0;

    raft->leadership_transfer = leadership_transfer;

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
    if (!VLOG_DROP_INFO(&rl)) {
        long long int now = time_msec();
        const char *comment = is_prevote ? "pre-vote" : "vote";
        if (now >= raft->election_timeout) {
            VLOG_INFO("term %"PRIu64": %lld ms timeout expired, "
                      "starting election (%s)",
                      raft->term, now - raft->election_base, comment);
        } else {
            VLOG_INFO("term %"PRIu64": starting election (%s)",
                      raft->term, comment);
        }
    }
    raft_reset_election_timer(raft);

    struct raft_server *peer;
    HMAP_FOR_EACH (peer, hmap_node, &raft->servers) {
        peer->vote = UUID_ZERO;
        if (uuid_equals(&raft->sid, &peer->sid)) {
            continue;
        }

        union raft_rpc rq = {
            .vote_request = {
                .common = {
                    .type = RAFT_RPC_VOTE_REQUEST,
                    .sid = peer->sid,
                },
                .term = raft->term,
                .last_log_index = raft->log_end - 1,
                .last_log_term = (
                    raft->log_end > raft->log_start
                    ? raft->entries[raft->log_end - raft->log_start - 1].term
                    : raft->snap.term),
                .leadership_transfer = leadership_transfer,
                .is_prevote = is_prevote,
            },
        };
        if (failure_test != FT_DONT_SEND_VOTE_REQUEST) {
            raft_send(raft, &rq);
        }
    }

    /* Vote for ourselves. */
    if (raft_accept_vote(raft, me, &raft->sid)) {
        /* We just started the vote, so it shouldn't be accepted yet unless
         * this is a one-node cluster.  In that case we don't do pre-vote, and
         * become leader immediately. */
        ovs_assert(!is_prevote);
        raft_become_leader(raft);
    }
}

static void
raft_open_conn(struct raft *raft, const char *address, const struct uuid *sid)
{
    if (strcmp(address, raft->local_address)
        && !raft_find_conn_by_address(raft, address)
        && failure_test != FT_STOP_RAFT_RPC) {
        raft_add_conn(raft, jsonrpc_session_open(address, true), sid, false);
    }
}

static void
raft_conn_close(struct raft_conn *conn)
{
    VLOG_DBG("closing connection to server %s (%s)",
             conn->nickname, jsonrpc_session_get_name(conn->js));
    jsonrpc_session_close(conn->js);
    ovs_list_remove(&conn->list_node);
    free(conn->nickname);
    free(conn);
}

/* Returns true if 'conn' should stay open, false if it should be closed. */
static bool
raft_conn_should_stay_open(struct raft *raft, struct raft_conn *conn)
{
    /* Close the connection if it's actually dead.  If necessary, we'll
     * initiate a new session later. */
    if (!jsonrpc_session_is_alive(conn->js)) {
        return false;
    }

    /* Keep incoming sessions.  We trust the originator to decide to drop
     * it. */
    if (conn->incoming) {
        return true;
    }

    /* If we are joining the cluster, keep sessions to the remote addresses
     * that are supposed to be part of the cluster we're joining. */
    if (raft->joining && sset_contains(&raft->remote_addresses,
                                       jsonrpc_session_get_name(conn->js))) {
        return true;
    }

    /* Keep the connection until we send a RemoveServerReply. */
    if (raft->remove_server
        && uuid_equals(&conn->sid, &raft->remove_server->sid)) {
        return true;
    }

    /* We have joined the cluster.  If we did that "recently", then there is a
     * chance that we do not have the most recent server configuration log
     * entry.  If so, it's a waste to disconnect from the servers that were in
     * remote_addresses and that will probably appear in the configuration,
     * just to reconnect to them a moment later when we do get the
     * configuration update.  If we are not ourselves in the configuration,
     * then we know that there must be a new configuration coming up, so in
     * that case keep the connection. */
    if (!raft_find_server(raft, &raft->sid)) {
        return true;
    }

    /* Keep the connection only if the server is part of the configuration. */
    return raft_find_server(raft, &conn->sid);
}

/* Allows 'raft' to maintain the distributed log.  Call this function as part
 * of the process's main loop. */
void
raft_run(struct raft *raft)
{
    if (raft->left || raft->failed) {
        return;
    }

    if (failure_test == FT_TRANSFER_LEADERSHIP) {
        /* Using this function as it conveniently implements all we need and
         * snapshotting is the main test scenario for leadership transfer. */
        raft_notify_snapshot_recommended(raft);
        failure_test = FT_NO_TEST;
    }

    raft_waiters_run(raft);

    if (!raft->listener && time_msec() >= raft->listen_backoff) {
        char *paddr = raft_make_address_passive(raft->local_address);
        int error = pstream_open(paddr, &raft->listener, DSCP_DEFAULT);
        if (error) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
            VLOG_WARN_RL(&rl, "%s: listen failed (%s)",
                         paddr, ovs_strerror(error));
            raft->listen_backoff = time_msec() + 1000;
        }
        free(paddr);
    }

    if (raft->listener && failure_test != FT_STOP_RAFT_RPC) {
        struct stream *stream;
        int error = pstream_accept(raft->listener, &stream);
        if (!error) {
            raft_add_conn(raft, jsonrpc_session_open_unreliably(
                              jsonrpc_open(stream), DSCP_DEFAULT), NULL,
                          true);
        } else if (error != EAGAIN) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
            VLOG_WARN_RL(&rl, "%s: accept failed: %s",
                         pstream_get_name(raft->listener),
                         ovs_strerror(error));
        }
    }

    /* Run RPCs for all open sessions. */
    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        raft_conn_run(raft, conn);
    }

    /* Close unneeded sessions. */
    struct raft_server *server;
    LIST_FOR_EACH_SAFE (conn, list_node, &raft->conns) {
        if (!raft_conn_should_stay_open(raft, conn)) {
            server = raft_find_new_server(raft, &conn->sid);
            if (server) {
                /* We only have one incoming connection from joining servers,
                 * so if it's closed, we need to destroy the record about the
                 * server.  This way the process can be started over on the
                 * next join request. */
                static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
                VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s (%s) "
                             "disconnected while joining",
                             CID_ARGS(&raft->cid),
                             server->nickname, server->address);
                hmap_remove(&raft->add_servers, &server->hmap_node);
                raft_server_destroy(server);
            }
            raft->n_disconnections++;
            raft_conn_close(conn);
        }
    }

    /* Open needed sessions. */
    HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
        raft_open_conn(raft, server->address, &server->sid);
    }
    if (raft->joining) {
        const char *address;
        SSET_FOR_EACH (address, &raft->remote_addresses) {
            raft_open_conn(raft, address, NULL);
        }
    }

    if (!raft->joining && time_msec() >= raft->election_timeout) {
        if (raft->role == RAFT_LEADER) {
            /* Check if a majority of followers replied, then reset
             * election_timeout and reset s->replied.  Otherwise, become
             * follower.
             *
             * Raft paper section 6.2: Leaders: A server might be in the leader
             * state, but if it isn't the current leader, it could be
             * needlessly delaying client requests.  For example, suppose a
             * leader is partitioned from the rest of the cluster, but it can
             * still communicate with a particular client.  Without additional
             * mechanism, it could delay a request from that client forever,
             * being unable to replicate a log entry to any other servers.
             * Meanwhile, there might be another leader of a newer term that is
             * able to communicate with a majority of the cluster and would be
             * able to commit the client's request.  Thus, a leader in Raft
             * steps down if an election timeout elapses without a successful
             * round of heartbeats to a majority of its cluster; this allows
             * clients to retry their requests with another server. */
            int count = 0;
            HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
                if (server->replied) {
                    count++;
                }
            }
            VLOG_DBG("%d out of %"PRIuSIZE" servers replied",
                     count, hmap_count(&raft->servers));
            if (count >= hmap_count(&raft->servers) / 2) {
                HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
                    server->replied = false;
                }
                raft_reset_election_timer(raft);
            } else {
                raft_become_follower(raft);
                raft_start_election(raft, true, false);
            }
        } else {
            raft_start_election(raft, hmap_count(&raft->servers) > 1, false);
        }
    }

    if (raft->leaving && time_msec() >= raft->leave_timeout) {
        raft_send_leave_requests(raft);
    }

    if (raft->joining && time_msec() >= raft->join_timeout) {
        raft->join_timeout = time_msec() + RAFT_JOIN_TIMEOUT_MS;
        LIST_FOR_EACH (conn, list_node, &raft->conns) {
            raft_send_add_server_request(raft, conn);
        }
    }

    long long int now = time_msec();
    if (now >= raft->ping_timeout) {
        if (raft->role == RAFT_LEADER) {
            raft_send_heartbeats(raft);
        }
        /* Check if any commands timed out.  The timeout is set to twice the
         * election base time so that commands can complete properly during a
         * leader election.  E.g., if a leader crashed and the current node
         * with pending commands becomes the new leader, the pending commands
         * can still complete if the crashed leader replicated the
         * transactions to a majority of followers before it crashed. */
        struct raft_command *cmd;
        HMAP_FOR_EACH_SAFE (cmd, hmap_node, &raft->commands) {
            if (cmd->timestamp
                && now - cmd->timestamp > raft->election_timer * 2) {
                if (cmd->index && raft->role != RAFT_LEADER) {
                    /* This server lost leadership and the command didn't
                     * complete in time.  Likely, it wasn't replicated to the
                     * majority of servers before losing the leadership. */
                    raft_command_complete(raft, cmd, RAFT_CMD_LOST_LEADERSHIP);
                } else {
                    raft_command_complete(raft, cmd, RAFT_CMD_TIMEOUT);
                }
            }
        }
        raft_reset_ping_timer(raft);
    }

    uint64_t interval = RAFT_TIMER_THRESHOLD(raft->election_timer);

    if (raft->joining) {
        interval = RAFT_JOIN_TIMEOUT_MS;
    } else if (uuid_is_zero(&raft->leader_sid)) {
        /* There are no heartbeats to handle when there is no leader. */
        interval = raft->election_timer;
    }
    cooperative_multitasking_set(
        &raft_run_cb, (void *) raft, time_msec(),
        interval + interval / 10, "raft_run");

    /* Do this only at the end; if we did it as soon as we set raft->left or
     * raft->failed in handling the RemoveServerReply, then it could easily
     * cause references to freed memory in RPC sessions, etc. */
    if (raft->left || raft->failed) {
        raft_close__(raft);
    }
}

static void
raft_run_cb(void *arg)
{
    struct raft *raft = (struct raft *) arg;

    raft_run(raft);
}

static void
raft_wait_session(struct jsonrpc_session *js)
{
    if (js && failure_test != FT_STOP_RAFT_RPC) {
        jsonrpc_session_wait(js);
        jsonrpc_session_recv_wait(js);
    }
}

/* Causes the next call to poll_block() to wake up when 'raft' needs to do
 * something. */
void
raft_wait(struct raft *raft)
{
    if (raft->left || raft->failed) {
        return;
    }

    raft_waiters_wait(raft);

    if (failure_test != FT_STOP_RAFT_RPC) {
        if (raft->listener) {
            pstream_wait(raft->listener);
        } else {
            poll_timer_wait_until(raft->listen_backoff);
        }
    }

    struct raft_conn *conn;
    LIST_FOR_EACH (conn, list_node, &raft->conns) {
        raft_wait_session(conn->js);
    }

    if (!raft->joining) {
        poll_timer_wait_until(raft->election_timeout);
    } else {
        poll_timer_wait_until(raft->join_timeout);
    }
    if (raft->leaving) {
        poll_timer_wait_until(raft->leave_timeout);
    }
    if (raft->role == RAFT_LEADER || !hmap_is_empty(&raft->commands)) {
        poll_timer_wait_until(raft->ping_timeout);
    }
}

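/* Creates a new waiter of the given 'type' and appends it to the list of
 * waiters.  If 'start_commit' is true, the waiter is tied to a newly started
 * commit of the log, so that it completes when that commit does. */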
static struct raft_waiter *
raft_waiter_create(struct raft *raft, enum raft_waiter_type type,
                   bool start_commit)
{
    struct raft_waiter *w = xzalloc(sizeof *w);
    ovs_list_push_back(&raft->waiters, &w->list_node);
    w->commit_ticket = start_commit ? ovsdb_log_commit_start(raft->log) : 0;
    w->type = type;
    return w;
}

/* Returns a human-readable representation of 'status' (or NULL if 'status' is
 * invalid). */
const char *
raft_command_status_to_string(enum raft_command_status status)
{
    switch (status) {
    case RAFT_CMD_INCOMPLETE:
        return "operation still in progress";
    case RAFT_CMD_SUCCESS:
        return "success";
    case RAFT_CMD_NOT_LEADER:
        return "not leader";
    case RAFT_CMD_BAD_PREREQ:
        return "prerequisite check failed";
    case RAFT_CMD_LOST_LEADERSHIP:
        return "lost leadership";
    case RAFT_CMD_SHUTDOWN:
        return "server shutdown";
    case RAFT_CMD_IO_ERROR:
        return "I/O error";
    case RAFT_CMD_TIMEOUT:
        return "timeout";
    default:
        return NULL;
    }
}

/* Converts human-readable status in 's' into status code in '*statusp'.
 * Returns true if successful, false if 's' is unknown. */
bool
raft_command_status_from_string(const char *s,
                                enum raft_command_status *statusp)
{
    for (enum raft_command_status status = 0; ; status++) {
        const char *s2 = raft_command_status_to_string(status);
        if (!s2) {
            *statusp = 0;
            return false;
        } else if (!strcmp(s, s2)) {
            *statusp = status;
            return true;
        }
    }
}

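/* Returns the entry ID (eid) of the most recent data-carrying log entry at or
 * before 'index', falling back to the snapshot's eid if there is none. */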
static const struct uuid *
raft_get_eid(const struct raft *raft, uint64_t index)
{
    for (; index >= raft->log_start; index--) {
        const struct raft_entry *e = raft_get_entry(raft, index);
        if (raft_entry_has_data(e)) {
            return &e->eid;
        }
    }
    return &raft->snap.eid;
}

static const struct uuid *
raft_current_eid(const struct raft *raft)
{
    return raft_get_eid(raft, raft->log_end - 1);
}

bool
raft_precheck_prereq(const struct raft *raft, const struct uuid *prereq)
{
    if (!uuid_equals(raft_current_eid(raft), prereq)) {
        VLOG_DBG("%s: prerequisites (" UUID_FMT ") "
                 "do not match current eid (" UUID_FMT ")",
                 __func__, UUID_ARGS(prereq),
                 UUID_ARGS(raft_current_eid(raft)));
        return false;
    }

    /* Incomplete commands on a leader will not change the leader's current
     * 'eid' on commit as they are already part of the leader's log. */
    if (raft->role == RAFT_LEADER) {
        return true;
    }

    /* Having incomplete commands on a follower means that the leader has
     * these commands and they will change the prerequisites once added to
     * the leader's log.
     *
     * There is a chance that all these commands will actually fail and the
     * record with current prerequisites will in fact succeed, but, since
     * these are our own commands, the chances are low. */
    struct raft_command *cmd;
    HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
        /* Skip commands that are already part of the log (have non-zero
         * index) and ones that do not carry any data (have zero 'eid'),
         * as they can't change prerequisites.
         *
         * Database will not re-run triggers unless the data changes or
         * one of the data-carrying triggers completes.  So, pre-check must
         * not fail if there are no outstanding data-carrying commands. */
        if (!cmd->index && !uuid_is_zero(&cmd->eid)) {
            VLOG_DBG("%s: follower still has an incomplete command "
                     UUID_FMT, __func__, UUID_ARGS(&cmd->eid));
            return false;
        }
    }

    return true;
}

static struct raft_command *
raft_command_create_completed(enum raft_command_status status)
{
    ovs_assert(status != RAFT_CMD_INCOMPLETE);

    struct raft_command *cmd = xzalloc(sizeof *cmd);
    cmd->n_refs = 1;
    cmd->status = status;
    return cmd;
}

static struct raft_command *
raft_command_create_incomplete(struct raft *raft, uint64_t index)
{
    struct raft_command *cmd = xzalloc(sizeof *cmd);
    cmd->n_refs = 2; /* One for client, one for raft->commands. */
    cmd->index = index;
    cmd->status = RAFT_CMD_INCOMPLETE;
    hmap_insert(&raft->commands, &cmd->hmap_node, cmd->index);
    return cmd;
}

static struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_initiate(struct raft *raft,
                      const struct json *data, const struct json *servers,
                      uint64_t election_timer, const struct uuid *eid)
{
    /* Write to local log. */
    uint64_t index = raft->log_end;
    if (!raft_handle_write_error(
            raft, raft_write_entry(
                raft, raft->term, json_nullable_clone(data), eid,
                json_nullable_clone(servers),
                election_timer))) {
        return raft_command_create_completed(RAFT_CMD_IO_ERROR);
    }

    struct raft_command *cmd = raft_command_create_incomplete(raft, index);
    ovs_assert(eid);
    cmd->eid = *eid;
    cmd->timestamp = time_msec();

    raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index = cmd->index;

    if (failure_test == FT_CRASH_BEFORE_SEND_APPEND_REQ) {
        ovs_fatal(0, "Raft test: crash before sending append_request.");
    }
    /* Write to remote logs. */
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&s->sid, &raft->sid) && s->next_index == index) {
            raft_send_append_request(raft, s, 1, "execute command");
            s->next_index++;
        }
    }
    if (failure_test == FT_CRASH_AFTER_SEND_APPEND_REQ) {
        ovs_fatal(0, "Raft test: crash after sending append_request.");
    }
    if (failure_test == FT_TRANSFER_LEADERSHIP_AFTER_SEND_APPEND_REQ) {
        failure_test = FT_TRANSFER_LEADERSHIP;
    }
    raft_reset_ping_timer(raft);

    return cmd;
}

static void
log_all_commands(struct raft *raft)
{
    if (!VLOG_IS_DBG_ENABLED()) {
        return;
    }

    struct raft_command *cmd;
    HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
        VLOG_DBG("raft command eid: "UUID_FMT, UUID_ARGS(&cmd->eid));
    }
}

static struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_execute__(struct raft *raft, const struct json *data,
                       const struct json *servers, uint64_t election_timer,
                       const struct uuid *prereq, struct uuid *result)
{
    if (raft->joining || raft->left || raft->failed) {
        return raft_command_create_completed(RAFT_CMD_SHUTDOWN);
    }

    if (raft->role != RAFT_LEADER) {
        /* Consider proxying the command to the leader.  We can only do that
         * if we know the leader and the command does not change the set of
         * servers.  We do not proxy commands without prerequisites, even
         * though we could, because in an OVSDB context a log entry doesn't
         * make sense without context. */
        if (servers || election_timer || !data
            || raft->role != RAFT_FOLLOWER || uuid_is_zero(&raft->leader_sid)
            || !prereq) {
            return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
        }
    }

    struct uuid eid = data ? uuid_random() : UUID_ZERO;
    if (result) {
        *result = eid;
    }

    if (raft->role != RAFT_LEADER) {
        const union raft_rpc rpc = {
            .execute_command_request = {
                .common = {
                    .type = RAFT_RPC_EXECUTE_COMMAND_REQUEST,
                    .sid = raft->leader_sid,
                },
                .data = CONST_CAST(struct json *, data),
                .prereq = *prereq,
                .result = eid,
            }
        };
        if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REQ) {
            ovs_fatal(0, "Raft test: crash before sending "
                      "execute_command_request");
        }
        if (!raft_send(raft, &rpc)) {
            /* Couldn't send command, so it definitely failed. */
            return raft_command_create_completed(RAFT_CMD_NOT_LEADER);
        }
        if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REQ) {
            ovs_fatal(0, "Raft test: crash after sending "
                      "execute_command_request");
        }

        struct raft_command *cmd = raft_command_create_incomplete(raft, 0);
        cmd->timestamp = time_msec();
        cmd->eid = eid;
        log_all_commands(raft);
        return cmd;
    }

    const struct uuid *current_eid = raft_current_eid(raft);
    if (prereq && !uuid_equals(prereq, current_eid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
                     "prerequisite "UUID_FMT,
                     UUID_ARGS(current_eid), UUID_ARGS(prereq));
        return raft_command_create_completed(RAFT_CMD_BAD_PREREQ);
    }

    return raft_command_initiate(raft, data, servers, election_timer, &eid);
}

/* Initiates appending a log entry to 'raft'.  The log entry consists of 'data'
 * and, if 'prereq' is nonnull, it is only added to the log if the previous
 * entry in the log has entry ID 'prereq'.  If 'result' is nonnull, it is
 * populated with the entry ID for the new log entry.
 *
 * Returns a "struct raft_command" that may be used to track progress adding
 * the log entry.  The caller must eventually free the returned structure, with
 * raft_command_unref(). */
struct raft_command * OVS_WARN_UNUSED_RESULT
raft_command_execute(struct raft *raft, const struct json *data,
                     const struct uuid *prereq, struct uuid *result)
{
    return raft_command_execute__(raft, data, NULL, 0, prereq, result);
}

/* Returns the status of 'cmd'. */
enum raft_command_status
raft_command_get_status(const struct raft_command *cmd)
{
    ovs_assert(cmd->n_refs > 0);
    return cmd->status;
}

/* Returns the index of the log entry at which 'cmd' was committed.
 *
 * This function works only with successful commands. */
uint64_t
raft_command_get_commit_index(const struct raft_command *cmd)
{
    ovs_assert(cmd->n_refs > 0);
    ovs_assert(cmd->status == RAFT_CMD_SUCCESS);
    return cmd->index;
}

/* Frees 'cmd'. */
void
raft_command_unref(struct raft_command *cmd)
{
    if (cmd) {
        ovs_assert(cmd->n_refs > 0);
        if (!--cmd->n_refs) {
            free(cmd);
        }
    }
}

/* Causes poll_block() to wake up when 'cmd' has status to report. */
void
raft_command_wait(const struct raft_command *cmd)
{
    if (cmd->status != RAFT_CMD_INCOMPLETE) {
        poll_immediate_wake();
    }
}

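/* Completes 'cmd' with the given 'status': replies to the requesting server
 * if the command was proxied, removes the command from 'raft->commands', and
 * releases the reference held by that map. */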
static void
raft_command_complete(struct raft *raft,
                      struct raft_command *cmd,
                      enum raft_command_status status)
{
    VLOG_DBG("raft_command_complete eid "UUID_FMT" status: %s",
             UUID_ARGS(&cmd->eid), raft_command_status_to_string(status));
    if (!uuid_is_zero(&cmd->sid)) {
        uint64_t commit_index = status == RAFT_CMD_SUCCESS ? cmd->index : 0;
        raft_send_execute_command_reply(raft, &cmd->sid, &cmd->eid, status,
                                        commit_index);
    }

    ovs_assert(cmd->status == RAFT_CMD_INCOMPLETE);
    ovs_assert(cmd->n_refs > 0);
    hmap_remove(&raft->commands, &cmd->hmap_node);
    cmd->status = status;
    raft_command_unref(cmd);
}

static void
raft_complete_all_commands(struct raft *raft, enum raft_command_status status)
{
    struct raft_command *cmd;
    HMAP_FOR_EACH_SAFE (cmd, hmap_node, &raft->commands) {
        raft_command_complete(raft, cmd, status);
    }
}

static struct raft_command *
raft_find_command_by_eid(struct raft *raft, const struct uuid *eid)
{
    struct raft_command *cmd;

    HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
        if (uuid_equals(&cmd->eid, eid)) {
            return cmd;
        }
    }
    return NULL;
}

#define RAFT_RPC(ENUM, NAME) \
    static void raft_handle_##NAME(struct raft *, const struct raft_##NAME *);
RAFT_RPC_TYPES
#undef RAFT_RPC

static void
raft_handle_hello_request(struct raft *raft OVS_UNUSED,
                          const struct raft_hello_request *hello OVS_UNUSED)
{
}

/* 'sid' is the server being added. */
static void
raft_send_add_server_reply__(struct raft *raft, const struct uuid *sid,
                             const char *address,
                             bool success, const char *comment)
{
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    if (!VLOG_DROP_INFO(&rl)) {
        struct ds s = DS_EMPTY_INITIALIZER;
        char buf[SID_LEN + 1];
        ds_put_format(&s, "adding %s ("SID_FMT" at %s) "
                      "to cluster "CID_FMT" %s",
                      raft_get_nickname(raft, sid, buf, sizeof buf),
                      SID_ARGS(sid), address, CID_ARGS(&raft->cid),
                      success ? "succeeded" : "failed");
        if (comment) {
            ds_put_format(&s, " (%s)", comment);
        }
        VLOG_INFO("%s", ds_cstr(&s));
        ds_destroy(&s);
    }

    union raft_rpc rpy = {
        .add_server_reply = {
            .common = {
                .type = RAFT_RPC_ADD_SERVER_REPLY,
                .sid = *sid,
                .comment = CONST_CAST(char *, comment),
            },
            .success = success,
        }
    };

    struct sset *remote_addresses = &rpy.add_server_reply.remote_addresses;
    sset_init(remote_addresses);
    if (!raft->joining) {
        struct raft_server *s;
        HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
            if (!uuid_equals(&s->sid, &raft->sid)) {
                sset_add(remote_addresses, s->address);
            }
        }
    }

    raft_send(raft, &rpy);

    sset_destroy(remote_addresses);
}

static void
raft_send_remove_server_reply_rpc(struct raft *raft,
                                  const struct uuid *dst_sid,
                                  const struct uuid *target_sid,
                                  bool success, const char *comment)
{
    if (uuid_equals(&raft->sid, dst_sid)) {
        if (success && uuid_equals(&raft->sid, target_sid)) {
            raft_finished_leaving_cluster(raft);
        }
        return;
    }

    const union raft_rpc rpy = {
        .remove_server_reply = {
            .common = {
                .type = RAFT_RPC_REMOVE_SERVER_REPLY,
                .sid = *dst_sid,
                .comment = CONST_CAST(char *, comment),
            },
            .target_sid = (uuid_equals(dst_sid, target_sid)
                           ? UUID_ZERO
                           : *target_sid),
            .success = success,
        }
    };
    raft_send(raft, &rpy);
}

static void
raft_send_remove_server_reply__(struct raft *raft,
                                const struct uuid *target_sid,
                                const struct uuid *requester_sid,
                                struct unixctl_conn *requester_conn,
                                bool success, const char *comment)
{
    struct ds s = DS_EMPTY_INITIALIZER;
    ds_put_format(&s, "request ");
    if (!uuid_is_zero(requester_sid)) {
        char buf[SID_LEN + 1];
        ds_put_format(&s, "by %s",
                      raft_get_nickname(raft, requester_sid, buf, sizeof buf));
    } else {
        ds_put_cstr(&s, "via unixctl");
    }
    ds_put_cstr(&s, " to remove ");
    if (!requester_conn && uuid_equals(target_sid, requester_sid)) {
        ds_put_cstr(&s, "itself");
    } else {
        char buf[SID_LEN + 1];
        ds_put_cstr(&s, raft_get_nickname(raft, target_sid, buf, sizeof buf));
        if (uuid_equals(target_sid, &raft->sid)) {
            ds_put_cstr(&s, " (ourselves)");
        }
    }
    ds_put_format(&s, " from cluster "CID_FMT" %s",
                  CID_ARGS(&raft->cid),
                  success ? "succeeded" : "failed");
    if (comment) {
        ds_put_format(&s, " (%s)", comment);
    }

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    VLOG_INFO_RL(&rl, "%s", ds_cstr(&s));

    /* Send RemoveServerReply to the requester (which could be a server or a
     * unixctl connection).  Also always send it to the removed server; this
     * allows it to be sure that it's really removed, update its log, and
     * disconnect permanently. */
    if (!uuid_is_zero(requester_sid)) {
        raft_send_remove_server_reply_rpc(raft, requester_sid, target_sid,
                                          success, comment);
    }
    if (!uuid_equals(requester_sid, target_sid)) {
        raft_send_remove_server_reply_rpc(raft, target_sid, target_sid,
                                          success, comment);
    }
    if (requester_conn) {
        if (success) {
            unixctl_command_reply(requester_conn, ds_cstr(&s));
        } else {
            unixctl_command_reply_error(requester_conn, ds_cstr(&s));
        }
    }

    ds_destroy(&s);
}

static void
raft_send_add_server_reply(struct raft *raft,
                           const struct raft_add_server_request *rq,
                           bool success, const char *comment)
{
    return raft_send_add_server_reply__(raft, &rq->common.sid, rq->address,
                                        success, comment);
}

static void
raft_send_remove_server_reply(struct raft *raft,
                              const struct raft_remove_server_request *rq,
                              bool success, const char *comment)
{
    return raft_send_remove_server_reply__(raft, &rq->sid, &rq->common.sid,
                                           rq->requester_conn, success,
                                           comment);
}

static void
raft_become_follower(struct raft *raft)
{
    raft->leader_sid = UUID_ZERO;
    if (raft->role == RAFT_FOLLOWER) {
        return;
    }

    raft_role_transition(raft, RAFT_FOLLOWER);
    raft_reset_election_timer(raft);

    /* Notify clients about lost leadership.
     *
     * We do not reverse our changes to 'raft->servers' because the new
     * configuration is already part of the log.  Possibly the configuration
     * log entry will not be committed, but until we know that we must use the
     * new configuration.  Our AppendEntries processing will properly update
     * the server configuration later, if necessary.
     *
     * However, since we're sending replies about a failure to add, those new
     * servers have to be cleaned up.  Otherwise, they will be stuck in a
     * 'CATCHUP' phase in case this server regains leadership before they join
     * through the current new leader.  They are not yet in 'raft->servers',
     * so they are not part of the shared configuration.
     *
     * Also, we do not complete commands here, as they can still be completed
     * if their log entries have already been replicated to other servers.
     * If the entries were actually committed according to the new leader, our
     * AppendEntries processing will complete the corresponding commands.
     */
    struct raft_server *s;
    HMAP_FOR_EACH_POP (s, hmap_node, &raft->add_servers) {
        raft_send_add_server_reply__(raft, &s->sid, s->address, false,
                                     RAFT_SERVER_LOST_LEADERSHIP);
        raft_server_destroy(s);
    }
    if (raft->remove_server) {
        raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
                                        &raft->remove_server->requester_sid,
                                        raft->remove_server->requester_conn,
                                        false, RAFT_SERVER_LOST_LEADERSHIP);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }
}

static void
raft_send_append_request(struct raft *raft,
                         struct raft_server *peer, unsigned int n,
                         const char *comment)
{
    ovs_assert(raft->role == RAFT_LEADER);

    const union raft_rpc rq = {
        .append_request = {
            .common = {
                .type = RAFT_RPC_APPEND_REQUEST,
                .sid = peer->sid,
                .comment = CONST_CAST(char *, comment),
            },
            .term = raft->term,
            .prev_log_index = peer->next_index - 1,
            .prev_log_term = (peer->next_index - 1 >= raft->log_start
                              ? raft->entries[peer->next_index - 1
                                              - raft->log_start].term
                              : raft->snap.term),
            .leader_commit = raft->commit_index,
            .entries = &raft->entries[peer->next_index - raft->log_start],
            .n_entries = n,
        },
    };
    raft_send(raft, &rq);
}

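/* Sends an empty AppendEntries RPC ("heartbeat") to every other server, and
 * an "incomplete" reply to every client still waiting on a command, then
 * resets the ping timer. */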
static void
raft_send_heartbeats(struct raft *raft)
{
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (!uuid_equals(&raft->sid, &s->sid)) {
            raft_send_append_request(raft, s, 0, "heartbeat");
        }
    }

    /* Send anyone waiting for a command to complete a ping to let them
     * know we're still working on it. */
    struct raft_command *cmd;
    HMAP_FOR_EACH (cmd, hmap_node, &raft->commands) {
        if (!uuid_is_zero(&cmd->sid)) {
            raft_send_execute_command_reply(raft, &cmd->sid,
                                            &cmd->eid,
                                            RAFT_CMD_INCOMPLETE, 0);
        }
    }

    raft_reset_ping_timer(raft);
}

static void
raft_join_complete(struct raft *raft)
{
    raft->joining = false;
    raft_update_probe_intervals(raft);
}

/* Initializes the fields in 's' that represent the leader's view of the
 * server. */
static void
raft_server_init_leader(struct raft *raft, struct raft_server *s)
{
    s->next_index = raft->log_end;
    s->match_index = 0;
    s->phase = RAFT_PHASE_STABLE;
    s->replied = false;
    s->install_snapshot_request_in_progress = false;
}

static void
raft_set_leader(struct raft *raft, const struct uuid *sid)
{
    raft->leader_sid = *sid;
    raft->ever_had_leader = raft->had_leader = true;
    raft->candidate_retrying = false;
}

static const char *
raft_role_to_string(enum raft_role role)
{
    switch (role) {
    case RAFT_FOLLOWER: return "follower";
    case RAFT_CANDIDATE: return "candidate";
    case RAFT_LEADER: return "leader";
    default: return "<error>";
    }
}

static void
raft_role_transition_at(struct raft *raft, enum raft_role role,
                        const char *func, const char *source)
{
    VLOG_DBG("%s(%s): role transition: %s --> %s", func, source,
             raft_role_to_string(raft->role), raft_role_to_string(role));
    raft->role = role;
}

static void
raft_become_leader(struct raft *raft)
{
    log_all_commands(raft);

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
    VLOG_INFO_RL(&rl, "term %"PRIu64": elected leader by %d+ of "
                 "%"PRIuSIZE" servers", raft->term,
                 raft->n_votes, hmap_count(&raft->servers));

    ovs_assert(raft->role != RAFT_LEADER);
    raft_role_transition(raft, RAFT_LEADER);
    raft->election_won = time_msec();
    raft_set_leader(raft, &raft->sid);
    raft_reset_election_timer(raft);
    raft_reset_ping_timer(raft);

    if (raft->joining) {
        /* It is possible that the server committing this one to the list of
         * servers lost leadership before the entry was committed but after
         * it was already replicated to a majority of servers.  In this case
         * other servers will recognize this one as a valid cluster member
         * and may transfer leadership to it and vote for it.  This way we're
         * becoming a cluster leader without receiving a reply for a join
         * request and will commit the addition of this server ourselves. */
        VLOG_INFO_RL(&rl, "elected as leader while joining");
        raft_join_complete(raft);
    }

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        raft_server_init_leader(raft, s);
    }

    raft->election_timer_new = 0;

    raft_update_our_match_index(raft, raft->log_end - 1);

    /* Write the fact that we are leader to the log.  This is not used by the
     * algorithm (although it could be, for quick restart), but it is used for
     * offline analysis to check for conformance with the properties that Raft
     * guarantees. */
    struct raft_record r = {
        .type = RAFT_REC_LEADER,
        .term = raft->term,
        .sid = raft->sid,
    };
    ovsdb_error_destroy(
        ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));

    /* Initiate a no-op commit.  Otherwise we might never find out what's in
     * the log.  See section 6.4 item 1:
     *
     *     The Leader Completeness Property guarantees that a leader has all
     *     committed entries, but at the start of its term, it may not know
     *     which those are.  To find out, it needs to commit an entry from its
     *     term.  Raft handles this by having each leader commit a blank no-op
     *     entry into the log at the start of its term.  As soon as this no-op
     *     entry is committed, the leader's commit index will be at least as
     *     large as any other servers' during its term.
     */
    raft_command_unref(raft_command_execute__(raft, NULL, NULL, 0, NULL,
                                              NULL));
}

/* Processes term 'term' received as part of RPC 'common'.  Returns true if
 * the caller should continue processing the RPC, false if the caller should
 * reject it due to a stale term. */
static bool
raft_receive_term__(struct raft *raft, const struct raft_rpc_common *common,
                    uint64_t term)
{
    /* Section 3.3 says:
     *
     *     Current terms are exchanged whenever servers communicate; if one
     *     server's current term is smaller than the other's, then it updates
     *     its current term to the larger value.  If a candidate or leader
     *     discovers that its term is out of date, it immediately reverts to
     *     follower state.  If a server receives a request with a stale term
     *     number, it rejects the request.
     */
    if (term > raft->term) {
        if (!raft_set_term(raft, term, NULL)) {
            /* Failed to update the term to 'term'. */
            return false;
        }
        raft_become_follower(raft);
    } else if (term < raft->term) {
        char buf[SID_LEN + 1];
        VLOG_INFO("rejecting term %"PRIu64" < current term %"PRIu64" received "
                  "in %s message from server %s",
                  term, raft->term,
                  raft_rpc_type_to_string(common->type),
                  raft_get_nickname(raft, &common->sid, buf, sizeof buf));
        return false;
    }
    return true;
}

static void
raft_get_servers_from_log(struct raft *raft, enum vlog_level level)
{
    const struct json *servers_json = raft->snap.servers;
    for (uint64_t index = raft->log_end - 1; index >= raft->log_start;
         index--) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        if (e->servers) {
            servers_json = e->servers;
            break;
        }
    }

    struct hmap servers;
    struct ovsdb_error *error = raft_servers_from_json(servers_json, &servers);
    ovs_assert(!error);
    raft_set_servers(raft, &servers, level);
    raft_servers_destroy(&servers);
}

/* Truncates the log, so that raft->log_end becomes 'new_end'.
 *
 * Doesn't write anything to disk.  In theory, we could truncate the on-disk
 * log file, but we don't have the right information to know how long it
 * should be.  What we actually do is to append entries for older indexes to
 * the on-disk log; when we re-read it later, these entries truncate the log.
 *
 * Returns true if any of the removed log entries were server configuration
 * entries, false otherwise. */
static bool
raft_truncate(struct raft *raft, uint64_t new_end)
{
    ovs_assert(new_end >= raft->log_start);
    if (raft->log_end > new_end) {
        char buf[SID_LEN + 1];
        VLOG_INFO("%s truncating %"PRIu64 " entries from end of log",
                  raft_get_nickname(raft, &raft->sid, buf, sizeof buf),
                  raft->log_end - new_end);
    }

    bool servers_changed = false;
    while (raft->log_end > new_end) {
        struct raft_entry *entry = &raft->entries[--raft->log_end
                                                  - raft->log_start];
        if (entry->servers) {
            servers_changed = true;
        }
        raft_entry_uninit(entry);
    }
    return servers_changed;
}

static const struct raft_entry *
raft_peek_next_entry(struct raft *raft)
{
    /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
    ovs_assert(raft->log_start <= raft->last_applied + 2);
    ovs_assert(raft->last_applied <= raft->commit_index);
    ovs_assert(raft->commit_index < raft->log_end);

    if (raft->joining || raft->failed) {
        return NULL;
    }

    if (raft->log_start == raft->last_applied + 2) {
        return &raft->snap;
    }

    while (raft->last_applied < raft->commit_index) {
        const struct raft_entry *e = raft_get_entry(raft,
                                                    raft->last_applied + 1);
        if (raft_entry_has_data(e)) {
            return e;
        }
        raft->last_applied++;
    }
    return NULL;
}

/* Updates the commit index in the raft log.  If the commit index is already
 * up-to-date, does nothing and returns false; otherwise, returns true. */
static bool
raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
{
    if (new_commit_index <= raft->commit_index) {
        return false;
    }

    while (raft->commit_index < new_commit_index) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        uint64_t index = ++raft->commit_index;
        const struct raft_entry *e = raft_get_entry(raft, index);

        if (raft_entry_has_data(e)) {
            struct raft_command *cmd = raft_find_command_by_eid(raft, &e->eid);

            if (cmd) {
                if (!cmd->index && raft->role == RAFT_LEADER) {
                    VLOG_INFO_RL(&rl,
                                 "command completed after role change from "
                                 "follower to leader (eid: "UUID_FMT", "
                                 "commit index: %"PRIu64")",
                                 UUID_ARGS(&e->eid), index);
                } else if (!cmd->index && raft->role != RAFT_LEADER) {
                    /* This can happen when the leader fails over before
                     * sending the execute_command_reply. */
                    VLOG_INFO_RL(&rl,
                                 "command completed without reply "
                                 "(eid: "UUID_FMT", "
                                 "commit index: %"PRIu64")",
                                 UUID_ARGS(&e->eid), index);
                } else if (cmd->index && raft->role != RAFT_LEADER) {
                    /* This can happen if the current server lost leadership
                     * after sending append requests to the majority of
                     * servers, but before receiving the majority of append
                     * replies. */
                    VLOG_INFO_RL(&rl,
                                 "command completed after role change from "
                                 "leader to follower (eid: "UUID_FMT", "
                                 "commit index: %"PRIu64")",
                                 UUID_ARGS(&e->eid), index);
                    /* Clearing 'sid' to avoid sending a cmd execution
                     * reply. */
                    cmd->sid = UUID_ZERO;
                } else {
                    /* (cmd->index && raft->role == RAFT_LEADER)
                     * Normal command completion on a leader. */
                }
                cmd->index = index;
                raft_command_complete(raft, cmd, RAFT_CMD_SUCCESS);
            }
        }
        if (e->election_timer) {
            VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
                      raft->election_timer, e->election_timer);
            raft->election_timer = e->election_timer;
            raft->election_timer_new = 0;
            raft_update_probe_intervals(raft);
        }
        if (e->servers && raft->role == RAFT_LEADER) {
            /* raft_run_reconfigure() can write a new Raft entry, which can
             * reallocate raft->entries, which would invalidate 'e', so
             * this case must be last, after the one for 'e->data'. */
            raft_run_reconfigure(raft);
        } else if (e->servers && !raft_has_uncommitted_configuration(raft)) {
            struct ovsdb_error *error;
            struct raft_server *s;
            struct hmap servers;

            error = raft_servers_from_json(e->servers, &servers);
            ovs_assert(!error);
            HMAP_FOR_EACH (s, hmap_node, &servers) {
                struct raft_server *server = raft_find_server(raft, &s->sid);

                if (server && server->phase == RAFT_PHASE_COMMITTING) {
                    /* This server lost leadership while committing
                     * server 's', but it was committed later by a
                     * new leader. */
                    server->phase = RAFT_PHASE_STABLE;
                }

                if (raft->joining && uuid_equals(&s->sid, &raft->sid)) {
                    /* Leadership change happened before the previous leader
                     * could commit the change of the servers list, but it
                     * was replicated and a new leader committed it. */
                    VLOG_INFO_RL(&rl,
                                 "added to configuration without reply "
                                 "(eid: "UUID_FMT", commit index: %"PRIu64")",
                                 UUID_ARGS(&e->eid), index);
                    raft_join_complete(raft);
                }
            }
            raft_servers_destroy(&servers);
        }
    }

    /* Write the commit index to the log.  The next time we restart, this
     * allows us to start exporting a reasonably fresh log, instead of a log
     * that only contains the snapshot. */
    struct raft_record r = {
        .type = RAFT_REC_COMMIT_INDEX,
        .commit_index = raft->commit_index,
    };
    ovsdb_error_destroy(
        ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
    return true;
}

/* This doesn't use rq->entries (but it does use rq->n_entries). */
static void
raft_send_append_reply(struct raft *raft, const struct raft_append_request *rq,
                       enum raft_append_result result, const char *comment)
{
    /* Figure 3.1: "If leaderCommit > commitIndex, set commitIndex =
     * min(leaderCommit, index of last new entry)" */
    if (result == RAFT_APPEND_OK && rq->leader_commit > raft->commit_index) {
        raft_update_commit_index(
            raft, MIN(rq->leader_commit, rq->prev_log_index + rq->n_entries));
    }

    /* Send reply. */
    union raft_rpc reply = {
        .append_reply = {
            .common = {
                .type = RAFT_RPC_APPEND_REPLY,
                .sid = rq->common.sid,
                .comment = CONST_CAST(char *, comment),
            },
            .term = raft->term,
            .log_end = raft->log_end,
            .prev_log_index = rq->prev_log_index,
            .prev_log_term = rq->prev_log_term,
            .n_entries = rq->n_entries,
            .result = result,
        }
    };
    raft_send(raft, &reply);
}

/* If 'prev_log_index' exists in 'raft''s log, in term 'prev_log_term',
 * returns NULL.  Otherwise, returns an explanation for the mismatch. */
static const char *
match_index_and_term(const struct raft *raft,
                     uint64_t prev_log_index, uint64_t prev_log_term)
{
    if (prev_log_index < raft->log_start - 1) {
        return "mismatch before start of log";
    } else if (prev_log_index == raft->log_start - 1) {
        if (prev_log_term != raft->snap.term) {
            return "prev_term mismatch";
        }
    } else if (prev_log_index < raft->log_end) {
        if (raft->entries[prev_log_index - raft->log_start].term
            != prev_log_term) {
            return "term mismatch";
        }
    } else {
        /* prev_log_index >= raft->log_end */
        return "mismatch past end of log";
    }
    return NULL;
}

static void
raft_handle_append_entries(struct raft *raft,
                           const struct raft_append_request *rq,
                           uint64_t prev_log_index, uint64_t prev_log_term,
                           const struct raft_entry *entries,
                           unsigned int n_entries)
{
    /* Section 3.5: "When sending an AppendEntries RPC, the leader includes
     * the index and term of the entry in its log that immediately precedes
     * the new entries.  If the follower does not find an entry in its log
     * with the same index and term, then it refuses the new entries." */
    const char *mismatch = match_index_and_term(raft, prev_log_index,
                                                prev_log_term);
    if (mismatch) {
        VLOG_INFO("rejecting append_request because previous entry "
                  "%"PRIu64",%"PRIu64" not in local log (%s)",
                  prev_log_term, prev_log_index, mismatch);
        raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY, mismatch);
        return;
    }

    /* Figure 3.1: "If an existing entry conflicts with a new one (same
     * index but different terms), delete the existing entry and all that
     * follow it." */
    unsigned int i;
    bool servers_changed = false;
    for (i = 0; ; i++) {
        if (i >= n_entries) {
            /* No change. */
            if (rq->common.comment
                && !strcmp(rq->common.comment, "heartbeat")) {
                raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "heartbeat");
            } else {
                raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
            }
            return;
        }

        uint64_t log_index = (prev_log_index + 1) + i;
        if (log_index >= raft->log_end) {
            break;
        }
        if (raft->entries[log_index - raft->log_start].term
            != entries[i].term) {
            if (raft_truncate(raft, log_index)) {
                servers_changed = true;
            }
            break;
        }
    }

    if (failure_test == FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE) {
        ovs_fatal(0, "Raft test: crash after receiving append_request with "
                  "update.");
    }
    /* Figure 3.1: "Append any entries not already in the log." */
    struct ovsdb_error *error = NULL;
    bool any_written = false;
    for (; i < n_entries; i++) {
        const struct raft_entry *e = &entries[i];
        error = raft_write_entry(raft, e->term,
                                 json_nullable_clone(
                                     raft_entry_get_parsed_data(e)),
                                 &e->eid,
                                 json_nullable_clone(e->servers),
                                 e->election_timer);
        if (error) {
            break;
        }
        any_written = true;
        if (e->servers) {
            servers_changed = true;
        }
    }

    if (any_written) {
        raft_waiter_create(raft, RAFT_W_ENTRY, true)->entry.index
            = raft->log_end - 1;
    }
    if (servers_changed) {
        /* The set of servers might have changed; check. */
        raft_get_servers_from_log(raft, VLL_INFO);
    }

    if (error) {
        char *s = ovsdb_error_to_string_free(error);
        VLOG_ERR("%s", s);
        free(s);
        raft_send_append_reply(raft, rq, RAFT_APPEND_IO_ERROR, "I/O error");
        return;
    }

    raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "log updated");
}

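/* Accepts 'sid' as the leader for the current term, recording it to the log
 * and stepping down from candidacy if necessary.  Returns false if this
 * server believes itself to be the leader, in which case the claim is
 * rejected. */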
static bool
raft_update_leader(struct raft *raft, const struct uuid *sid)
{
    if (raft->role == RAFT_LEADER) {
        char buf[SID_LEN + 1];
        VLOG_ERR("this server is leader but server %s claims to be",
                 raft_get_nickname(raft, sid, buf, sizeof buf));
        return false;
    } else if (!uuid_equals(sid, &raft->leader_sid)) {
        if (!uuid_is_zero(&raft->leader_sid)) {
            char buf1[SID_LEN + 1];
            char buf2[SID_LEN + 1];
            VLOG_ERR("leader for term %"PRIu64" changed from %s to %s",
                     raft->term,
                     raft_get_nickname(raft, &raft->leader_sid,
                                       buf1, sizeof buf1),
                     raft_get_nickname(raft, sid, buf2, sizeof buf2));
        } else {
            char buf[SID_LEN + 1];
            VLOG_INFO("server %s is leader for term %"PRIu64,
                      raft_get_nickname(raft, sid, buf, sizeof buf),
                      raft->term);
        }
        raft_set_leader(raft, sid);

        /* Record the leader to the log.  This is not used by the algorithm
         * (although it could be, for quick restart), but it is used for
         * offline analysis to check for conformance with the properties
         * that Raft guarantees. */
        struct raft_record r = {
            .type = RAFT_REC_LEADER,
            .term = raft->term,
            .sid = *sid,
        };
        ovsdb_error_destroy(
            ovsdb_log_write_and_free(raft->log, raft_record_to_json(&r)));
    }
    if (raft->role == RAFT_CANDIDATE) {
        /* Section 3.4: While waiting for votes, a candidate may
         * receive an AppendEntries RPC from another server claiming to
         * be leader.  If the leader's term (included in its RPC) is at
         * least as large as the candidate's current term, then the
         * candidate recognizes the leader as legitimate and returns to
         * follower state. */
        raft_role_transition(raft, RAFT_FOLLOWER);
    }
    return true;
}

static void
raft_handle_append_request(struct raft *raft,
                           const struct raft_append_request *rq)
{
    /* We do not check whether the server that sent the request is part of the
     * cluster.  As section 4.1 says, "A server accepts AppendEntries requests
     * from a leader that is not part of the server's latest configuration.
     * Otherwise, a new server could never be added to the cluster (it would
     * never accept any log entries preceding the configuration entry that
     * adds the server)." */
    if (!raft_update_leader(raft, &rq->common.sid)) {
        raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
                               "usurped leadership");
        return;
    }
    raft_reset_election_timer(raft);

    /* First check for the common case, where the AppendEntries request is
     * entirely for indexes covered by 'log_start' ... 'log_end - 1', something
     * like this:
     *
     *     rq->prev_log_index
     *       | first_entry_index
     *       |   |         nth_entry_index
     *       |   |           |
     *       v   v           v
     *         +---+---+---+---+
     *       T | T | T | T | T |
     *         +---+-------+---+
     *     +---+---+---+---+
     *   T | T | T | T | T |
     *     +---+---+---+---+
     *       ^               ^
     *       |               |
     *   log_start        log_end
     */
    uint64_t first_entry_index = rq->prev_log_index + 1;
    uint64_t nth_entry_index = rq->prev_log_index + rq->n_entries;
    if (OVS_LIKELY(first_entry_index >= raft->log_start)) {
        raft_handle_append_entries(raft, rq,
                                   rq->prev_log_index, rq->prev_log_term,
                                   rq->entries, rq->n_entries);
        return;
    }

    /* Now a series of checks for odd cases, where the AppendEntries request
     * extends earlier than the beginning of our log, into the log entries
     * discarded by the most recent snapshot. */

    /*
     * Handle the case where the indexes covered by rq->entries[] are entirely
     * disjoint from 'log_start - 1' ... 'log_end - 1', as shown below.  So,
     * everything in the AppendEntries request must already have been
     * committed, and we might as well return true.
     *
     *     rq->prev_log_index
     *       | first_entry_index
     *       |   |         nth_entry_index
     *       |   |           |
     *       v   v           v
     *         +---+---+---+---+
     *       T | T | T | T | T |
     *         +---+-------+---+
     *                           +---+---+---+---+
     *                         T | T | T | T | T |
     *                           +---+---+---+---+
     *                             ^               ^
     *                             |               |
     *                         log_start        log_end
     */
    if (nth_entry_index < raft->log_start - 1) {
        raft_send_append_reply(raft, rq, RAFT_APPEND_OK,
                               "append before log start");
        return;
    }

    /*
     * Handle the case where the last entry in rq->entries[] has the same
     * index as 'log_start - 1', so we can compare their terms:
     *
     *     rq->prev_log_index
     *       | first_entry_index
     *       |   |         nth_entry_index
     *       |   |           |
     *       v   v           v
     *         +---+---+---+---+
     *       T | T | T | T | T |
     *         +---+-------+---+
     *                       +---+---+---+---+
     *                     T | T | T | T | T |
     *                       +---+---+---+---+
     *                         ^               ^
     *                         |               |
     *                     log_start        log_end
     *
     * There's actually a sub-case where rq->n_entries == 0, in which we
     * compare rq->prev_log_term:
     *
     *     rq->prev_log_index
     *       |
     *       |
     *       |
     *       v
     *     T
     *
     *       +---+---+---+---+
     *     T | T | T | T | T |
     *       +---+---+---+---+
     *         ^               ^
     *         |               |
     *     log_start        log_end
     */
    if (nth_entry_index == raft->log_start - 1) {
        if (rq->n_entries
            ? raft->snap.term == rq->entries[rq->n_entries - 1].term
            : raft->snap.term == rq->prev_log_term) {
            raft_send_append_reply(raft, rq, RAFT_APPEND_OK, "no change");
        } else {
            raft_send_append_reply(raft, rq, RAFT_APPEND_INCONSISTENCY,
                                   "term mismatch");
        }
        return;
    }

    /*
     * We now know that the data in rq->entries[] overlaps the data in
     * raft->entries[], as shown below, with some positive 'ofs':
     *
     *     rq->prev_log_index
     *       | first_entry_index
     *       |   |             nth_entry_index
     *       |   |               |
     *       v   v               v
     *         +---+---+---+---+---+
     *       T | T | T | T | T | T |
     *         +---+-------+---+---+
     *                   +---+---+---+---+
     *                 T | T | T | T | T |
     *                   +---+---+---+---+
     *                     ^               ^
     *                     |               |
     *                 log_start        log_end
     *
     *           |<-- ofs -->|
     *
     * We transform this by trimming the first 'ofs' elements off of
     * rq->entries[], ending up with the following.  Notice how we retain
     * the term but not the data for rq->entries[ofs - 1]:
     *
     *     first_entry_index + ofs - 1
     *       | first_entry_index + ofs
     *       |   |    nth_entry_index + ofs
     *       |   |      |
     *       v   v      v
     *         +---+---+
     *       T | T | T |
     *         +---+---+
     *     +---+---+---+---+
     *   T | T | T | T | T |
     *     +---+---+---+---+
     *       ^               ^
     *       |               |
     *   log_start        log_end
     */
    uint64_t ofs = raft->log_start - first_entry_index;
    raft_handle_append_entries(raft, rq,
                               raft->log_start - 1, rq->entries[ofs - 1].term,
                               &rq->entries[ofs], rq->n_entries - ofs);
}

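/* Worked example of the trimming above (illustrative): if raft->log_start is
 * 10 and rq->prev_log_index is 6, then first_entry_index is 7 and ofs is 3.
 * The request is re-anchored at prev_log_index 9 (log_start - 1) using the
 * term of rq->entries[2], and appending starts from rq->entries[3], which
 * corresponds to log index 10. */
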
/* Returns true if 'raft' has another log entry or snapshot to read. */
bool
raft_has_next_entry(const struct raft *raft_)
{
    struct raft *raft = CONST_CAST(struct raft *, raft_);
    return raft_peek_next_entry(raft) != NULL;
}

/* Returns the next log entry or snapshot from 'raft', or NULL if there are
 * none left to read.  Stores the entry ID of the log entry in '*eid'.
 *
 * The caller takes ownership of the result. */
struct json * OVS_WARN_UNUSED_RESULT
raft_next_entry(struct raft *raft, struct uuid *eid)
{
    const struct raft_entry *e = raft_peek_next_entry(raft);

    if (!e) {
        return NULL;
    }

    raft->last_applied++;
    *eid = e->eid;

    /* DB will only read each entry once, so we don't need to store the fully
     * parsed json object any longer.  The serialized version is sufficient
     * for sending to other cluster members or writing to the log. */
    return raft_entry_steal_parsed_data(CONST_CAST(struct raft_entry *, e));
}

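/* Typical read loop over the two functions above (a minimal illustrative
 * sketch; the real caller is the ovsdb layer):
 *
 *     struct uuid eid;
 *     while (raft_has_next_entry(raft)) {
 *         struct json *data = raft_next_entry(raft, &eid);
 *         ...apply 'data', keyed by 'eid', to the database...
 *         json_destroy(data);
 *     }
 */
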
/* Returns the log index of the last-read snapshot or log entry. */
uint64_t
raft_get_applied_index(const struct raft *raft)
{
    return raft->last_applied;
}

/* Returns the log index of the last snapshot or log entry that is available
 * to be read. */
uint64_t
raft_get_commit_index(const struct raft *raft)
{
    return raft->commit_index;
}

static struct raft_server *
raft_find_peer(struct raft *raft, const struct uuid *uuid)
{
    struct raft_server *s = raft_find_server(raft, uuid);
    return s && !uuid_equals(&raft->sid, &s->sid) ? s : NULL;
}

/* Figure 3.1: "If there exists an N such that N > commitIndex, a
 * majority of matchIndex[i] >= N, and log[N].term == currentTerm, set
 * commitIndex = N (sections 3.5 and 3.6)." */
static void
raft_consider_updating_commit_index(struct raft *raft)
{
    /* This loop cannot just bail out when it comes across a log entry that
     * does not match the criteria.  For example, Figure 3.7(d2) shows a
     * case where the log entry for term 2 cannot be committed directly
     * (because it is not for the current term) but it can be committed as
     * a side effect of committing the entry for term 4 (the current term).
     * XXX Is there a more efficient way to do this? */
    ovs_assert(raft->role == RAFT_LEADER);

    uint64_t new_commit_index = raft->commit_index;
    for (uint64_t idx = MAX(raft->commit_index + 1, raft->log_start);
         idx < raft->log_end; idx++) {
        if (raft->entries[idx - raft->log_start].term == raft->term) {
            size_t count = 0;
            struct raft_server *s2;
            HMAP_FOR_EACH (s2, hmap_node, &raft->servers) {
                if (s2->match_index >= idx) {
                    count++;
                }
            }
            if (count > hmap_count(&raft->servers) / 2) {
                VLOG_DBG("index %"PRIu64" committed to %"PRIuSIZE" servers, "
                         "applying", idx, count);
                new_commit_index = idx;
            }
        }
    }
    if (raft_update_commit_index(raft, new_commit_index)) {
        raft_send_heartbeats(raft);
    }
}

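/* Majority arithmetic for the function above (illustrative): in a 5-server
 * cluster, hmap_count(&raft->servers) / 2 is 2, so an index can become the
 * new commit_index only when count > 2, that is, when at least 3 servers
 * (including the leader itself) have match_index at or past it. */
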
static void
raft_update_match_index(struct raft *raft, struct raft_server *s,
                        uint64_t min_index)
{
    ovs_assert(raft->role == RAFT_LEADER);
    if (min_index > s->match_index) {
        s->match_index = min_index;
        raft_consider_updating_commit_index(raft);
    }
}

static void
raft_update_our_match_index(struct raft *raft, uint64_t min_index)
{
    struct raft_server *server = raft_find_server(raft, &raft->sid);
    if (server) {
        raft_update_match_index(raft, server, min_index);
    }
}

static void
raft_send_install_snapshot_request(struct raft *raft,
                                   const struct raft_server *s,
                                   const char *comment)
{
    const struct json *data = raft_entry_get_serialized_data(&raft->snap);
    union raft_rpc rpc = {
        .install_snapshot_request = {
            .common = {
                .type = RAFT_RPC_INSTALL_SNAPSHOT_REQUEST,
                .sid = s->sid,
                .comment = CONST_CAST(char *, comment),
            },
            .term = raft->term,
            .last_index = raft->log_start - 1,
            .last_term = raft->snap.term,
            .last_servers = raft->snap.servers,
            .last_eid = raft->snap.eid,
            .data = CONST_CAST(struct json *, data),
            .election_timer = raft->election_timer, /* use latest value */
        }
    };

    if (s->install_snapshot_request_in_progress) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);

        VLOG_INFO_RL(&rl, "not sending snapshot to server %s, "
                     "already in progress", s->nickname);
        return;
    }

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    VLOG_INFO_RL(&rl, "sending snapshot to server %s, %"PRIu64":%"PRIu64".",
                 s->nickname, raft->term, raft->log_start - 1);
    CONST_CAST(struct raft_server *, s)->install_snapshot_request_in_progress
        = raft_send(raft, &rpc);
}

static void
raft_handle_append_reply(struct raft *raft,
                         const struct raft_append_reply *rpy)
{
    if (raft->role != RAFT_LEADER) {
        VLOG_INFO("rejected append_reply (not leader)");
        return;
    }

    /* Most commonly we'd be getting an AppendEntries reply from a configured
     * server (e.g. a peer), but we can also get them from servers in the
     * process of being added. */
    struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
    if (!s) {
        s = raft_find_new_server(raft, &rpy->common.sid);
        if (!s) {
            VLOG_INFO("rejected append_reply from unknown server "SID_FMT,
                      SID_ARGS(&rpy->common.sid));
            return;
        }
    }

    s->replied = true;
    if (rpy->result == RAFT_APPEND_OK) {
        /* Figure 3.1: "If successful, update nextIndex and matchIndex for
         * follower (section 3.5)." */
        uint64_t min_index = rpy->prev_log_index + rpy->n_entries + 1;
        if (s->next_index < min_index) {
            s->next_index = min_index;
        }
        raft_update_match_index(raft, s, min_index - 1);
    } else {
        /* Figure 3.1: "If AppendEntries fails because of log inconsistency,
         * decrement nextIndex and retry (section 3.5)."
         *
         * We also implement the optimization suggested in section 4.2.1:
         * "Various approaches can make nextIndex converge to its correct
         * value more quickly, including those described in Chapter 3.  The
         * simplest approach to solving this particular problem of adding a
         * new server, however, is to have followers return the length of
         * their logs in the AppendEntries response; this allows the leader
         * to cap the follower's nextIndex accordingly." */
        s->next_index = (s->next_index > 0
                         ? MIN(s->next_index - 1, rpy->log_end)
                         : 0);

        if (rpy->result == RAFT_APPEND_IO_ERROR) {
            /* Append failed but not because of a log inconsistency.  Because
             * of the I/O error, there's no point in re-sending the append
             * immediately. */
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
            VLOG_INFO_RL(&rl, "%s reported I/O error", s->nickname);
            return;
        }
    }

    /*
     * Our behavior here must depend on the value of next_index relative to
     * log_start and log_end.  There are three cases:
     *
     *        Case 1       |       Case 2      |      Case 3
     *   <---------------->|<----------------->|<------------------>
     *                     |                   |
     *
     *                   +---+---+---+---+
     *                 T | T | T | T | T |
     *                   +---+---+---+---+
     *                     ^               ^
     *                     |               |
     *                 log_start        log_end
     */
    if (s->next_index < raft->log_start) {
        /* Case 1. */
        raft_send_install_snapshot_request(raft, s, NULL);
    } else if (s->next_index < raft->log_end) {
        /* Case 2. */
        raft_send_append_request(raft, s, raft->log_end - s->next_index, NULL);
    } else {
        /* Case 3. */
        if (s->phase == RAFT_PHASE_CATCHUP) {
            s->phase = RAFT_PHASE_CAUGHT_UP;
            raft_run_reconfigure(raft);
        }
    }
}

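/* Convergence example for the nextIndex capping above (illustrative): if the
 * leader's next_index for a follower is 100 but the follower's log_end is
 * 40, plain "decrement and retry" would take about 60 round trips, whereas
 * MIN(s->next_index - 1, rpy->log_end) jumps there in a single step. */
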
static bool
raft_should_suppress_disruptive_server(struct raft *raft,
                                       const union raft_rpc *rpc)
{
    if (rpc->type != RAFT_RPC_VOTE_REQUEST) {
        return false;
    }

    /* Section 4.2.3 "Disruptive Servers" says:
     *
     *    ...if a server receives a RequestVote request within the minimum
     *    election timeout of hearing from a current leader, it does not
     *    update its term or grant its vote...
     *
     *    ...This change conflicts with the leadership transfer mechanism as
     *    described in Chapter 3, in which a server legitimately starts an
     *    election without waiting an election timeout.  In that case,
     *    RequestVote messages should be processed by other servers even when
     *    they believe a current cluster leader exists.  Those RequestVote
     *    requests can include a special flag to indicate this behavior ("I
     *    have permission to disrupt the leader--it told me to!").
     *
     * This clearly describes how the followers should act, but not the
     * leader.  We just ignore vote requests that arrive at a current leader.
     * This seems to be fairly safe, since a majority other than the current
     * leader can still elect a new leader and the first AppendEntries from
     * that new leader will depose the current leader. */
    const struct raft_vote_request *rq = raft_vote_request_cast(rpc);
    if (rq->leadership_transfer) {
        return false;
    }

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
    long long int now = time_msec();
    switch (raft->role) {
    case RAFT_LEADER:
        VLOG_WARN_RL(&rl, "ignoring vote request received as leader");
        return true;

    case RAFT_FOLLOWER:
        if (now < raft->election_base + raft->election_timer) {
            VLOG_WARN_RL(&rl, "ignoring vote request received after only "
                         "%lld ms (minimum election time is %"PRIu64" ms)",
                         now - raft->election_base, raft->election_timer);
            return true;
        }
        return false;

    case RAFT_CANDIDATE:
        return false;

    default:
        OVS_NOT_REACHED();
    }
}

/* Returns true if a reply should be sent. */
static bool
raft_handle_vote_request__(struct raft *raft,
                           const struct raft_vote_request *rq)
{
    /* Figure 3.1: "If votedFor is null or candidateId, and candidate's log is
     * at least as up-to-date as receiver's log, grant vote (sections 3.4,
     * 3.6)." */
    if (uuid_equals(&raft->vote, &rq->common.sid)) {
        /* Already voted for this candidate in this term.  Resend vote. */
        return true;
    } else if (!uuid_is_zero(&raft->vote)) {
        /* Already voted for different candidate in this term.  Send a reply
         * saying what candidate we did vote for.  This isn't a necessary part
         * of the Raft protocol but it can make debugging easier. */
        return true;
    }

    /* Section 3.6.1: "The RequestVote RPC implements this restriction: the
     * RPC includes information about the candidate's log, and the voter
     * denies its vote if its own log is more up-to-date than that of the
     * candidate.  Raft determines which of two logs is more up-to-date by
     * comparing the index and term of the last entries in the logs.  If the
     * logs have last entries with different terms, then the log with the
     * later term is more up-to-date.  If the logs end with the same term,
     * then whichever log is longer is more up-to-date." */
    uint64_t last_term = (raft->log_end > raft->log_start
                          ? raft->entries[raft->log_end - 1
                                          - raft->log_start].term
                          : raft->snap.term);
    if (last_term > rq->last_log_term
        || (last_term == rq->last_log_term
            && raft->log_end - 1 > rq->last_log_index)) {
        /* Our log is more up-to-date than the peer's.  Withhold vote. */
        return false;
    }

    if (rq->is_prevote) {
        return true;
    }

    /* Record a vote for the peer. */
    if (!raft_set_term(raft, raft->term, &rq->common.sid)) {
        return false;
    }

    raft_reset_election_timer(raft);

    return true;
}

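/* Up-to-date comparison examples for the check above (illustrative): a voter
 * whose last entry has term 5 withholds its vote from a candidate whose
 * last_log_term is 4; with equal last terms, a voter whose log ends at
 * index 20 withholds its vote from a candidate whose last_log_index is 15. */
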
static void
raft_send_vote_reply(struct raft *raft, const struct uuid *dst,
                     const struct uuid *vote, bool is_prevote)
{
    union raft_rpc rpy = {
        .vote_reply = {
            .common = {
                .type = RAFT_RPC_VOTE_REPLY,
                .sid = *dst,
            },
            .term = raft->term,
            .vote = *vote,
            .is_prevote = is_prevote,
        },
    };
    raft_send(raft, &rpy);
}

static void
raft_handle_vote_request(struct raft *raft,
                         const struct raft_vote_request *rq)
{
    if (raft_handle_vote_request__(raft, rq)) {
        raft_send_vote_reply(raft, &rq->common.sid,
                             rq->is_prevote ? &rq->common.sid : &raft->vote,
                             rq->is_prevote);
    }
}

static void
raft_handle_vote_reply(struct raft *raft,
                       const struct raft_vote_reply *rpy)
{
    if (!raft_receive_term__(raft, &rpy->common, rpy->term)) {
        return;
    }

    if (raft->role != RAFT_CANDIDATE) {
        return;
    }

    struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
    if (s) {
        if (raft_accept_vote(raft, s, &rpy->vote)) {
            if (raft->prevote_passed) {
                raft_become_leader(raft);
            } else {
                /* Start the real election. */
                raft_start_election(raft, false, false);
            }
        }
    }
}

/* Returns true if 'raft''s log contains reconfiguration entries that have not
 * yet been committed. */
static bool
raft_has_uncommitted_configuration(const struct raft *raft)
{
    for (uint64_t i = raft->commit_index + 1; i < raft->log_end; i++) {
        ovs_assert(i >= raft->log_start);
        const struct raft_entry *e = &raft->entries[i - raft->log_start];
        if (e->servers) {
            return true;
        }
    }
    return false;
}

static void
raft_log_reconfiguration(struct raft *raft)
{
    struct json *servers_json = raft_servers_to_json(&raft->servers);
    raft_command_unref(raft_command_execute__(
                           raft, NULL, servers_json, 0, NULL, NULL));
    json_destroy(servers_json);
}

static void
raft_run_reconfigure(struct raft *raft)
{
    ovs_assert(raft->role == RAFT_LEADER);

    /* Reconfiguration only progresses when configuration changes commit. */
    if (raft_has_uncommitted_configuration(raft)) {
        return;
    }

    /* If we were waiting for a configuration change to commit, it's done. */
    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (s->phase == RAFT_PHASE_COMMITTING) {
            raft_send_add_server_reply__(raft, &s->sid, s->address,
                                         true, RAFT_SERVER_COMPLETED);
            s->phase = RAFT_PHASE_STABLE;
        }
    }
    if (raft->remove_server) {
        raft_send_remove_server_reply__(raft, &raft->remove_server->sid,
                                        &raft->remove_server->requester_sid,
                                        raft->remove_server->requester_conn,
                                        true, RAFT_SERVER_COMPLETED);
        raft_server_destroy(raft->remove_server);
        raft->remove_server = NULL;
    }

    /* If a new server is caught up, add it to the configuration. */
    HMAP_FOR_EACH (s, hmap_node, &raft->add_servers) {
        if (s->phase == RAFT_PHASE_CAUGHT_UP) {
            /* Move 's' from 'raft->add_servers' to 'raft->servers'. */
            hmap_remove(&raft->add_servers, &s->hmap_node);
            hmap_insert(&raft->servers, &s->hmap_node, uuid_hash(&s->sid));

            /* Mark 's' as waiting for commit. */
            s->phase = RAFT_PHASE_COMMITTING;

            raft_log_reconfiguration(raft);

            /* When commit completes we'll transition to RAFT_PHASE_STABLE and
             * send a RAFT_SERVER_OK reply. */

            return;
        }
    }

    /* Remove a server, if one is scheduled for removal. */
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (s->phase == RAFT_PHASE_REMOVE) {
            hmap_remove(&raft->servers, &s->hmap_node);
            raft->remove_server = s;

            raft_log_reconfiguration(raft);

            return;
        }
    }
}

static void
raft_handle_add_server_request(struct raft *raft,
                               const struct raft_add_server_request *rq)
{
    /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
    if (raft->role != RAFT_LEADER) {
        raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
        return;
    }

    /* Check for an existing server. */
    struct raft_server *s = raft_find_server(raft, &rq->common.sid);
    if (s) {
        /* If the server is scheduled to be removed, cancel it. */
        if (s->phase == RAFT_PHASE_REMOVE) {
            s->phase = RAFT_PHASE_STABLE;
            raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_CANCELED);
            return;
        }

        /* If the server is being added, then it's in progress. */
        if (s->phase != RAFT_PHASE_STABLE) {
            raft_send_add_server_reply(raft, rq,
                                       false, RAFT_SERVER_IN_PROGRESS);
        }

        /* Nothing to do--server is already part of the configuration. */
        raft_send_add_server_reply(raft, rq,
                                   true, RAFT_SERVER_ALREADY_PRESENT);
        return;
    }

    /* Check for a server being removed. */
    if (raft->remove_server
        && uuid_equals(&rq->common.sid, &raft->remove_server->sid)) {
        raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
        return;
    }

    /* Check for a server already being added. */
    if (raft_find_new_server(raft, &rq->common.sid)) {
        raft_send_add_server_reply(raft, rq, false, RAFT_SERVER_IN_PROGRESS);
        return;
    }

    /* Add server to 'add_servers'. */
    s = raft_server_add(&raft->add_servers, &rq->common.sid, rq->address);
    raft_server_init_leader(raft, s);
    s->requester_sid = rq->common.sid;
    s->requester_conn = NULL;
    s->phase = RAFT_PHASE_CATCHUP;
    s->last_msg_ts = time_msec();

    /* Start sending the log.  If this is the first time we've tried to add
     * this server, then this will quickly degenerate into an InstallSnapshot
     * followed by a series of AppendEntries, but if it's a retry of an
     * earlier AddRequest that was interrupted (e.g. by a timeout or a loss of
     * leadership) then it will gracefully resume populating the log.
     *
     * See the last few paragraphs of section 4.2.1 for further insight. */
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    VLOG_INFO_RL(&rl,
                 "starting to add server %s ("SID_FMT" at %s) "
                 "to cluster "CID_FMT, s->nickname, SID_ARGS(&s->sid),
                 rq->address, CID_ARGS(&raft->cid));
    raft_send_append_request(raft, s, 0, "initialize new server");

    if (failure_test == FT_TRANSFER_LEADERSHIP_AFTER_STARTING_TO_ADD) {
        failure_test = FT_TRANSFER_LEADERSHIP;
    }
}

static void
raft_handle_add_server_reply(struct raft *raft,
                             const struct raft_add_server_reply *rpy)
{
    if (!raft->joining) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_WARN_RL(&rl, "received add_server_reply even though we're "
                     "already part of the cluster");
        return;
    }

    if (rpy->success) {
        raft_join_complete(raft);

        /* It is tempting, at this point, to check that this server is part of
         * the current configuration.  However, this is not necessarily the
         * case, because the log entry that added this server to the cluster
         * might have been committed by a majority of the cluster that does
         * not include this one.  This actually happens in testing. */
    } else {
        const char *address;
        SSET_FOR_EACH (address, &rpy->remote_addresses) {
            if (sset_add(&raft->remote_addresses, address)) {
                VLOG_INFO("%s: learned new server address for joining cluster",
                          address);
            }
        }
    }
}

/* This is called by raft_unixctl_kick() as well as via RPC. */
static void
raft_handle_remove_server_request(struct raft *raft,
                                  const struct raft_remove_server_request *rq)
{
    /* Figure 4.1: "1. Reply NOT_LEADER if not leader (section 6.2)." */
    if (raft->role != RAFT_LEADER) {
        raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_NOT_LEADER);
        return;
    }

    /* If the server to remove is currently waiting to be added, cancel it. */
    struct raft_server *target = raft_find_new_server(raft, &rq->sid);
    if (target) {
        raft_send_add_server_reply__(raft, &target->sid, target->address,
                                     false, RAFT_SERVER_CANCELED);
        hmap_remove(&raft->add_servers, &target->hmap_node);
        raft_server_destroy(target);
        return;
    }

    /* Check for the server already being removed. */
    if (raft->remove_server
        && uuid_equals(&rq->sid, &raft->remove_server->sid)) {
        raft_send_remove_server_reply(raft, rq,
                                      false, RAFT_SERVER_IN_PROGRESS);
        return;
    }

    /* If the server isn't configured, report that. */
    target = raft_find_server(raft, &rq->sid);
    if (!target) {
        raft_send_remove_server_reply(raft, rq,
                                      true, RAFT_SERVER_ALREADY_GONE);
        return;
    }

    /* Check whether we're waiting for the addition of the server to commit. */
    if (target->phase == RAFT_PHASE_COMMITTING) {
        raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_COMMITTING);
        return;
    }

    /* Check whether the server is already scheduled for removal. */
    if (target->phase == RAFT_PHASE_REMOVE) {
        raft_send_remove_server_reply(raft, rq,
                                      false, RAFT_SERVER_IN_PROGRESS);
        return;
    }

    /* Make sure that, if we remove this server, at least one other server
     * will be left.  We don't count servers currently being added (in
     * 'add_servers') since those could fail. */
    struct raft_server *s;
    int n = 0;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        if (s != target && s->phase != RAFT_PHASE_REMOVE) {
            n++;
        }
    }
    if (!n) {
        raft_send_remove_server_reply(raft, rq, false, RAFT_SERVER_EMPTY);
        return;
    }

    /* Mark the server for removal. */
    target->phase = RAFT_PHASE_REMOVE;
    if (rq->requester_conn) {
        target->requester_sid = UUID_ZERO;
        unixctl_command_reply(rq->requester_conn, "started removal");
    } else {
        target->requester_sid = rq->common.sid;
        target->requester_conn = NULL;
    }

    raft_run_reconfigure(raft);
    /* Operation in progress, reply will be sent later. */
}

static void
raft_finished_leaving_cluster(struct raft *raft)
{
    VLOG_INFO(SID_FMT": finished leaving cluster "CID_FMT,
              SID_ARGS(&raft->sid), CID_ARGS(&raft->cid));

    raft_record_note(raft, "left", "this server left the cluster");

    raft->leaving = false;
    raft->left = true;
}

static void
raft_handle_remove_server_reply(struct raft *raft,
                                const struct raft_remove_server_reply *rpc)
{
    if (rpc->success
        && (uuid_is_zero(&rpc->target_sid)
            || uuid_equals(&rpc->target_sid, &raft->sid))) {
        raft_finished_leaving_cluster(raft);
    }
}

static bool
raft_handle_write_error(struct raft *raft, struct ovsdb_error *error)
{
    if (error && !raft->failed) {
        raft->failed = true;

        char *s = ovsdb_error_to_string_free(error);
        VLOG_WARN("%s: entering failure mode due to I/O error (%s)",
                  raft->name, s);
        free(s);
    }
    return !raft->failed;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
                    uint64_t new_log_start,
                    const struct raft_entry *new_snapshot)
{
    /* Ensure that the new snapshot contains a serialized data object, so one
     * will not be allocated while serializing the on-stack raft header
     * object. */
    ovs_assert(raft_entry_get_serialized_data(new_snapshot));

    struct raft_header h = {
        .sid = raft->sid,
        .cid = raft->cid,
        .name = raft->name,
        .local_address = raft->local_address,
        .snap_index = new_log_start - 1,
        .snap = *new_snapshot,
    };
    struct ovsdb_error *error = ovsdb_log_write_and_free(
        log, raft_header_to_json(&h));
    if (error) {
        return error;
    }
    ovsdb_log_mark_base(log);

    /* Write log records. */
    for (uint64_t index = new_log_start; index < raft->log_end; index++) {
        const struct raft_entry *e = &raft->entries[index - raft->log_start];
        const struct json *log_data = raft_entry_get_serialized_data(e);
        struct raft_record r = {
            .type = RAFT_REC_ENTRY,
            .term = e->term,
            .entry = {
                .index = index,
                .data = CONST_CAST(struct json *, log_data),
                .servers = e->servers,
                .election_timer = e->election_timer,
                .eid = e->eid,
            },
        };
        error = ovsdb_log_write_and_free(log, raft_record_to_json(&r));
        if (error) {
            return error;
        }
    }

    /* Write term and vote (if any).
     *
     * The term is redundant if we wrote a log record for that term above.
     * The vote, if any, is never redundant.
     */
    error = raft_write_state(log, raft->term, &raft->vote);
    if (error) {
        return error;
    }

    /* Write commit_index if it's beyond the new start of the log. */
    if (raft->commit_index >= new_log_start) {
        struct raft_record r = {
            .type = RAFT_REC_COMMIT_INDEX,
            .commit_index = raft->commit_index,
        };
        return ovsdb_log_write_and_free(log, raft_record_to_json(&r));
    }
    return NULL;
}

static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_save_snapshot(struct raft *raft,
                   uint64_t new_start, const struct raft_entry *new_snapshot)
{
    struct ovsdb_log *new_log;
    struct ovsdb_error *error;
    error = ovsdb_log_replace_start(raft->log, &new_log);
    if (error) {
        return error;
    }

    error = raft_write_snapshot(raft, new_log, new_start, new_snapshot);
    if (error) {
        ovsdb_log_replace_abort(new_log);
        return error;
    }

    return ovsdb_log_replace_commit(raft->log, new_log);
}

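/* Resulting layout of the replacement file written by the two functions
 * above (illustrative summary): a raft_header record carrying the snapshot,
 * one RAFT_REC_ENTRY record per retained log entry, a term/vote record, and
 * finally a RAFT_REC_COMMIT_INDEX record if commit_index reaches into the
 * retained part of the log. */
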
static bool
raft_handle_install_snapshot_request__(
    struct raft *raft, const struct raft_install_snapshot_request *rq)
{
    raft_reset_election_timer(raft);

    /*
     * Our behavior here depends on new_log_start in the snapshot compared to
     * log_start and log_end.  There are three cases:
     *
     *        Case 1       |       Case 2      |      Case 3
     *   <---------------->|<----------------->|<------------------>
     *                     |                   |
     *
     *                   +---+---+---+---+
     *                 T | T | T | T | T |
     *                   +---+---+---+---+
     *                     ^               ^
     *                     |               |
     *                 log_start        log_end
     */
    uint64_t new_log_start = rq->last_index + 1;
    if (new_log_start < raft->log_start) {
        /* Case 1: The new snapshot covers less than our current one.  Nothing
         * to do. */
        return true;
    } else if (new_log_start < raft->log_end) {
        /* Case 2: The new snapshot starts in the middle of our log.  We could
         * discard the first 'new_log_start - raft->log_start' entries in the
         * log.  But there's not much value in that, since snapshotting is
         * supposed to be a local decision.  Just skip it. */
        return true;
    }

    /* Case 3: The new snapshot starts past the end of our current log, so
     * discard all of our current log. */
    struct raft_entry new_snapshot = {
        .term = rq->last_term,
        .eid = rq->last_eid,
        .servers = json_clone(rq->last_servers),
        .election_timer = rq->election_timer,
    };
    raft_entry_set_parsed_data(&new_snapshot, rq->data);

    struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
                                                   &new_snapshot);
    if (error) {
        char *error_s = ovsdb_error_to_string_free(error);
        VLOG_WARN("could not save snapshot: %s", error_s);
        free(error_s);
        raft_entry_uninit(&new_snapshot);
        return false;
    }

    for (size_t i = 0; i < raft->log_end - raft->log_start; i++) {
        raft_entry_uninit(&raft->entries[i]);
    }
    raft->log_start = raft->log_end = new_log_start;
    raft->log_synced = raft->log_end - 1;
    raft->commit_index = raft->log_start - 1;
    if (raft->last_applied < raft->commit_index) {
        raft->last_applied = raft->log_start - 2;
    }

    raft_entry_uninit(&raft->snap);
    raft->snap = new_snapshot;

    raft_get_servers_from_log(raft, VLL_INFO);
    raft_get_election_timer_from_log(raft);

    return true;
}

static void
raft_handle_install_snapshot_request(
    struct raft *raft, const struct raft_install_snapshot_request *rq)
{
    if (failure_test == FT_CRASH_BEFORE_SEND_SNAPSHOT_REP) {
        ovs_fatal(0, "Raft test: crash before sending install_snapshot_reply");
    }

    if (raft_handle_install_snapshot_request__(raft, rq)) {
        union raft_rpc rpy = {
            .install_snapshot_reply = {
                .common = {
                    .type = RAFT_RPC_INSTALL_SNAPSHOT_REPLY,
                    .sid = rq->common.sid,
                },
                .term = raft->term,
                .last_index = rq->last_index,
                .last_term = rq->last_term,
            },
        };
        raft_send(raft, &rpy);
    }
}

static void
raft_handle_install_snapshot_reply(
    struct raft *raft, const struct raft_install_snapshot_reply *rpy)
{
    /* We might get an InstallSnapshot reply from a configured server (e.g. a
     * peer) or a server in the process of being added. */
    struct raft_server *s = raft_find_peer(raft, &rpy->common.sid);
    if (!s) {
        s = raft_find_new_server(raft, &rpy->common.sid);
        if (!s) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
            VLOG_INFO_RL(&rl, "cluster "CID_FMT": received %s from "
                         "unknown server "SID_FMT, CID_ARGS(&raft->cid),
                         raft_rpc_type_to_string(rpy->common.type),
                         SID_ARGS(&rpy->common.sid));
            return;
        }
    }

    s->install_snapshot_request_in_progress = false;

    if (rpy->last_index != raft->log_start - 1 ||
        rpy->last_term != raft->snap.term) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_INFO_RL(&rl, "cluster "CID_FMT": server %s installed "
                     "out-of-date snapshot, starting over",
                     CID_ARGS(&raft->cid), s->nickname);
        raft_send_install_snapshot_request(raft, s,
                                           "installed obsolete snapshot");
        return;
    }

    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 10);
    VLOG_INFO_RL(&rl, "cluster "CID_FMT": installed snapshot on server %s "
                 "up to %"PRIu64":%"PRIu64, CID_ARGS(&raft->cid),
                 s->nickname, rpy->last_term, rpy->last_index);
    s->next_index = raft->log_start;
    raft_send_append_request(raft, s, raft->log_end - s->next_index,
                             "snapshot installed");
}

/* Returns true if 'raft' has grown enough since the last snapshot that
 * reducing the log to a snapshot would be valuable, false otherwise. */
bool
raft_grew_lots(const struct raft *raft)
{
    return ovsdb_log_grew_lots(raft->log);
}

/* Returns the number of log entries that could be trimmed off the on-disk log
 * by snapshotting. */
uint64_t
raft_get_log_length(const struct raft *raft)
{
    return (raft->last_applied < raft->log_start
            ? 0
            : raft->last_applied - raft->log_start + 1);
}

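/* Example for the function above (illustrative): with log_start 10 and
 * last_applied 25, snapshotting could trim 16 entries (indexes 10 through
 * 25) off the on-disk log. */
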
/* Returns true if taking a snapshot of 'raft', with raft_store_snapshot(), is
 * possible. */
bool
raft_may_snapshot(const struct raft *raft)
{
    return (!raft->joining
            && !raft->leaving
            && !raft->left
            && !raft->failed
            && (raft->role == RAFT_FOLLOWER || hmap_count(&raft->servers) == 1)
            && raft->last_applied >= raft->log_start);
}

/* Prepares for an imminent snapshot. */
void
raft_notify_snapshot_recommended(struct raft *raft)
{
    if (raft->role == RAFT_LEADER) {
        /* The leader is about to write a database snapshot to disk, which
         * might take a significant amount of time.  Step back from the
         * leadership to keep the cluster functional during this process. */
        VLOG_INFO("Transferring leadership to write a snapshot.");
        raft_transfer_leadership(raft, "preparing to write snapshot");
        raft_become_follower(raft);
    }
}

/* Replaces the log for 'raft', up to the last log entry read, by
 * 'new_snapshot_data'.  Returns NULL if successful, otherwise an error that
 * the caller must eventually free.
 *
 * This function can only succeed if raft_may_snapshot() returns true.  It is
 * only valuable to call it if raft_get_log_length() is significant and
 * especially if raft_grew_lots() returns true. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data,
                    uint64_t applied_index)
{
    if (raft->joining) {
        return ovsdb_error(NULL,
                           "cannot store a snapshot while joining cluster");
    } else if (raft->leaving) {
        return ovsdb_error(NULL,
                           "cannot store a snapshot while leaving cluster");
    } else if (raft->left) {
        return ovsdb_error(NULL,
                           "cannot store a snapshot after leaving cluster");
    } else if (raft->failed) {
        return ovsdb_error(NULL,
                           "cannot store a snapshot following failure");
    }

    uint64_t new_log_start = applied_index ? applied_index + 1
                                           : raft->last_applied + 1;
    if (new_log_start <= raft->log_start) {
        return ovsdb_error(NULL, "not storing a duplicate snapshot");
    }

    struct raft_entry new_snapshot = {
        .term = raft_get_term(raft, new_log_start - 1),
        .eid = *raft_get_eid(raft, new_log_start - 1),
        .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
        .election_timer = raft->election_timer,
    };
    raft_entry_set_parsed_data(&new_snapshot, new_snapshot_data);

    struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
                                                   &new_snapshot);
    if (error) {
        raft_entry_uninit(&new_snapshot);
        return error;
    }

    raft->log_synced = raft->log_end - 1;
    raft_entry_uninit(&raft->snap);
    raft->snap = new_snapshot;
    for (size_t i = 0; i < new_log_start - raft->log_start; i++) {
        raft_entry_uninit(&raft->entries[i]);
    }
    memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
            (raft->log_end - new_log_start) * sizeof *raft->entries);
    raft->log_start = new_log_start;
    /* It's a snapshot of the current database state; ovsdb-server will not
     * read it back.  Destroy the parsed json object so as not to waste
     * memory. */
    json_destroy(raft_entry_steal_parsed_data(&raft->snap));
    return NULL;
}

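/* A minimal caller sketch for the snapshot API above (illustrative; the real
 * caller lives in ovsdb-server, and the "> 100" threshold and 'db_as_json'
 * here are invented for the example):
 *
 *     if (raft_may_snapshot(raft)
 *         && (raft_grew_lots(raft) || raft_get_log_length(raft) > 100)) {
 *         struct ovsdb_error *error =
 *             raft_store_snapshot(raft, db_as_json, 0);
 *         ...report or destroy 'error'...
 *     }
 *
 * Passing 0 as 'applied_index' snapshots up to raft->last_applied. */
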
static void
raft_handle_become_leader(struct raft *raft,
                          const struct raft_become_leader *rq)
{
    if (raft->role == RAFT_FOLLOWER) {
        char buf[SID_LEN + 1];
        VLOG_INFO("received leadership transfer from %s in term %"PRIu64,
                  raft_get_nickname(raft, &rq->common.sid, buf, sizeof buf),
                  rq->term);
        raft_start_election(raft, false, true);
    }
}

static void
raft_send_execute_command_reply(struct raft *raft,
                                const struct uuid *sid,
                                const struct uuid *eid,
                                enum raft_command_status status,
                                uint64_t commit_index)
{
    if (failure_test == FT_CRASH_BEFORE_SEND_EXEC_REP) {
        ovs_fatal(0, "Raft test: crash before sending execute_command_reply");
    }
    union raft_rpc rpc = {
        .execute_command_reply = {
            .common = {
                .type = RAFT_RPC_EXECUTE_COMMAND_REPLY,
                .sid = *sid,
            },
            .result = *eid,
            .status = status,
            .commit_index = commit_index,
        },
    };
    raft_send(raft, &rpc);
    if (failure_test == FT_CRASH_AFTER_SEND_EXEC_REP) {
        ovs_fatal(0, "Raft test: crash after sending execute_command_reply.");
    }
}

static enum raft_command_status
raft_handle_execute_command_request__(
    struct raft *raft, const struct raft_execute_command_request *rq)
{
    if (raft->role != RAFT_LEADER) {
        return RAFT_CMD_NOT_LEADER;
    }

    const struct uuid *current_eid = raft_current_eid(raft);
    if (!uuid_equals(&rq->prereq, current_eid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        VLOG_INFO_RL(&rl, "current entry eid "UUID_FMT" does not match "
                     "prerequisite "UUID_FMT" in execute_command_request",
                     UUID_ARGS(current_eid), UUID_ARGS(&rq->prereq));
        return RAFT_CMD_BAD_PREREQ;
    }

    struct raft_command *cmd = raft_command_initiate(raft, rq->data,
                                                     NULL, 0, &rq->result);
    cmd->sid = rq->common.sid;

    enum raft_command_status status = cmd->status;
    raft_command_unref(cmd);
    return status;
}

static void
raft_handle_execute_command_request(
    struct raft *raft, const struct raft_execute_command_request *rq)
{
    enum raft_command_status status
        = raft_handle_execute_command_request__(raft, rq);
    if (status != RAFT_CMD_INCOMPLETE) {
        raft_send_execute_command_reply(raft, &rq->common.sid, &rq->result,
                                        status, 0);
    }
}

static void
raft_handle_execute_command_reply(
    struct raft *raft, const struct raft_execute_command_reply *rpy)
{
    struct raft_command *cmd = raft_find_command_by_eid(raft, &rpy->result);
    if (!cmd) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
        char buf[SID_LEN + 1];
        VLOG_INFO_RL(&rl,
                     "%s received \"%s\" reply from %s for unknown command",
                     raft->local_nickname,
                     raft_command_status_to_string(rpy->status),
                     raft_get_nickname(raft, &rpy->common.sid,
                                       buf, sizeof buf));
        return;
    }

    if (rpy->status == RAFT_CMD_INCOMPLETE) {
        cmd->timestamp = time_msec();
    } else {
        cmd->index = rpy->commit_index;
        raft_command_complete(raft, cmd, rpy->status);
    }
}

static void
raft_handle_rpc(struct raft *raft, const union raft_rpc *rpc)
{
    struct raft_server *s = raft_find_server(raft, &rpc->common.sid);
    if (s) {
        s->last_msg_ts = time_msec();
    }

    uint64_t term = raft_rpc_get_term(rpc);
    if (term
        && !raft_should_suppress_disruptive_server(raft, rpc)
        && !raft_receive_term__(raft, &rpc->common, term)) {
        if (rpc->type == RAFT_RPC_APPEND_REQUEST) {
            /* Section 3.3: "If a server receives a request with a stale term
             * number, it rejects the request." */
            raft_send_append_reply(raft, raft_append_request_cast(rpc),
                                   RAFT_APPEND_INCONSISTENCY, "stale term");
        }
        return;
    }

    switch (rpc->type) {
#define RAFT_RPC(ENUM, NAME)                        \
        case ENUM:                                  \
            raft_handle_##NAME(raft, &rpc->NAME);   \
            break;
    RAFT_RPC_TYPES
#undef RAFT_RPC
    default:
        OVS_NOT_REACHED();
    }
}

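/* The RAFT_RPC X-macro above expands into one switch case per RPC type,
 * e.g. (illustrative expansion):
 *
 *     case RAFT_RPC_APPEND_REQUEST:
 *         raft_handle_append_request(raft, &rpc->append_request);
 *         break;
 *
 * so adding a type to RAFT_RPC_TYPES automatically extends this dispatch. */
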
static bool
raft_rpc_is_heartbeat(const union raft_rpc *rpc)
{
    return ((rpc->type == RAFT_RPC_APPEND_REQUEST
             || rpc->type == RAFT_RPC_APPEND_REPLY)
            && rpc->common.comment
            && !strcmp(rpc->common.comment, "heartbeat"));
}

static bool
raft_send_to_conn_at(struct raft *raft, const union raft_rpc *rpc,
                     struct raft_conn *conn, int line_number)
{
    log_rpc(rpc, "-->", conn, line_number);
    return failure_test == FT_STOP_RAFT_RPC
           || !jsonrpc_session_send(
                   conn->js, raft_rpc_to_jsonrpc(&raft->cid, &raft->sid, rpc));
}

static bool
raft_is_rpc_synced(const struct raft *raft, const union raft_rpc *rpc)
{
    uint64_t term = raft_rpc_get_term(rpc);
    uint64_t index = raft_rpc_get_min_sync_index(rpc);
    const struct uuid *vote = raft_rpc_get_vote(rpc);

    return (term <= raft->synced_term
            && index <= raft->log_synced
            && (!vote || uuid_equals(vote, &raft->synced_vote)));
}

static bool
raft_send_at(struct raft *raft, const union raft_rpc *rpc, int line_number)
{
    const struct uuid *dst = &rpc->common.sid;
    if (uuid_equals(dst, &raft->sid)) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        VLOG_WARN_RL(&rl, "attempted to send RPC to self from raft.c:%d",
                     line_number);
        return false;
    }

    struct raft_conn *conn = raft_find_conn_by_sid(raft, dst);
    if (!conn) {
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
        char buf[SID_LEN + 1];
        VLOG_DBG_RL(&rl, "%s: no connection to %s, cannot send RPC "
                    "from raft.c:%d", raft->local_nickname,
                    raft_get_nickname(raft, dst, buf, sizeof buf),
                    line_number);
        return false;
    }

    if (!raft_is_rpc_synced(raft, rpc)) {
        raft_waiter_create(raft, RAFT_W_RPC, false)->rpc = raft_rpc_clone(rpc);
        return true;
    }

    return raft_send_to_conn_at(raft, rpc, conn, line_number);
}

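/* Consequence of the sync check above (illustrative): an RPC whose term or
 * log index has not yet been fsync'ed locally is not transmitted right away;
 * it is parked on a RAFT_W_RPC waiter and sent only once raft->synced_term,
 * raft->log_synced, and raft->synced_vote catch up, so peers never observe
 * state that this server could forget after a crash. */
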
static struct raft *
raft_lookup_by_name(const char *name)
{
    struct raft *raft;

    HMAP_FOR_EACH_WITH_HASH (raft, hmap_node, hash_string(name, 0),
                             &all_rafts) {
        if (!strcmp(raft->name, name)) {
            return raft;
        }
    }
    return NULL;
}

static void
raft_unixctl_cid(struct unixctl_conn *conn,
                 int argc OVS_UNUSED, const char *argv[],
                 void *aux OVS_UNUSED)
{
    struct raft *raft = raft_lookup_by_name(argv[1]);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
    } else if (uuid_is_zero(&raft->cid)) {
        unixctl_command_reply_error(conn, "cluster id not yet known");
    } else {
        char *uuid = uuid_to_string(&raft->cid);
        unixctl_command_reply(conn, uuid);
        free(uuid);
    }
}

static void
raft_unixctl_sid(struct unixctl_conn *conn,
                 int argc OVS_UNUSED, const char *argv[],
                 void *aux OVS_UNUSED)
{
    struct raft *raft = raft_lookup_by_name(argv[1]);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
    } else {
        char *uuid = uuid_to_string(&raft->sid);
        unixctl_command_reply(conn, uuid);
        free(uuid);
    }
}

static void
raft_put_sid(const char *title, const struct uuid *sid,
             const struct raft *raft, struct ds *s)
{
    ds_put_format(s, "%s: ", title);
    if (uuid_equals(sid, &raft->sid)) {
        ds_put_cstr(s, "self");
    } else if (uuid_is_zero(sid)) {
        ds_put_cstr(s, "unknown");
    } else {
        char buf[SID_LEN + 1];
        ds_put_cstr(s, raft_get_nickname(raft, sid, buf, sizeof buf));
    }
    ds_put_char(s, '\n');
}

static void
raft_unixctl_status(struct unixctl_conn *conn,
                    int argc OVS_UNUSED, const char *argv[],
                    void *aux OVS_UNUSED)
{
    struct raft *raft = raft_lookup_by_name(argv[1]);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    struct ds s = DS_EMPTY_INITIALIZER;
    ds_put_format(&s, "%s\n", raft->local_nickname);
    ds_put_format(&s, "Name: %s\n", raft->name);
    ds_put_format(&s, "Cluster ID: ");
    if (!uuid_is_zero(&raft->cid)) {
        ds_put_format(&s, CID_FMT" ("UUID_FMT")\n",
                      CID_ARGS(&raft->cid), UUID_ARGS(&raft->cid));
    } else {
        ds_put_format(&s, "not yet known\n");
    }
    ds_put_format(&s, "Server ID: "SID_FMT" ("UUID_FMT")\n",
                  SID_ARGS(&raft->sid), UUID_ARGS(&raft->sid));
    ds_put_format(&s, "Address: %s\n", raft->local_address);
    ds_put_format(&s, "Status: %s\n",
                  raft->joining ? "joining cluster"
                  : raft->leaving ? "leaving cluster"
                  : raft->left ? "left cluster"
                  : raft->failed ? "failed"
                  : raft->candidate_retrying
                      ? "disconnected from the cluster (election timeout)"
                  : "cluster member");
    if (raft->joining) {
        ds_put_format(&s, "Remotes for joining:");
        const char *address;
        SSET_FOR_EACH (address, &raft->remote_addresses) {
            ds_put_format(&s, " %s", address);
        }
        ds_put_char(&s, '\n');
    }
    if (raft->role == RAFT_LEADER) {
        struct raft_server *as;
        HMAP_FOR_EACH (as, hmap_node, &raft->add_servers) {
            ds_put_format(&s, "Adding server %s ("SID_FMT" at %s) (%s)\n",
                          as->nickname, SID_ARGS(&as->sid), as->address,
                          raft_server_phase_to_string(as->phase));
        }

        struct raft_server *rs = raft->remove_server;
        if (rs) {
            ds_put_format(&s, "Removing server %s ("SID_FMT" at %s) (%s)\n",
                          rs->nickname, SID_ARGS(&rs->sid), rs->address,
                          raft_server_phase_to_string(rs->phase));
        }
    }

    ds_put_format(&s, "Role: %s\n", raft_role_to_string(raft->role));
    ds_put_format(&s, "Term: %"PRIu64"\n", raft->term);
    raft_put_sid("Leader", &raft->leader_sid, raft, &s);
    raft_put_sid("Vote", &raft->vote, raft, &s);
    ds_put_char(&s, '\n');

    if (raft->election_start) {
        ds_put_format(&s,
                      "Last Election started %"PRIu64" ms ago, reason: %s\n",
                      (uint64_t) (time_msec() - raft->election_start),
                      raft->leadership_transfer
                      ? "leadership_transfer" : "timeout");
    }
    if (raft->election_won) {
        ds_put_format(&s, "Last Election won: %"PRIu64" ms ago\n",
                      (uint64_t) (time_msec() - raft->election_won));
    }
    ds_put_format(&s, "Election timer: %"PRIu64, raft->election_timer);
    if (raft->role == RAFT_LEADER && raft->election_timer_new) {
        ds_put_format(&s, " (changing to %"PRIu64")",
                      raft->election_timer_new);
    }
    ds_put_char(&s, '\n');

    ds_put_format(&s, "Log: [%"PRIu64", %"PRIu64"]\n",
                  raft->log_start, raft->log_end);

    uint64_t n_uncommitted = raft->log_end - raft->commit_index - 1;
    ds_put_format(&s, "Entries not yet committed: %"PRIu64"\n", n_uncommitted);

    uint64_t n_unapplied = raft->log_end - raft->last_applied - 1;
    ds_put_format(&s, "Entries not yet applied: %"PRIu64"\n", n_unapplied);

    const struct raft_conn *c;
    ds_put_cstr(&s, "Connections:");
    LIST_FOR_EACH (c, list_node, &raft->conns) {
        bool connected = jsonrpc_session_is_connected(c->js);
        ds_put_format(&s, " %s%s%s%s",
                      connected ? "" : "(",
                      c->incoming ? "<-" : "->", c->nickname,
                      connected ? "" : ")");
    }
    ds_put_char(&s, '\n');

    ds_put_format(&s, "Disconnections: %u\n", raft->n_disconnections);

    ds_put_cstr(&s, "Servers:\n");
    struct raft_server *server;
    HMAP_FOR_EACH (server, hmap_node, &raft->servers) {
        ds_put_format(&s, " %s ("SID_FMT" at %s)",
                      server->nickname,
                      SID_ARGS(&server->sid), server->address);
        if (uuid_equals(&server->sid, &raft->sid)) {
            ds_put_cstr(&s, " (self)");
        }
        if (server->phase != RAFT_PHASE_STABLE) {
            ds_put_format(&s, " (%s)",
                          raft_server_phase_to_string(server->phase));
        }
        if (raft->role == RAFT_CANDIDATE) {
            if (!uuid_is_zero(&server->vote)) {
                char buf[SID_LEN + 1];
                ds_put_format(&s, " (voted for %s)",
                              raft_get_nickname(raft, &server->vote,
                                                buf, sizeof buf));
            }
        } else if (raft->role == RAFT_LEADER) {
            ds_put_format(&s, " next_index=%"PRIu64" match_index=%"PRIu64,
                          server->next_index, server->match_index);
        }
        if (server->last_msg_ts) {
            ds_put_format(&s, " last msg %"PRIu64" ms ago",
                          (uint64_t) (time_msec() - server->last_msg_ts));
        }
        ds_put_char(&s, '\n');
    }

    unixctl_command_reply(conn, ds_cstr(&s));
    ds_destroy(&s);
}

static void
raft_unixctl_leave__(struct unixctl_conn *conn, struct raft *raft)
{
    if (raft_is_leaving(raft)) {
        unixctl_command_reply_error(conn,
                                    "already in progress leaving cluster");
    } else if (raft_is_joining(raft)) {
        unixctl_command_reply_error(conn,
                                    "can't leave while join in progress");
    } else if (raft_failed(raft)) {
        unixctl_command_reply_error(conn,
                                    "can't leave after failure");
    } else {
        raft_leave(raft);
        unixctl_command_reply(conn, NULL);
    }
}

static void
raft_unixctl_leave(struct unixctl_conn *conn, int argc OVS_UNUSED,
                   const char *argv[], void *aux OVS_UNUSED)
{
    struct raft *raft = raft_lookup_by_name(argv[1]);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    raft_unixctl_leave__(conn, raft);
}

static struct raft_server *
raft_lookup_server_best_match(struct raft *raft, const char *id)
{
    struct raft_server *best = NULL;
    int best_score = -1;
    int n_best = 0;

    struct raft_server *s;
    HMAP_FOR_EACH (s, hmap_node, &raft->servers) {
        int score = (!strcmp(id, s->address)
                     ? INT_MAX
                     : uuid_is_partial_match(&s->sid, id));
        if (score > best_score) {
            best = s;
            best_score = score;
            n_best = 1;
        } else if (score == best_score) {
            n_best++;
        }
    }
    return n_best == 1 ? best : NULL;
}

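/* Matching example for the function above (illustrative SIDs): with servers
 * whose SIDs begin 5a2d... and 5a9c..., the id "5a" scores equally for both
 * partial UUID matches, so NULL is returned as ambiguous, while "5a2d" or an
 * exact address match (which scores INT_MAX) selects a single server. */
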
static void
raft_unixctl_kick(struct unixctl_conn *conn, int argc OVS_UNUSED,
                  const char *argv[], void *aux OVS_UNUSED)
{
    const char *cluster_name = argv[1];
    const char *server_name = argv[2];

    struct raft *raft = raft_lookup_by_name(cluster_name);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    struct raft_server *server = raft_lookup_server_best_match(raft,
                                                               server_name);
    if (!server) {
        unixctl_command_reply_error(conn, "unknown server");
        return;
    }

    if (uuid_equals(&server->sid, &raft->sid)) {
        raft_unixctl_leave__(conn, raft);
    } else if (raft->role == RAFT_LEADER) {
        const struct raft_remove_server_request rq = {
            .sid = server->sid,
            .requester_conn = conn,
        };
        raft_handle_remove_server_request(raft, &rq);
    } else {
        const union raft_rpc rpc = {
            .remove_server_request = {
                .common = {
                    .type = RAFT_RPC_REMOVE_SERVER_REQUEST,
                    .sid = raft->leader_sid,
                    .comment = "via unixctl"
                },
                .sid = server->sid,
            }
        };
        if (raft_send(raft, &rpc)) {
            unixctl_command_reply(conn, "sent removal request to leader");
        } else {
            unixctl_command_reply_error(conn,
                                        "failed to send removal request");
        }
    }
}

static void
raft_get_election_timer_from_log(struct raft *raft)
{
    if (raft->snap.election_timer) {
        raft->election_timer = raft->snap.election_timer;
    }
    for (uint64_t index = raft->commit_index; index >= raft->log_start;
         index--) {
        struct raft_entry *e = &raft->entries[index - raft->log_start];
        if (e->election_timer) {
            raft->election_timer = e->election_timer;
            break;
        }
    }
    raft_update_probe_intervals(raft);
}

static void
raft_log_election_timer(struct raft *raft)
{
    raft_command_unref(raft_command_execute__(raft, NULL, NULL,
                                              raft->election_timer_new, NULL,
                                              NULL));
}

static void
raft_unixctl_change_election_timer(struct unixctl_conn *conn,
                                   int argc OVS_UNUSED, const char *argv[],
                                   void *aux OVS_UNUSED)
{
    const char *cluster_name = argv[1];
    const char *election_timer_str = argv[2];

    struct raft *raft = raft_lookup_by_name(cluster_name);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    if (raft->role != RAFT_LEADER) {
        unixctl_command_reply_error(conn, "election timer must be changed"
                                    " through leader.");
        return;
    }

    /* If an election timer change is already pending, reject this one. */
    if (raft->election_timer_new) {
        unixctl_command_reply_error(conn, "election timer change pending.");
        return;
    }

    uint64_t election_timer = atoll(election_timer_str);
    if (election_timer == raft->election_timer) {
        unixctl_command_reply(conn, "change election timer to current value.");
        return;
    }

    /* An election timer smaller than 100 ms or bigger than 10 min doesn't
     * make sense. */
    if (election_timer < 100 || election_timer > 600000) {
        unixctl_command_reply_error(conn, "election timer must be between "
                                    "100 and 600000, in msec.");
        return;
    }

    /* If the election timer is to be enlarged, it should be done gradually,
     * so that the new value doesn't cause a timeout while it is applied on
     * the leader but not yet on some of the followers. */
    if (election_timer > raft->election_timer * 2) {
        unixctl_command_reply_error(conn, "election timer increase should not "
                                    "exceed the current value x 2.");
        return;
    }

    raft->election_timer_new = election_timer;
    raft_log_election_timer(raft);
    unixctl_command_reply(conn, "change of election timer initiated.");
}

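/* Implements the "cluster/set-backlog-threshold" unixctl command: sets the
 * per-connection limits on the number of backlogged messages (argv[2]) and
 * bytes (argv[3]) for the named cluster and applies them to all existing
 * connections. */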
static void
raft_unixctl_set_backlog_threshold(struct unixctl_conn *conn,
                                   int argc OVS_UNUSED, const char *argv[],
                                   void *aux OVS_UNUSED)
{
    const char *cluster_name = argv[1];
    unsigned long long n_msgs, n_bytes;
    struct raft_conn *r_conn;

    struct raft *raft = raft_lookup_by_name(cluster_name);
    if (!raft) {
        unixctl_command_reply_error(conn, "unknown cluster");
        return;
    }

    if (!str_to_ullong(argv[2], 10, &n_msgs)
        || !str_to_ullong(argv[3], 10, &n_bytes)) {
        unixctl_command_reply_error(conn, "invalid argument");
        return;
    }

    if (n_msgs < 50 || n_msgs > SIZE_MAX || n_bytes > SIZE_MAX) {
        unixctl_command_reply_error(conn, "values out of range");
        return;
    }

    raft->conn_backlog_max_n_msgs = n_msgs;
    raft->conn_backlog_max_n_bytes = n_bytes;

    LIST_FOR_EACH (r_conn, list_node, &raft->conns) {
        jsonrpc_session_set_backlog_threshold(r_conn->js, n_msgs, n_bytes);
    }

    unixctl_command_reply(conn, NULL);
}

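/* Implements the "cluster/failure-test" unixctl command: arms the failure
 * scenario named in argv[1] (one of the FT_* values of enum
 * raft_failure_test) or disarms any active scenario when given "clear".
 * Intended for unit tests only. */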
static void
raft_unixctl_failure_test(struct unixctl_conn *conn OVS_UNUSED,
                          int argc OVS_UNUSED, const char *argv[],
                          void *aux OVS_UNUSED)
{
    const char *test = argv[1];
    if (!strcmp(test, "crash-before-sending-append-request")) {
        failure_test = FT_CRASH_BEFORE_SEND_APPEND_REQ;
    } else if (!strcmp(test, "crash-after-sending-append-request")) {
        failure_test = FT_CRASH_AFTER_SEND_APPEND_REQ;
    } else if (!strcmp(test, "crash-before-sending-execute-command-reply")) {
        failure_test = FT_CRASH_BEFORE_SEND_EXEC_REP;
    } else if (!strcmp(test, "crash-after-sending-execute-command-reply")) {
        failure_test = FT_CRASH_AFTER_SEND_EXEC_REP;
    } else if (!strcmp(test, "crash-before-sending-execute-command-request")) {
        failure_test = FT_CRASH_BEFORE_SEND_EXEC_REQ;
    } else if (!strcmp(test, "crash-after-sending-execute-command-request")) {
        failure_test = FT_CRASH_AFTER_SEND_EXEC_REQ;
    } else if (!strcmp(test, "crash-after-receiving-append-request-update")) {
        failure_test = FT_CRASH_AFTER_RECV_APPEND_REQ_UPDATE;
    } else if (!strcmp(test, "crash-before-sending-install-snapshot-reply")) {
        failure_test = FT_CRASH_BEFORE_SEND_SNAPSHOT_REP;
    } else if (!strcmp(test, "delay-election")) {
        failure_test = FT_DELAY_ELECTION;
        struct raft *raft;
        HMAP_FOR_EACH (raft, hmap_node, &all_rafts) {
            if (raft->role == RAFT_FOLLOWER) {
                raft_reset_election_timer(raft);
            }
        }
    } else if (!strcmp(test, "dont-send-vote-request")) {
        failure_test = FT_DONT_SEND_VOTE_REQUEST;
    } else if (!strcmp(test, "stop-raft-rpc")) {
        failure_test = FT_STOP_RAFT_RPC;
    } else if (!strcmp(test,
                       "transfer-leadership-after-sending-append-request")) {
        failure_test = FT_TRANSFER_LEADERSHIP_AFTER_SEND_APPEND_REQ;
    } else if (!strcmp(test, "transfer-leadership-after-starting-to-add")) {
        failure_test = FT_TRANSFER_LEADERSHIP_AFTER_STARTING_TO_ADD;
    } else if (!strcmp(test, "transfer-leadership")) {
        failure_test = FT_TRANSFER_LEADERSHIP;
    } else if (!strcmp(test, "clear")) {
        failure_test = FT_NO_TEST;
        unixctl_command_reply(conn, "test dismissed");
        return;
    } else {
        unixctl_command_reply_error(conn, "unknown test scenario");
        return;
    }
    unixctl_command_reply(conn, "test engaged");
}

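/* Registers all of the "cluster/*" unixctl commands.  Safe to call from
 * multiple threads; only the first call has any effect. */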
static void
raft_init(void)
{
    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
    if (!ovsthread_once_start(&once)) {
        return;
    }
    unixctl_command_register("cluster/cid", "DB", 1, 1,
                             raft_unixctl_cid, NULL);
    unixctl_command_register("cluster/sid", "DB", 1, 1,
                             raft_unixctl_sid, NULL);
    unixctl_command_register("cluster/status", "DB", 1, 1,
                             raft_unixctl_status, NULL);
    unixctl_command_register("cluster/leave", "DB", 1, 1,
                             raft_unixctl_leave, NULL);
    unixctl_command_register("cluster/kick", "DB SERVER", 2, 2,
                             raft_unixctl_kick, NULL);
    unixctl_command_register("cluster/change-election-timer", "DB TIME", 2, 2,
                             raft_unixctl_change_election_timer, NULL);
    unixctl_command_register("cluster/set-backlog-threshold",
                             "DB N_MSGS N_BYTES", 3, 3,
                             raft_unixctl_set_backlog_threshold, NULL);
    unixctl_command_register("cluster/failure-test", "FAILURE SCENARIO", 1, 1,
                             raft_unixctl_failure_test, NULL);
    ovsthread_once_done(&once);
}
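
/* A sketch of how the commands registered above may be invoked.  The control
 * socket path and the database name "OVN_Southbound" below are hypothetical
 * examples; the command names and argument counts are the ones registered in
 * raft_init():
 *
 *   ovs-appctl -t /var/run/ovn/ovnsb_db.ctl cluster/status OVN_Southbound
 *   ovs-appctl -t /var/run/ovn/ovnsb_db.ctl cluster/kick OVN_Southbound f4a9
 *   ovs-appctl -t /var/run/ovn/ovnsb_db.ctl cluster/change-election-timer \
 *       OVN_Southbound 2000
 *   ovs-appctl -t /var/run/ovn/ovnsb_db.ctl cluster/set-backlog-threshold \
 *       OVN_Southbound 500 4194304
 */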