* Paolo Bonzini (pbonzini@xxxxxxxxxx) wrote: > On 23/11/18 19:17, Dr. David Alan Gilbert wrote: > > * guangrong.xiao@xxxxxxxxx (guangrong.xiao@xxxxxxxxx) wrote: > >> From: Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx> > >> > >> Adapt the compression code to the threaded workqueue > >> > >> Signed-off-by: Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx> > >> --- > >> migration/ram.c | 308 ++++++++++++++++++++------------------------------------ > >> 1 file changed, 110 insertions(+), 198 deletions(-) > >> > >> diff --git a/migration/ram.c b/migration/ram.c > >> index 7e7deec4d8..254c08f27b 100644 > >> --- a/migration/ram.c > >> +++ b/migration/ram.c > >> @@ -57,6 +57,7 @@ > >> #include "qemu/uuid.h" > >> #include "savevm.h" > >> #include "qemu/iov.h" > >> +#include "qemu/threaded-workqueue.h" > >> > >> /***********************************************************/ > >> /* ram save/restore */ > >> @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; > >> > >> CompressionStats compression_counters; > >> > >> -struct CompressParam { > >> - bool done; > >> - bool quit; > >> - bool zero_page; > >> - QEMUFile *file; > >> - QemuMutex mutex; > >> - QemuCond cond; > >> - RAMBlock *block; > >> - ram_addr_t offset; > >> - > >> - /* internally used fields */ > >> - z_stream stream; > >> - uint8_t *originbuf; > >> -}; > >> -typedef struct CompressParam CompressParam; > >> - > >> struct DecompressParam { > >> bool done; > >> bool quit; > >> @@ -377,15 +362,6 @@ struct DecompressParam { > >> }; > >> typedef struct DecompressParam DecompressParam; > >> > >> -static CompressParam *comp_param; > >> -static QemuThread *compress_threads; > >> -/* comp_done_cond is used to wake up the migration thread when > >> - * one of the compression threads has finished the compression. > >> - * comp_done_lock is used to co-work with comp_done_cond. 
> >> - */ > >> -static QemuMutex comp_done_lock; > >> -static QemuCond comp_done_cond; > >> -/* The empty QEMUFileOps will be used by file in CompressParam */ > >> static const QEMUFileOps empty_ops = { }; > >> > >> static QEMUFile *decomp_file; > >> @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; > >> static QemuMutex decomp_done_lock; > >> static QemuCond decomp_done_cond; > >> > >> -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, > >> - ram_addr_t offset, uint8_t *source_buf); > >> - > >> -static void *do_data_compress(void *opaque) > >> -{ > >> - CompressParam *param = opaque; > >> - RAMBlock *block; > >> - ram_addr_t offset; > >> - bool zero_page; > >> - > >> - qemu_mutex_lock(&param->mutex); > >> - while (!param->quit) { > >> - if (param->block) { > >> - block = param->block; > >> - offset = param->offset; > >> - param->block = NULL; > >> - qemu_mutex_unlock(&param->mutex); > >> - > >> - zero_page = do_compress_ram_page(param->file, &param->stream, > >> - block, offset, param->originbuf); > >> - > >> - qemu_mutex_lock(&comp_done_lock); > >> - param->done = true; > >> - param->zero_page = zero_page; > >> - qemu_cond_signal(&comp_done_cond); > >> - qemu_mutex_unlock(&comp_done_lock); > >> - > >> - qemu_mutex_lock(&param->mutex); > >> - } else { > >> - qemu_cond_wait(&param->cond, &param->mutex); > >> - } > >> - } > >> - qemu_mutex_unlock(&param->mutex); > >> - > >> - return NULL; > >> -} > >> - > >> -static void compress_threads_save_cleanup(void) > >> -{ > >> - int i, thread_count; > >> - > >> - if (!migrate_use_compression() || !comp_param) { > >> - return; > >> - } > >> - > >> - thread_count = migrate_compress_threads(); > >> - for (i = 0; i < thread_count; i++) { > >> - /* > >> - * we use it as a indicator which shows if the thread is > >> - * properly init'd or not > >> - */ > >> - if (!comp_param[i].file) { > >> - break; > >> - } > >> - > >> - qemu_mutex_lock(&comp_param[i].mutex); > >> - comp_param[i].quit = true; > >> - 
qemu_cond_signal(&comp_param[i].cond); > >> - qemu_mutex_unlock(&comp_param[i].mutex); > >> - > >> - qemu_thread_join(compress_threads + i); > >> - qemu_mutex_destroy(&comp_param[i].mutex); > >> - qemu_cond_destroy(&comp_param[i].cond); > >> - deflateEnd(&comp_param[i].stream); > >> - g_free(comp_param[i].originbuf); > >> - qemu_fclose(comp_param[i].file); > >> - comp_param[i].file = NULL; > >> - } > >> - qemu_mutex_destroy(&comp_done_lock); > >> - qemu_cond_destroy(&comp_done_cond); > >> - g_free(compress_threads); > >> - g_free(comp_param); > >> - compress_threads = NULL; > >> - comp_param = NULL; > >> -} > >> - > >> -static int compress_threads_save_setup(void) > >> -{ > >> - int i, thread_count; > >> - > >> - if (!migrate_use_compression()) { > >> - return 0; > >> - } > >> - thread_count = migrate_compress_threads(); > >> - compress_threads = g_new0(QemuThread, thread_count); > >> - comp_param = g_new0(CompressParam, thread_count); > >> - qemu_cond_init(&comp_done_cond); > >> - qemu_mutex_init(&comp_done_lock); > >> - for (i = 0; i < thread_count; i++) { > >> - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); > >> - if (!comp_param[i].originbuf) { > >> - goto exit; > >> - } > >> - > >> - if (deflateInit(&comp_param[i].stream, > >> - migrate_compress_level()) != Z_OK) { > >> - g_free(comp_param[i].originbuf); > >> - goto exit; > >> - } > >> - > >> - /* comp_param[i].file is just used as a dummy buffer to save data, > >> - * set its ops to empty. 
> >> - */ > >> - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); > >> - comp_param[i].done = true; > >> - comp_param[i].quit = false; > >> - qemu_mutex_init(&comp_param[i].mutex); > >> - qemu_cond_init(&comp_param[i].cond); > >> - qemu_thread_create(compress_threads + i, "compress", > >> - do_data_compress, comp_param + i, > >> - QEMU_THREAD_JOINABLE); > >> - } > >> - return 0; > >> - > >> -exit: > >> - compress_threads_save_cleanup(); > >> - return -1; > >> -} > >> - > >> /* Multiple fd's */ > >> > >> #define MULTIFD_MAGIC 0x11223344U > >> @@ -1909,12 +1766,25 @@ exit: > >> return zero_page; > >> } > >> > >> +struct CompressData { > >> + /* filled by migration thread.*/ > >> + RAMBlock *block; > >> + ram_addr_t offset; > >> + > >> + /* filled by compress thread. */ > >> + QEMUFile *file; > >> + z_stream stream; > >> + uint8_t *originbuf; > >> + bool zero_page; > >> +}; > >> +typedef struct CompressData CompressData; > >> + > >> static void > >> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) > >> +update_compress_thread_counts(CompressData *cd, int bytes_xmit) > > > > Keep the const? > >> { > >> ram_counters.transferred += bytes_xmit; > >> > >> - if (param->zero_page) { > >> + if (cd->zero_page) { > >> ram_counters.duplicate++; > >> return; > >> } > >> @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) > >> compression_counters.pages++; > >> } > >> > >> +static int compress_thread_data_init(void *request) > >> +{ > >> + CompressData *cd = request; > >> + > >> + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); > >> + if (!cd->originbuf) { > >> + return -1; > >> + } > >> + > >> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { > >> + g_free(cd->originbuf); > >> + return -1; > >> + } > > > > Please print errors if you fail in any case so we can easily tell what > > happened. 
> > > >> + cd->file = qemu_fopen_ops(NULL, &empty_ops); > >> + return 0; > >> +} > >> + > >> +static void compress_thread_data_fini(void *request) > >> +{ > >> + CompressData *cd = request; > >> + > >> + qemu_fclose(cd->file); > >> + deflateEnd(&cd->stream); > >> + g_free(cd->originbuf); > >> +} > >> + > >> +static void compress_thread_data_handler(void *request) > >> +{ > >> + CompressData *cd = request; > >> + > >> + /* > >> + * if compression fails, it will be indicated by > >> + * migrate_get_current()->to_dst_file. > >> + */ > >> + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, > >> + cd->offset, cd->originbuf); > >> +} > >> + > >> +static void compress_thread_data_done(void *request) > >> +{ > >> + CompressData *cd = request; > >> + RAMState *rs = ram_state; > >> + int bytes_xmit; > >> + > >> + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); > >> + update_compress_thread_counts(cd, bytes_xmit); > >> +} > >> + > >> +static const ThreadedWorkqueueOps compress_ops = { > >> + .thread_request_init = compress_thread_data_init, > >> + .thread_request_uninit = compress_thread_data_fini, > >> + .thread_request_handler = compress_thread_data_handler, > >> + .thread_request_done = compress_thread_data_done, > >> + .request_size = sizeof(CompressData), > >> +}; > >> + > >> +static Threads *compress_threads; > >> + > >> static bool save_page_use_compression(RAMState *rs); > >> > >> static void flush_compressed_data(RAMState *rs) > >> { > >> - int idx, len, thread_count; > >> - > >> if (!save_page_use_compression(rs)) { > >> return; > >> } > >> - thread_count = migrate_compress_threads(); > >> > >> - qemu_mutex_lock(&comp_done_lock); > >> - for (idx = 0; idx < thread_count; idx++) { > >> - while (!comp_param[idx].done) { > >> - qemu_cond_wait(&comp_done_cond, &comp_done_lock); > >> - } > >> - } > >> - qemu_mutex_unlock(&comp_done_lock); > >> + threaded_workqueue_wait_for_requests(compress_threads); > >> +} > >> > >> - for (idx = 0; idx < 
thread_count; idx++) { > >> - qemu_mutex_lock(&comp_param[idx].mutex); > >> - if (!comp_param[idx].quit) { > >> - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); > >> - /* > >> - * it's safe to fetch zero_page without holding comp_done_lock > >> - * as there is no further request submitted to the thread, > >> - * i.e, the thread should be waiting for a request at this point. > >> - */ > >> - update_compress_thread_counts(&comp_param[idx], len); > >> - } > >> - qemu_mutex_unlock(&comp_param[idx].mutex); > >> +static void compress_threads_save_cleanup(void) > >> +{ > >> + if (!compress_threads) { > >> + return; > >> } > >> + > >> + threaded_workqueue_destroy(compress_threads); > >> + compress_threads = NULL; > >> } > >> > >> -static inline void set_compress_params(CompressParam *param, RAMBlock *block, > >> - ram_addr_t offset) > >> +static int compress_threads_save_setup(void) > >> { > >> - param->block = block; > >> - param->offset = offset; > >> + if (!migrate_use_compression()) { > >> + return 0; > >> + } > >> + > >> + compress_threads = threaded_workqueue_create("compress", > >> + migrate_compress_threads(), > >> + DEFAULT_THREAD_REQUEST_NR, &compress_ops); > >> + return compress_threads ? 
0 : -1; > >> } > >> > >> static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, > >> ram_addr_t offset) > >> { > >> - int idx, thread_count, bytes_xmit = -1, pages = -1; > >> + CompressData *cd; > >> bool wait = migrate_compress_wait_thread(); > >> > >> - thread_count = migrate_compress_threads(); > >> - qemu_mutex_lock(&comp_done_lock); > >> retry: > >> - for (idx = 0; idx < thread_count; idx++) { > >> - if (comp_param[idx].done) { > >> - comp_param[idx].done = false; > >> - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); > >> - qemu_mutex_lock(&comp_param[idx].mutex); > >> - set_compress_params(&comp_param[idx], block, offset); > >> - qemu_cond_signal(&comp_param[idx].cond); > >> - qemu_mutex_unlock(&comp_param[idx].mutex); > >> - pages = 1; > >> - update_compress_thread_counts(&comp_param[idx], bytes_xmit); > >> - break; > >> + cd = threaded_workqueue_get_request(compress_threads); > >> + if (!cd) { > >> + /* > >> + * wait for the free thread if the user specifies > >> + * 'compress-wait-thread', otherwise we will post > >> + * the page out in the main thread as normal page. > >> + */ > >> + if (wait) { > >> + cpu_relax(); > >> + goto retry; > > > > Is there nothing better we can use to wait without eating CPU time? > > There is a mechanism to wait without eating CPU time in the data > structure, but it makes sense to busy wait. There are 4 threads in the > workqueue, so you have to compare 1/4th of the time spent compressing a > page, with the trip into the kernel to wake you up. You're adding 20% > CPU usage, but I'm not surprised it's worthwhile. Hmm OK; in that case it does at least need a comment because it's a bit odd, and we should watch out how that scales - I guess it's less of an overhead the more threads you use. Dave > Paolo > -- Dr. David Alan Gilbert / dgilbert@xxxxxxxxxx / Manchester, UK