This code implements the VM transaction protocol. Like buffered_file, it sits between savevm and the migration layer. With this architecture, the VM transaction protocol is implemented mostly independently of other existing code. Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@xxxxxxxxxxxxx> Signed-off-by: OHMURA Kei <ohmura.kei@xxxxxxxxxxxxx> --- Makefile.objs | 1 + ft_transaction.c | 423 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ ft_transaction.h | 57 ++++++++ migration.c | 3 + 4 files changed, 484 insertions(+), 0 deletions(-) create mode 100644 ft_transaction.c create mode 100644 ft_transaction.h diff --git a/Makefile.objs b/Makefile.objs index b73e2cb..4388fb3 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -78,6 +78,7 @@ common-obj-y += qemu-char.o savevm.o #aio.o common-obj-y += msmouse.o ps2.o common-obj-y += qdev.o qdev-properties.o common-obj-y += qemu-config.o block-migration.o +common-obj-y += ft_transaction.o common-obj-$(CONFIG_BRLAPI) += baum.o common-obj-$(CONFIG_POSIX) += migration-exec.o migration-unix.o migration-fd.o diff --git a/ft_transaction.c b/ft_transaction.c new file mode 100644 index 0000000..d0cbc99 --- /dev/null +++ b/ft_transaction.c @@ -0,0 +1,423 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.c. + * Copyright IBM, Corp.
2008 + * Authors: + * Anthony Liguori <aliguori@xxxxxxxxxx> + */ + +#include "qemu-common.h" +#include "hw/hw.h" +#include "qemu-timer.h" +#include "sysemu.h" +#include "qemu-char.h" +#include "ft_transaction.h" + +// #define DEBUG_FT_TRANSACTION + +typedef struct QEMUFileFtTranx +{ + FtTranxPutBufferFunc *put_buffer; + FtTranxPutVectorFunc *put_vector; + FtTranxGetBufferFunc *get_buffer; + FtTranxGetVectorFunc *get_vector; + FtTranxCloseFunc *close; + void *opaque; + QEMUFile *file; + int has_error; + int is_sender; + int buf_max_size; + enum QEMU_VM_TRANSACTION_STATE tranx_state; + uint16_t tranx_id; + uint32_t seq; +} QEMUFileFtTranx; + +#define IO_BUF_SIZE 32768 + +#ifdef DEBUG_FT_TRANSACTION +#define dprintf(fmt, ...) \ + do { printf("ft_transaction: " fmt, ## __VA_ARGS__); } while (0) +#else +#define dprintf(fmt, ...) \ + do { } while (0) +#endif + +static ssize_t ft_tranx_flush_buffer(void *opaque, void *buf, int size) +{ + QEMUFileFtTranx *s = opaque; + size_t offset = 0; + ssize_t len; + + while (offset < size) { + len = s->put_buffer(s->opaque, (uint8_t *)buf + offset, size - offset); + + if (len <= 0) { + fprintf(stderr, "ft transaction flush buffer failed \n"); + s->has_error = 1; + offset = -EINVAL; + break; + } + + offset += len; + } + + return offset; +} + +static int ft_tranx_send_header(QEMUFileFtTranx *s) +{ + int ret = -1; + + dprintf("send header %d\n", s->tranx_state); + + ret = ft_tranx_flush_buffer(s, &s->tranx_state, sizeof(uint16_t)); + if (ret < 0) { + goto out; + } + ret = ft_tranx_flush_buffer(s, &s->tranx_id, sizeof(uint16_t)); + +out: + return ret; +} + +static int ft_tranx_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) +{ + QEMUFileFtTranx *s = opaque; + ssize_t ret = -1; + + if (s->has_error) { + fprintf(stderr, "flush when error, bailing\n"); + return -EINVAL; + } + + ret = ft_tranx_send_header(s); + if (ret < 0) { + goto out; + } + + ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq)); + if (ret < 0) { + 
goto out; + } + s->seq++; + + ret = ft_tranx_flush_buffer(s, &size, sizeof(uint32_t)); + if (ret < 0) { + goto out; + } + + ret = ft_tranx_flush_buffer(s, (uint8_t *)buf, size); + +out: + return ret; +} + +static int ft_tranx_put_vector(void *opaque, struct iovec *vector, int64_t pos, int count) +{ + QEMUFileFtTranx *s = opaque; + ssize_t ret = -1; + int i; + uint32_t size = 0; + + dprintf("putting %d vectors at %" PRId64 "\n", count, pos); + + if (s->has_error) { + dprintf("put vector when error, bailing\n"); + return -EINVAL; + } + + ret = ft_tranx_send_header(s); + if (ret < 0) { + return ret; + } + + ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq)); + if (ret < 0) { + return ret; + } + s->seq++; + + for (i = 0; i < count; i++) + size += vector[i].iov_len; + + ret = ft_tranx_flush_buffer(s, &size, sizeof(uint32_t)); + if (ret < 0) { + return ret; + } + + while (count > 0) { + /* + * It will continue calling put_vector even if count > IOV_MAX. + */ + ret = s->put_vector(s->opaque, vector, + ((count>IOV_MAX)?IOV_MAX:count)); + + if (ret <= 0) { + fprintf(stderr, "ft transaction putting vector\n"); + s->has_error = 1; + return ret; + } + + for (i = 0; i < count; i++) { + /* ret represents -(length of remaining data). 
*/ + ret -= vector[i].iov_len; + if (ret < 0) { + vector[i].iov_base += (ret + vector[i].iov_len); + vector[i].iov_len = -ret; + vector = &vector[i]; + break; + } + } + count -= i; + } + + return 0; +} + +static inline int ft_tranx_fill_buffer(void *opaque, void *buf, int size) +{ + QEMUFileFtTranx *s = opaque; + size_t offset = 0; + ssize_t len; + + while (ft_mode != FT_ERROR && offset < size) { + len = s->get_buffer(s->opaque, (uint8_t *)buf + offset, + 0, size - offset); + if (len <= 0) { + fprintf(stderr, "ft_tranx fill buffer failed\n"); + s->has_error = 1; + return -EINVAL; + } + offset += len; + } + return 0; +} + +/* return QEMU_VM_TRANSACTION type */ +static int ft_tranx_get_next(QEMUFileFtTranx *s) +{ + uint16_t header; + uint16_t tranx_id; + + if ((ft_tranx_fill_buffer(s, &header, sizeof(header)) < 0) || + (ft_tranx_fill_buffer(s, &tranx_id, sizeof(tranx_id)) < 0)) { + return QEMU_VM_TRANSACTION_CANCEL; + } + + s->tranx_id = tranx_id; + + return header; +} + +static int ft_tranx_get_buffer(void *opaque, uint8_t *buf, + int64_t pos, int size) +{ + QEMUFileFtTranx *s = opaque; + QEMUFile *f = s->file; + uint32_t payload_len; + int ret = -1, offset; + + /* get transaction header*/ + ret = ft_tranx_get_next(s); + switch (ret) { + case QEMU_VM_TRANSACTION_BEGIN: + for (offset = 0;;) { + ret = ft_tranx_get_next(s); + /* CONTINUE or COMMIT must come afer BEGIN */ + if ((ret != QEMU_VM_TRANSACTION_CONTINUE) && + (ret != QEMU_VM_TRANSACTION_COMMIT)) { + goto error_out; + } + + if (ft_tranx_fill_buffer(s, &s->seq, sizeof(s->seq)) < 0) { + goto error_out; + } + + if (ret == QEMU_VM_TRANSACTION_COMMIT) { + ret = offset; + dprintf("QEMU_VM_TRANSACTION_COMMIT %d\n", offset); + break; + } + + if (ft_tranx_fill_buffer(s, &payload_len, + sizeof(payload_len)) < 0) { + goto error_out; + } + + /* Extend QEMUFile buf if there weren't enough space. 
*/ + if (payload_len > (s->buf_max_size - offset)) { + s->buf_max_size += (payload_len - (s->buf_max_size - offset)); + buf = qemu_realloc_buffer(f, s->buf_max_size); + } + + if (ft_tranx_fill_buffer(s, buf + offset, payload_len) < 0) { + goto error_out; + } + offset += payload_len; + } + + s->tranx_state = QEMU_VM_TRANSACTION_ACK; + if (ft_tranx_send_header(s) < 0) { + goto error_out; + } + goto out; + + case QEMU_VM_TRANSACTION_ATOMIC: + /* not implemented yet */ + fprintf(stderr, "QEMU_VM_TRANSACTION_ATOMIC not implemented. %d\n", + ret); + goto error_out; + + case QEMU_VM_TRANSACTION_CANCEL: + dprintf("ft transaction canceled %d\n", ret); + ret = -1; + ft_mode = FT_OFF; + goto out; + + default: + fprintf(stderr, "unknown QEMU_VM_TRANSACTION_STATE %d\n", ret); + } + +error_out: + ret = -1; + ft_mode = FT_ERROR; +out: + return ret; +} + +static int ft_tranx_close(void *opaque) +{ + QEMUFileFtTranx *s = opaque; + int ret = -1; + + dprintf("closing\n"); + ret = s->close(s->opaque); + qemu_free(s); + + return ret; +} + +int qemu_ft_tranx_begin(void *opaque) +{ + QEMUFileFtTranx *s = opaque; + int ret = -1; + s->seq = 0; + + if (!s->is_sender && s->tranx_state == QEMU_VM_TRANSACTION_INIT) { + /* receiver sends QEMU_VM_TRANSACTION_ACK to start transaction. 
*/ + s->tranx_state = QEMU_VM_TRANSACTION_ACK; + ret = ft_tranx_send_header(s); + goto out; + } + + if (s->is_sender) { + if (s->tranx_state == QEMU_VM_TRANSACTION_INIT) { + ret = ft_tranx_get_next(s); + if (ret != QEMU_VM_TRANSACTION_ACK) { + fprintf(stderr, "ft_transaction receiving ack failed\n"); + ret = -1; + goto out; + } + } + + s->tranx_state = QEMU_VM_TRANSACTION_BEGIN; + if ((ret = ft_tranx_send_header(s)) < 0) { + goto out; + } + + s->tranx_state = QEMU_VM_TRANSACTION_CONTINUE; + ret = 0; + } + +out: + return ret; +} + +int qemu_ft_tranx_commit(void *opaque) +{ + QEMUFileFtTranx *s = opaque; + int ret = -1; + + if (!s->is_sender) { + s->tranx_state = QEMU_VM_TRANSACTION_ACK; + ret = ft_tranx_send_header(s); + } else { + /* flush buf before sending COMMIT */ + qemu_fflush(s->file); + + s->tranx_state = QEMU_VM_TRANSACTION_COMMIT; + ret = ft_tranx_send_header(s); + if (ret < 0) { + return ret; + } + + ret = ft_tranx_flush_buffer(s, &s->seq, sizeof(s->seq)); + if (ret < 0) { + return ret; + } + + /* FIX ME: can we remove this if statement? 
*/ + if (ret >= 0) { + ret = ft_tranx_get_next(s); + if (ret != QEMU_VM_TRANSACTION_ACK) { + fprintf(stderr, "ft_transaction receiving ack failed\n"); + return -1; + } + } + + s->tranx_id++; + } + + return ret; +} + +int qemu_ft_tranx_cancel(void *opaque) +{ + QEMUFileFtTranx *s = opaque; + int ret = -1; + + if (s->is_sender) { + s->tranx_state = QEMU_VM_TRANSACTION_CANCEL; + if ((ret = ft_tranx_send_header(s)) < 0) { + fprintf(stderr, "ft cancel failed\n"); + } + } + + return ret; +} + +QEMUFile *qemu_fopen_ops_ft_tranx(void *opaque, + FtTranxPutBufferFunc *put_buffer, + FtTranxPutVectorFunc *put_vector, + FtTranxGetBufferFunc *get_buffer, + FtTranxGetVectorFunc *get_vector, + FtTranxCloseFunc *close, + int is_sender) +{ + QEMUFileFtTranx *s; + + s = qemu_mallocz(sizeof(*s)); + + s->opaque = opaque; + s->put_buffer = put_buffer; + s->put_vector = put_vector; + s->get_buffer = get_buffer; + s->get_vector = get_vector; + s->close = close; + s->buf_max_size = IO_BUF_SIZE; + s->is_sender = is_sender; + s->tranx_id = 0; + s->seq = 0; + + s->file = qemu_fopen_ops(s, ft_tranx_put_buffer, ft_tranx_put_vector, + ft_tranx_get_buffer, NULL, ft_tranx_close, + NULL, NULL, NULL); + + return s->file; +} diff --git a/ft_transaction.h b/ft_transaction.h new file mode 100644 index 0000000..3f7cbd2 --- /dev/null +++ b/ft_transaction.h @@ -0,0 +1,57 @@ +/* + * Fault tolerant VM transaction QEMUFile + * + * Copyright (c) 2010 Nippon Telegraph and Telephone Corporation. + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * This source code is based on buffered_file.h. + * Copyright IBM, Corp. 
2008 + * Authors: + * Anthony Liguori <aliguori@xxxxxxxxxx> + */ + +#ifndef QEMU_FT_TRANSACTION_FILE_H +#define QEMU_FT_TRANSACTION_FILE_H + +#include "hw/hw.h" + +enum QEMU_VM_TRANSACTION_STATE { + QEMU_VM_TRANSACTION_INIT, + QEMU_VM_TRANSACTION_BEGIN, + QEMU_VM_TRANSACTION_CONTINUE, + QEMU_VM_TRANSACTION_COMMIT, + QEMU_VM_TRANSACTION_CANCEL, + QEMU_VM_TRANSACTION_ATOMIC, + QEMU_VM_TRANSACTION_ACK, + QEMU_VM_TRANSACTION_NACK, +}; + +enum FT_MODE { + FT_OFF, + FT_INIT, + FT_TRANSACTION, + FT_ERROR, +}; +extern enum FT_MODE ft_mode; + +typedef ssize_t (FtTranxPutBufferFunc)(void *opaque, const void *data, size_t size); +typedef ssize_t (FtTranxPutVectorFunc)(void *opaque, const struct iovec *iov, int iovcnt); +typedef QEMUFileGetBufferFunc FtTranxGetBufferFunc; +typedef QEMUFileGetVectorFunc FtTranxGetVectorFunc; +typedef int (FtTranxCloseFunc)(void *opaque); + +int qemu_ft_tranx_begin(void *opaque); +int qemu_ft_tranx_commit(void *opaque); +int qemu_ft_tranx_cancel(void *opaque); + +QEMUFile *qemu_fopen_ops_ft_tranx(void *opaque, + FtTranxPutBufferFunc *put_buffer, + FtTranxPutVectorFunc *put_vector, + FtTranxGetBufferFunc *get_buffer, + FtTranxGetVectorFunc *get_vector, + FtTranxCloseFunc *close, + int is_sender); + +#endif diff --git a/migration.c b/migration.c index 5d238f5..4eed0b7 100644 --- a/migration.c +++ b/migration.c @@ -15,6 +15,7 @@ #include "migration.h" #include "monitor.h" #include "buffered_file.h" +#include "ft_transaction.h" #include "sysemu.h" #include "block.h" #include "qemu_socket.h" @@ -31,6 +32,8 @@ do { } while (0) #endif +enum FT_MODE ft_mode = FT_OFF; + /* Migration speed throttling */ static uint32_t max_throttle = (32 << 20); -- 1.7.0.31.g1df487 -- To unsubscribe from this list: send the line "unsubscribe kvm" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html