This code implements VM transaction protocol. Like buffered_file, it sits between savevm and migration layer. With this architecture, VM transaction protocol is implemented mostly independent from other existing code. Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@xxxxxxxxxxxxx> Signed-off-by: OHMURA Kei <ohmura.kei@xxxxxxxxxxxxx> --- Makefile.objs | 1 + ft_trans_file.c | 626 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ ft_trans_file.h | 72 +++++++ migration.c | 3 + trace-events | 16 ++ 5 files changed, 718 insertions(+), 0 deletions(-) create mode 100644 ft_trans_file.c create mode 100644 ft_trans_file.h diff --git a/Makefile.objs b/Makefile.objs index c3e52c5..de38579 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -100,6 +100,7 @@ common-obj-y += msmouse.o ps2.o common-obj-y += qdev.o qdev-properties.o common-obj-y += block-migration.o common-obj-y += pflib.o +common-obj-y += ft_trans_file.o common-obj-$(CONFIG_BRLAPI) += baum.o common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o diff --git a/ft_trans_file.c b/ft_trans_file.c new file mode 100644 index 0000000..2672d40 --- /dev/null +++ b/ft_trans_file.c @@ -0,0 +1,626 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.c. + * Copyright IBM, Corp. 2008 + * Authors: + * Anthony Liguori <aliguori@xxxxxxxxxx> + */ + +#include "qemu-common.h" +#include "qemu-error.h" +#include "hw/hw.h" +#include "qemu-timer.h" +#include "sysemu.h" +#include "qemu-char.h" +#include "trace.h" +#include "ft_trans_file.h" + +// #define DEBUG_FT_TRANSACTION + +typedef struct FtTransHdr +{ + uint16_t cmd; + uint16_t id; + uint32_t seq; + uint32_t payload_len; +} FtTransHdr; + +typedef struct QEMUFileFtTrans +{ + FtTransPutBufferFunc *put_buffer; + FtTransGetBufferFunc *get_buffer; + FtTransPutReadyFunc *put_ready; + FtTransGetReadyFunc *get_ready; + FtTransWaitForUnfreezeFunc *wait_for_unfreeze; + FtTransCloseFunc *close; + void *opaque; + QEMUFile *file; + + enum QEMU_VM_TRANSACTION_STATE state; + uint32_t seq; + uint16_t id; + + int has_error; + + bool freeze_output; + bool freeze_input; + bool rate_limit; + bool is_sender; + bool is_payload; + + uint8_t *buf; + size_t buf_max_size; + size_t put_offset; + size_t get_offset; + + FtTransHdr header; + size_t header_offset; +} QEMUFileFtTrans; + +#define IO_BUF_SIZE 32768 + +static void ft_trans_append(QEMUFileFtTrans *s, + const uint8_t *buf, size_t size) +{ + if (size > (s->buf_max_size - s->put_offset)) { + trace_ft_trans_realloc(s->buf_max_size, size + 1024); + s->buf_max_size += size + 1024; + s->buf = qemu_realloc(s->buf, s->buf_max_size); + } + + trace_ft_trans_append(size); + memcpy(s->buf + s->put_offset, buf, size); + s->put_offset += size; +} + +static void ft_trans_flush(QEMUFileFtTrans *s) +{ + size_t offset = 0; + + if (s->has_error) { + error_report("flush when error %d, bailing\n", s->has_error); + return; + } + + while (offset < s->put_offset) { + ssize_t ret; + + ret = s->put_buffer(s->opaque, s->buf + offset, s->put_offset - offset); + if (ret == -EAGAIN) { + break; + } + + if (ret <= 0) { + error_report("error flushing data, %s\n", strerror(errno)); + s->has_error = FT_TRANS_ERR_FLUSH; + break; + } else { + offset += ret; + } + } + + trace_ft_trans_flush(offset, s->put_offset); + memmove(s->buf, s->buf + offset, s->put_offset - offset); + s->put_offset -= offset; + s->freeze_output = !!s->put_offset; +} + +static ssize_t ft_trans_put(void *opaque, void *buf, int size) +{ + QEMUFileFtTrans *s = opaque; + size_t offset = 0; + ssize_t len; + + /* flush buffered data before putting next */ + if (s->put_offset) { + ft_trans_flush(s); + } + + while (!s->freeze_output && offset < size) { + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - offset); + + if (len == -EAGAIN) { + trace_ft_trans_freeze_output(); + s->freeze_output = 1; + break; + } + + if (len <= 0) { + error_report("putting data failed, %s\n", strerror(errno)); + s->has_error = 1; + offset = -EINVAL; + break; + } + + offset += len; + } + + if (s->freeze_output) { + ft_trans_append(s, buf + offset, size - offset); + offset = size; + } + + return offset; +} + +static int ft_trans_send_header(QEMUFileFtTrans *s, + enum QEMU_VM_TRANSACTION_STATE state, + uint32_t payload_len) +{ + int ret; + FtTransHdr *hdr = &s->header; + + trace_ft_trans_send_header(state); + + hdr->cmd = s->state = state; + hdr->id = s->id; + hdr->seq = s->seq; + hdr->payload_len = payload_len; + + ret = ft_trans_put(s, hdr, sizeof(*hdr)); + if (ret < 0) { + error_report("send header failed\n"); + s->has_error = FT_TRANS_ERR_SEND_HDR; + } + + return ret; +} + +static int ft_trans_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) +{ + QEMUFileFtTrans *s = opaque; + ssize_t ret; + + trace_ft_trans_put_buffer(size, pos); + + if (s->has_error) { + error_report("put_buffer when error %d, bailing\n", s->has_error); + return -EINVAL; + } + + /* assuming qemu_file_put_notify() is calling */ + if (pos == 0 && size == 0) { + trace_ft_trans_put_ready(); + ft_trans_flush(s); + + if (!s->freeze_output) { + trace_ft_trans_cb(s->put_ready); + ret = s->put_ready(); + } + + ret = 0; + goto out; + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_CONTINUE, size); + if (ret < 0) { + goto out; + } + + ret = ft_trans_put(s, (uint8_t *)buf, size); + if (ret < 0) { + error_report("send palyload failed\n"); + s->has_error = FT_TRANS_ERR_SEND_PAYLOAD; + goto out; + } + + s->seq++; + +out: + return ret; +} + +static int ft_trans_fill_buffer(void *opaque, void *buf, int size) +{ + QEMUFileFtTrans *s = opaque; + size_t offset = 0; + ssize_t len; + + s->freeze_input = 0; + + while (offset < size) { + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, + 0, size - offset); + if (len == -EAGAIN) { + trace_ft_trans_freeze_input(); + s->freeze_input = 1; + break; + } + + if (len <= 0) { + error_report("fill buffer failed, %s\n", strerror(errno)); + s->has_error = 1; + return -EINVAL; + } + + offset += len; + } + + return offset; +} + +static int ft_trans_recv_header(QEMUFileFtTrans *s) +{ + int ret; + char *buf = (char *)&s->header + s->header_offset; + + ret = ft_trans_fill_buffer(s, buf, sizeof(FtTransHdr) - s->header_offset); + if (ret < 0) { + error_report("recv header failed\n"); + s->has_error = FT_TRANS_ERR_RECV_HDR; + goto out; + } + + s->header_offset += ret; + if (s->header_offset == sizeof(FtTransHdr)) { + trace_ft_trans_recv_header(s->header.cmd); + s->state = s->header.cmd; + s->header_offset = 0; + + if (!s->is_sender) { + s->id = s->header.id; + s->seq = s->header.seq; + } + } + +out: + return ret; +} + +static int ft_trans_recv_payload(QEMUFileFtTrans *s) +{ + QEMUFile *f = s->file; + int ret = -1; + + /* extend QEMUFile buf if there weren't enough space */ + if (s->header.payload_len > (s->buf_max_size - s->get_offset)) { + s->buf_max_size += (s->header.payload_len - + (s->buf_max_size - s->get_offset)); + s->buf = qemu_realloc_buffer(f, s->buf_max_size); + } + + ret = ft_trans_fill_buffer(s, s->buf + s->get_offset, + s->header.payload_len); + if (ret < 0) { + error_report("recv payload failed\n"); + s->has_error = FT_TRANS_ERR_RECV_PAYLOAD; + goto out; + } + + trace_ft_trans_recv_payload(ret, s->header.payload_len, s->get_offset); + + s->header.payload_len -= ret; + s->get_offset += ret; + s->is_payload = !!s->header.payload_len; + +out: + return ret; +} + +static int ft_trans_recv(QEMUFileFtTrans *s) +{ + int ret; + + /* get payload and return */ + if (s->is_payload) { + ret = ft_trans_recv_payload(s); + goto out; + } + + ret = ft_trans_recv_header(s); + if (ret < 0 || s->freeze_input) { + goto out; + } + + switch (s->state) { + case QEMU_VM_TRANSACTION_BEGIN: + /* CONTINUE or COMMIT should come shortly */ + s->is_payload = 0; + break; + + case QEMU_VM_TRANSACTION_CONTINUE: + /* get payload */ + s->is_payload = 1; + break; + + case QEMU_VM_TRANSACTION_COMMIT: + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + if (ret < 0) { + goto out; + } + + trace_ft_trans_cb(s->get_ready); + if ((ret = s->get_ready(s->opaque)) < 0) { + goto out; + } + + qemu_clear_buffer(s->file); + s->get_offset = 0; + s->is_payload = 0; + + break; + + case QEMU_VM_TRANSACTION_ATOMIC: + /* not implemented yet */ + error_report("QEMU_VM_TRANSACTION_ATOMIC not implemented. %d\n", + ret); + break; + + case QEMU_VM_TRANSACTION_CANCEL: + /* return -EINVAL until migrate cancel on recevier side is supported */ + ret = -EINVAL; + break; + + default: + error_report("unknown QEMU_VM_TRANSACTION_STATE %d\n", ret); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + } + +out: + return ret; +} + +static int ft_trans_get_buffer(void *opaque, uint8_t *buf, + int64_t pos, int size) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + if (s->has_error) { + error_report("get_buffer when error %d, bailing\n", s->has_error); + return -EINVAL; + } + + /* assuming qemu_file_get_notify() is calling */ + if (pos == 0 && size == 0) { + trace_ft_trans_get_ready(); + s->freeze_input = 0; + + /* sender should be waiting for ACK */ + if (s->is_sender) { + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + ret = 0; + goto out; + } + if (ret < 0) { + error_report("recv ack failed\n"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d\n", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + + trace_ft_trans_cb(s->get_ready); + ret = s->get_ready(s->opaque); + if (ret < 0) { + goto out; + } + + /* proceed trans id */ + s->id++; + + return 0; + } + + /* set QEMUFile buf at beginning */ + if (!s->buf) { + s->buf = buf; + } + + ret = ft_trans_recv(s); + goto out; + } + + ret = s->get_offset; + +out: + return ret; +} + +static int ft_trans_close(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + trace_ft_trans_close(); + ret = s->close(s->opaque); + if (s->is_sender) { + qemu_free(s->buf); + } + qemu_free(s); + + return ret; +} + +static int ft_trans_rate_limit(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + + if (s->has_error) { + return 0; + } + + if (s->rate_limit && s->freeze_output) { + return 1; + } + + return 0; +} + +static int64_t ft_trans_set_rate_limit(void *opaque, int64_t new_rate) +{ + QEMUFileFtTrans *s = opaque; + + if (s->has_error) { + goto out; + } + + s->rate_limit = !!new_rate; + +out: + return s->rate_limit; +} + +int ft_trans_begin(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + s->seq = 0; + + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction */ + if (!s->is_sender) { + if (s->state != QEMU_VM_TRANSACTION_INIT) { + error_report("invalid state %d\n", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + goto out; + } + + /* sender waits for QEMU_VM_TRANSACTION_ACK to start transaction */ + if (s->state == QEMU_VM_TRANSACTION_INIT) { + retry: + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + goto retry; + } + if (ret < 0) { + error_report("recv ack failed\n"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d\n", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + } + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_BEGIN, 0); + if (ret < 0) { + goto out; + } + + s->state = QEMU_VM_TRANSACTION_CONTINUE; + +out: + return ret; +} + +int ft_trans_commit(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + int ret; + + if (!s->is_sender) { + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_ACK, 0); + goto out; + } + + /* sender should flush buf before sending COMMIT */ + qemu_fflush(s->file); + + ret = ft_trans_send_header(s, QEMU_VM_TRANSACTION_COMMIT, 0); + if (ret < 0) { + goto out; + } + + while (!s->has_error && s->put_offset) { + ft_trans_flush(s); + if (s->freeze_output) { + s->wait_for_unfreeze(s); + } + } + + if (s->has_error) { + ret = -EINVAL; + goto out; + } + + ret = ft_trans_recv_header(s); + if (s->freeze_input) { + ret = -EAGAIN; + goto out; + } + if (ret < 0) { + error_report("recv ack failed\n"); + goto out; + } + + if (s->state != QEMU_VM_TRANSACTION_ACK) { + error_report("recv invalid state %d\n", s->state); + s->has_error = FT_TRANS_ERR_STATE_INVALID; + ret = -EINVAL; + goto out; + } + + s->id++; + ret = 0; + +out: + return ret; +} + +int ft_trans_cancel(void *opaque) +{ + QEMUFileFtTrans *s = opaque; + + /* invalid until migrate cancel on recevier side is supported */ + if (!s->is_sender) { + return -EINVAL; + } + + return ft_trans_send_header(s, QEMU_VM_TRANSACTION_CANCEL, 0); +} + +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, + FtTransPutBufferFunc *put_buffer, + FtTransGetBufferFunc *get_buffer, + FtTransPutReadyFunc *put_ready, + FtTransGetReadyFunc *get_ready, + FtTransWaitForUnfreezeFunc *wait_for_unfreeze, + FtTransCloseFunc *close, + bool is_sender) +{ + QEMUFileFtTrans *s; + + s = qemu_mallocz(sizeof(*s)); + + s->opaque = opaque; + s->put_buffer = put_buffer; + s->get_buffer = get_buffer; + s->put_ready = put_ready; + s->get_ready = get_ready; + s->wait_for_unfreeze = wait_for_unfreeze; + s->close = close; + s->is_sender = is_sender; + s->id = 0; + s->seq = 0; + s->rate_limit = 1; + + if (!s->is_sender) { + s->buf_max_size = IO_BUF_SIZE; + } + + + s->file = qemu_fopen_ops(s, ft_trans_put_buffer, ft_trans_get_buffer, + ft_trans_close, ft_trans_rate_limit, + ft_trans_set_rate_limit, NULL); + + return s->file; +} diff --git a/ft_trans_file.h b/ft_trans_file.h new file mode 100644 index 0000000..d7e221b --- /dev/null +++ b/ft_trans_file.h @@ -0,0 +1,72 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.h. + * Copyright IBM, Corp. 2008 + * Authors: + * Anthony Liguori <aliguori@xxxxxxxxxx> + */ + +#ifndef QEMU_FT_TRANSACTION_FILE_H +#define QEMU_FT_TRANSACTION_FILE_H + +#include "hw/hw.h" + +enum QEMU_VM_TRANSACTION_STATE { + QEMU_VM_TRANSACTION_NACK = -1, + QEMU_VM_TRANSACTION_INIT, + QEMU_VM_TRANSACTION_BEGIN, + QEMU_VM_TRANSACTION_CONTINUE, + QEMU_VM_TRANSACTION_COMMIT, + QEMU_VM_TRANSACTION_CANCEL, + QEMU_VM_TRANSACTION_ATOMIC, + QEMU_VM_TRANSACTION_ACK, +}; + +enum FT_MODE { + FT_ERROR = -1, + FT_OFF, + FT_INIT, + FT_TRANSACTION_BEGIN, + FT_TRANSACTION_ITER, + FT_TRANSACTION_COMMIT, + FT_TRANSACTION_ATOMIC, + FT_TRANSACTION_RECV, +}; +extern enum FT_MODE ft_mode; + +#define FT_TRANS_ERR_UNKNOWN 0x01 /* Unknown error */ +#define FT_TRANS_ERR_SEND_HDR 0x02 /* Send header failed */ +#define FT_TRANS_ERR_RECV_HDR 0x03 /* Recv header failed */ +#define FT_TRANS_ERR_SEND_PAYLOAD 0x04 /* Send payload failed */ +#define FT_TRANS_ERR_RECV_PAYLOAD 0x05 /* Recv payload failed */ +#define FT_TRANS_ERR_FLUSH 0x06 /* Flush buffered data failed */ +#define FT_TRANS_ERR_STATE_INVALID 0x07 /* Invalid state */ + +typedef ssize_t (FtTransPutBufferFunc)(void *opaque, const void *data, size_t size); +typedef int (FtTransGetBufferFunc)(void *opaque, uint8_t *buf, int64_t pos, size_t size); +typedef ssize_t (FtTransPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt); +typedef int (FtTransPutReadyFunc)(void); +typedef int (FtTransGetReadyFunc)(void *opaque); +typedef void (FtTransWaitForUnfreezeFunc)(void *opaque); +typedef int (FtTransCloseFunc)(void *opaque); + +int ft_trans_begin(void *opaque); +int ft_trans_commit(void *opaque); +int ft_trans_cancel(void *opaque); + +QEMUFile *qemu_fopen_ops_ft_trans(void *opaque, + FtTransPutBufferFunc *put_buffer, + FtTransGetBufferFunc *get_buffer, + FtTransPutReadyFunc *put_ready, + FtTransGetReadyFunc *get_ready, + FtTransWaitForUnfreezeFunc *wait_for_unfreeze, + FtTransCloseFunc *close, + bool is_sender); + +#endif diff --git a/migration.c b/migration.c index a400199..01b723d 100644 --- a/migration.c +++ b/migration.c @@ -15,6 +15,7 @@ #include "migration.h" #include "monitor.h" #include "buffered_file.h" +#include "ft_trans_file.h" #include "sysemu.h" #include "block.h" #include "qemu_socket.h" @@ -31,6 +32,8 @@ do { } while (0) #endif +enum FT_MODE ft_mode = FT_OFF; + /* Migration speed throttling */ static int64_t max_throttle = (32 << 20); diff --git a/trace-events b/trace-events index e8fed0f..b8c6012 100644 --- a/trace-events +++ b/trace-events @@ -213,3 +213,19 @@ disable qed_aio_write_data(void *s, void *acb, int ret, uint64_t offset, size_t disable qed_aio_write_prefill(void *s, void *acb, uint64_t start, size_t len, uint64_t offset) "s %p acb %p start %"PRIu64" len %zu offset %"PRIu64"" disable qed_aio_write_postfill(void *s, void *acb, uint64_t start, size_t len, uint64_t offset) "s %p acb %p start %"PRIu64" len %zu offset %"PRIu64"" disable qed_aio_write_main(void *s, void *acb, int ret, uint64_t offset, size_t len) "s %p acb %p ret %d offset %"PRIu64" len %zu" + +# ft_trans_file.c +disable ft_trans_realloc(size_t old_size, size_t new_size) "increasing buffer from %zu by %zu" +disable ft_trans_append(size_t size) "buffering %zu bytes" +disable ft_trans_flush(size_t size, size_t req) "flushed %zu of %zu bytes" +disable ft_trans_send_header(uint16_t cmd) "send header %d" +disable ft_trans_recv_header(uint16_t cmd) "recv header %d" +disable ft_trans_put_buffer(size_t size, int64_t pos) "putting %d bytes at %"PRId64"" +disable ft_trans_recv_payload(size_t len, uint32_t hdr, size_t total) "recv %d of %d total %d" +disable ft_trans_close(void) "closing" +disable ft_trans_freeze_output(void) "backend not ready, freezing output" +disable ft_trans_freeze_input(void) "backend not ready, freezing input" +disable ft_trans_put_ready(void) "file is ready to put" +disable ft_trans_get_ready(void) "file is ready to get" +disable ft_trans_cb(void *cb) "callback %p" + -- 1.7.1.2 -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html