2
0
mirror of https://github.com/checkpoint-restore/criu synced 2025-08-31 06:15:24 +00:00

remote: Minor improvements on img-remote.c

This commit is contained in:
Rodrigo Bruno
2018-05-14 01:29:50 +01:00
committed by Andrei Vagin
parent cb5f93675a
commit cc4cc0750c

View File

@@ -44,14 +44,26 @@ LIST_HEAD(rop_forwarding);
// List of snapshots (useful when doing incremental restores/dumps
LIST_HEAD(snapshot_head);
// Snapshot id (setup at launch time by dump or restore).
static char *snapshot_id;
bool restoring = true; // TODO - check where this is used!
// TODO - split this into two vars, recv_from_proxy, send_to_cache
bool forwarding = false; // TODO - true if proxy_to_cache_fd is being used.
// True if restoring (cache := true; proxy := false).
bool restoring = true;
// True if the proxy to cache socket is being used (receiving or sending).
bool forwarding = false;
// True if the local dump or restore is finished.
bool finished_local = false;
// True if the communication between the proxy and cache can be closed.
bool finished_remote = false;
// Proxy to cache socket fd; Local dump or restore servicing fd.
int proxy_to_cache_fd;
int local_req_fd;
// Epoll fd and event array.
int epoll_fd;
struct epoll_event *events;
@@ -457,7 +469,7 @@ static struct rimage *clear_remote_image(struct rimage *rimg)
return rimg;
}
void handle_accept_write(
struct roperation* handle_accept_write(
int cli_fd, char* snapshot_id, char* path, int flags, bool close_fd, uint64_t size)
{
struct roperation *rop = NULL;
@@ -484,25 +496,24 @@ void handle_accept_write(
rop_set_rimg(rop, rimg);
rop->size = size;
list_add_tail(&(rop->l), &rop_inprogress);
event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLIN, rop);
return;
return rop;
err:
free(rimg);
free(rop);
return NULL;
}
void handle_accept_proxy_write(
struct roperation* handle_accept_proxy_write(
int cli_fd, char* snapshot_id, char* path, int flags)
{
handle_accept_write(cli_fd, snapshot_id, path, flags, true, 0);
return handle_accept_write(cli_fd, snapshot_id, path, flags, true, 0);
}
void handle_accept_proxy_read(
struct roperation* handle_accept_proxy_read(
int cli_fd, char* snapshot_id, char* path, int flags)
{
struct roperation *rop = NULL;
struct rimage *rimg = NULL;
struct rimage *rimg = NULL;
rimg = get_rimg_by_name(snapshot_id, path);
@@ -513,40 +524,40 @@ void handle_accept_proxy_read(
pr_perror("Error writing reply header for unexisting image");
goto err;
}
close(cli_fd);
return NULL;
}
else {
if (write_reply_header(cli_fd, 0) < 0) {
pr_perror("Error writing reply header for %s:%s",
path, snapshot_id);
goto err;
}
rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
if (rop == NULL) {
pr_perror("Error preparing remote operation");
goto err;
}
rop_set_rimg(rop, rimg);
list_add_tail(&(rop->l), &rop_inprogress);
event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
if (write_reply_header(cli_fd, 0) < 0) {
pr_perror("Error writing reply header for %s:%s",
path, snapshot_id);
goto err;
}
return;
rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
if (rop == NULL) {
pr_perror("Error preparing remote operation");
goto err;
}
rop_set_rimg(rop, rimg);
return rop;
err:
close(cli_fd);
return NULL;
}
void finish_local()
{
int ret;
finished_local = true;
//shutdown(local_req_fd, SHUT_RD); //TODO - should this be removed?
ret = event_set(epoll_fd, EPOLL_CTL_DEL, local_req_fd, 0, 0);
if (ret) {
pr_perror("Failed to del local fd from epoll");
}
}
void handle_accept_cache_read(
struct roperation* handle_accept_cache_read(
int cli_fd, char* snapshot_id, char* path, int flags)
{
struct rimage *rimg = NULL;
@@ -556,14 +567,14 @@ void handle_accept_cache_read(
if (!strncmp(path, RESTORE_FINISH, sizeof(RESTORE_FINISH))) {
close(cli_fd);
finish_local();
return;
return NULL;
}
rop = new_remote_operation(path, snapshot_id, cli_fd, flags, true);
if (rop == NULL) {
pr_perror("Error preparing remote operation");
close(cli_fd);
return;
return NULL;
}
// Check if we already have the image.
@@ -576,28 +587,25 @@ void handle_accept_cache_read(
close(rop->fd);
}
rop_set_rimg(rop, rimg);
list_add_tail(&(rop->l), &rop_inprogress);
event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
}
// The file may exist in future.
else if (!finished_remote){
list_add_tail(&(rop->l), &rop_pending);
return rop;
}
// The file does not exist.
else {
else if (finished_remote) {
pr_info("No image %s:%s.\n", path, snapshot_id);
if (write_reply_header(cli_fd, ENOENT) < 0)
pr_perror("Error writing reply header for unexisting image");
free(rop);
close(cli_fd);
}
return NULL;
}
void forward_remote_image(struct roperation* rop)
{
uint64_t ret = 0;
// Set blocking during the setup.
// socket_set_blocking(rop->fd); // TODO - test
socket_set_blocking(rop->fd);
ret = write_remote_header(
rop->fd, rop->snapshot_id, rop->path, rop->flags, rop->size);
@@ -616,7 +624,7 @@ void forward_remote_image(struct roperation* rop)
// Go back to non-blocking
// socket_set_non_blocking(rop->fd); // TODO - test
socket_set_non_blocking(rop->fd);
forwarding = true;
event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLOUT, rop);
@@ -629,9 +637,10 @@ void handle_remote_accept(int fd)
int flags;
uint64_t size = 0;
uint64_t ret;
struct roperation* rop = NULL;
// Set blocking during the setup.
// socket_set_blocking(fd); // TODO - test!
socket_set_blocking(fd);
ret = read_remote_header(fd, snapshot_id, path, &flags, &size);
if (ret < 0) {
@@ -646,7 +655,7 @@ void handle_remote_accept(int fd)
}
// Go back to non-blocking
// socket_set_non_blocking(fd); // TODO - test!
socket_set_non_blocking(fd);
pr_info("[fd=%d] Received %s request for %s:%s with %lu bytes\n",
fd,
@@ -656,8 +665,13 @@ void handle_remote_accept(int fd)
forwarding = true;
handle_accept_write(fd, snapshot_id, path, flags, false, size);
return;
rop = handle_accept_write(fd, snapshot_id, path, flags, false, size);
if (rop != NULL) {
list_add_tail(&(rop->l), &rop_inprogress);
event_set(epoll_fd, EPOLL_CTL_ADD, rop->fd, EPOLLIN, rop);
}
return;
err:
close(fd);
}
@@ -670,6 +684,7 @@ void handle_local_accept(int fd)
int flags = 0;
struct sockaddr_in cli_addr;
socklen_t clilen = sizeof(cli_addr);
struct roperation *rop = NULL;
cli_fd = accept(fd, (struct sockaddr *) &cli_addr, &clilen);
if (cli_fd < 0) {
@@ -690,19 +705,32 @@ void handle_local_accept(int fd)
// Write/Append case (only possible in img-proxy).
if (flags != O_RDONLY) {
handle_accept_proxy_write(cli_fd, snapshot_id, path, flags);
rop = handle_accept_proxy_write(cli_fd, snapshot_id, path, flags);
}
// Read case while restoring (img-cache).
else if (restoring) {
handle_accept_cache_read(cli_fd, snapshot_id, path, flags);
rop = handle_accept_cache_read(cli_fd, snapshot_id, path, flags);
}
// Read case while dumping (img-proxy).
else {
handle_accept_proxy_read(cli_fd, snapshot_id, path, flags);
rop = handle_accept_proxy_read(cli_fd, snapshot_id, path, flags);
}
// Set socket non-blocking.
socket_set_non_blocking(cli_fd);
// If we have an operation. Check if we are ready to start or not.
if (rop != NULL) {
if (rop->rimg != NULL) {
list_add_tail(&(rop->l), &rop_inprogress);
event_set(
epoll_fd,
EPOLL_CTL_ADD,
rop->fd,
rop->flags == O_RDONLY ? EPOLLOUT : EPOLLIN,
rop);
} else {
list_add_tail(&(rop->l), &rop_pending);
}
socket_set_non_blocking(rop->fd);
}
return;
err:
@@ -760,7 +788,6 @@ void finish_cache_write(struct roperation* rop)
// Add image to list of images.
list_add_tail(&(rop->rimg->l), &rimg_head);
// TODO - what if we have multiple requests for the same name?
if (prop != NULL) {
pr_info("\t[fd=%d] Resuming pending %s for %s:%s\n",
prop->fd,
@@ -949,31 +976,17 @@ void accept_image_connections() {
// If both local and remote sockets are closed, leave.
if (finished_local && finished_remote) {
pr_info("\tFinished both local and remote, exiting\n");
pr_info("Finished both local and remote, exiting\n");
goto end;
}
}
end:
// TODO - release pending when no receiving and finished.
close(epoll_fd);
close(local_req_fd);
free(events);
}
int64_t recv_image(int fd, struct rimage *rimg, uint64_t size, int flags, bool close_fd)
{
int ret;
struct roperation *op = new_remote_operation(
rimg->path, rimg->snapshot_id, fd, flags, close_fd);
rop_set_rimg(op, rimg);
while ((ret = recv_image_async(op)) < 0)
if (ret != EAGAIN && ret != EWOULDBLOCK)
return -1;
free(op);
return ret;
}
/* Note: size is a limit on how much we want to read from the socket. Zero means
* read until the socket is closed.
*/
@@ -1028,18 +1041,6 @@ int64_t recv_image_async(struct roperation *op)
return n;
}
int64_t send_image(int fd, struct rimage *rimg, int flags, bool close_fd)
{
int ret;
struct roperation *op = new_remote_operation(
rimg->path, rimg->snapshot_id, fd, flags, close_fd);
rop_set_rimg(op, rimg);
while ((ret = send_image_async(op)) < 0)
if (ret != EAGAIN && ret != EWOULDBLOCK)
return -1;
return ret;
}
int64_t send_image_async(struct roperation *op)
{
int fd = op->fd;
@@ -1060,7 +1061,6 @@ int64_t send_image_async(struct roperation *op)
op->curr_sent_bytes = 0;
return n;
}
// TODO - cloudn't we just compare to the img size?
else if (op->curr_sent_bytes == op->curr_sent_buf->nbytes) {
if (close_fd)
close(fd);
@@ -1068,7 +1068,6 @@ int64_t send_image_async(struct roperation *op)
}
return n;
}
// TODO - clouldn't these checks be made upstream?
else if (errno == EPIPE || errno == ECONNRESET) {
pr_warn("Connection for %s:%s was closed early than expected\n",
rimg->path, rimg->snapshot_id);