From: Qiao Nuohan <qiaonuohan@xxxxxxxxxxxxxx>

Use several threads to read and compress pages and one thread to write
the produced pages into the dumpfile. The produced pages are stored in a
buffer, from which the consumer thread takes them in pfn order.

Signed-off-by: Qiao Nuohan <qiaonuohan at cn.fujitsu.com>
---
 makedumpfile.c |  503 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 makedumpfile.h |   44 ++++++
 2 files changed, 547 insertions(+), 0 deletions(-)

diff --git a/makedumpfile.c b/makedumpfile.c
index 05859a3..bce6dc3 100644
--- a/makedumpfile.c
+++ b/makedumpfile.c
@@ -235,6 +235,31 @@ is_in_same_page(unsigned long vaddr1, unsigned long vaddr2)
 	return FALSE;
 }
 
+static inline unsigned long
+calculate_len_buf_out(long page_size)
+{
+	unsigned long len_buf_out_zlib, len_buf_out_lzo, len_buf_out_snappy;
+	unsigned long len_buf_out;
+
+	len_buf_out_zlib = len_buf_out_lzo = len_buf_out_snappy = 0;
+
+#ifdef USELZO
+	len_buf_out_lzo = page_size + page_size / 16 + 64 + 3;
+#endif
+
+#ifdef USESNAPPY
+	len_buf_out_snappy = snappy_max_compressed_length(page_size);
+#endif
+
+	len_buf_out_zlib = compressBound(page_size);
+
+	len_buf_out = MAX(len_buf_out_zlib,
+			  MAX(len_buf_out_lzo,
+			      len_buf_out_snappy));
+
+	return len_buf_out;
+}
+
 #define BITMAP_SECT_LEN 4096
 static inline int is_dumpable(struct dump_bitmap *, mdf_pfn_t);
 static inline int is_dumpable_cyclic(char *bitmap, mdf_pfn_t, struct cycle *cycle);
@@ -7016,6 +7041,484 @@ write_elf_pages_cyclic(struct cache_data *cd_header, struct cache_data *cd_page)
 	return TRUE;
 }
 
+/*
+ * Producer thread: repeatedly take the next pfn from info->current_pfn,
+ * claim a free slot in page_data_buf and fill it with the (filtered and,
+ * if possible, compressed) page contents for the consumer,
+ * write_kdump_pages_parallel().
+ *
+ * Cancellation is disabled except at one explicit point in the slot
+ * search loop, so the thread is never cancelled while it holds
+ * page_data_buf[].mutex or a lock taken by the read/filter helpers.
+ *
+ * Exits (pthread_exit) with NULL on success or PTHREAD_FAIL on error.
+ */
+void *
+kdump_thread_function(void *arg)
+{
+	void *retval = PTHREAD_FAIL;
+	struct thread_args *kdump_thread_args = (struct thread_args *)arg;
+	struct page_data *page_data_buf = kdump_thread_args->page_data_buf;
+	int page_data_num = kdump_thread_args->page_data_num;
+	mdf_pfn_t pfn;
+	mdf_pfn_t consumed_pfn;
+	int index;
+	int found;
+	int fd_memory = 0;
+	int cancel_state;
+	struct dump_bitmap bitmap_parallel;
+	/*
+	 * Zero-initialized so that the cleanup below never closes a garbage
+	 * fd when refiltering is off and this bitmap is never set up.
+	 */
+	struct dump_bitmap bitmap_memory_parallel = {0};
+	unsigned char *buf = NULL, *buf_out = NULL;
+	struct mmap_cache *mmap_cache =
+			MMAP_CACHE_PARALLEL(kdump_thread_args->thread_num);
+	unsigned long size_out;
+#ifdef USELZO
+	lzo_bytep wrkmem = WRKMEM_PARALLEL(kdump_thread_args->thread_num);
+#endif
+#ifdef USESNAPPY
+	unsigned long len_buf_out_snappy =
+				snappy_max_compressed_length(info->page_size);
+#endif
+
+	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
+
+	buf = BUF_PARALLEL(kdump_thread_args->thread_num);
+	buf_out = BUF_OUT_PARALLEL(kdump_thread_args->thread_num);
+
+	fd_memory = FD_MEMORY_PARALLEL(kdump_thread_args->thread_num);
+
+	initialize_2nd_bitmap_parallel(&bitmap_parallel, kdump_thread_args->thread_num);
+
+	if (info->flag_refiltering) {
+		initialize_bitmap_memory_parallel(&bitmap_memory_parallel,
+						kdump_thread_args->thread_num);
+	}
+
+	while (1) {
+		/* get next pfn */
+		pthread_mutex_lock(&info->current_pfn_mutex);
+		pfn = info->current_pfn;
+		info->current_pfn++;
+		pthread_mutex_unlock(&info->current_pfn_mutex);
+
+		if (pfn >= kdump_thread_args->end_pfn)
+			break;
+
+		index = -1;
+		found = FALSE;
+
+		while (found == FALSE) {
+			/*
+			 * The only cancellation point of this thread; no
+			 * lock is held here.
+			 */
+			pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &cancel_state);
+			pthread_testcancel();
+			sleep(0);
+			pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
+
+			index = (index + 1) % page_data_num;
+
+			if (pthread_mutex_trylock(&page_data_buf[index].mutex) != 0)
+				continue;
+
+			if (page_data_buf[index].ready != 0)
+				goto unlock;
+
+			pthread_mutex_lock(&info->consumed_pfn_mutex);
+			if ((long)page_data_buf[index].pfn >
+					(long)info->consumed_pfn)
+				info->consumed_pfn = page_data_buf[index].pfn;
+			consumed_pfn = info->consumed_pfn;
+			pthread_mutex_unlock(&info->consumed_pfn_mutex);
+
+			/*
+			 * leave space for slow producer
+			 */
+			if ((long)pfn - (long)consumed_pfn > page_data_num)
+				goto unlock;
+
+			found = TRUE;
+
+			page_data_buf[index].pfn = pfn;
+			page_data_buf[index].ready = 1;
+
+			if (!is_dumpable(&bitmap_parallel, pfn)) {
+				page_data_buf[index].dumpable = FALSE;
+				goto unlock;
+			}
+
+			page_data_buf[index].dumpable = TRUE;
+
+			if (!read_pfn_parallel(fd_memory, pfn, buf,
+					       &bitmap_memory_parallel,
+					       mmap_cache)) {
+				/*
+				 * Neither publish a half-filled slot nor exit
+				 * with it locked (the consumer would block on
+				 * the mutex forever). The consumer never gets
+				 * this pfn and gives up after WAIT_TIME.
+				 */
+				page_data_buf[index].ready = 0;
+				pthread_mutex_unlock(&page_data_buf[index].mutex);
+				goto fail;
+			}
+
+			filter_data_buffer_parallel(buf, pfn_to_paddr(pfn),
+						    info->page_size,
+						    &info->filter_mutex);
+
+			if ((info->dump_level & DL_EXCLUDE_ZERO)
+			    && is_zero_page(buf, info->page_size)) {
+				page_data_buf[index].zero = TRUE;
+				goto unlock;
+			}
+
+			page_data_buf[index].zero = FALSE;
+
+			/*
+			 * Compress the page data. Each method initializes
+			 * size_out itself right before compressing.
+			 */
+			if ((info->flag_compress & DUMP_DH_COMPRESSED_ZLIB)
+			    && ((size_out = kdump_thread_args->len_buf_out),
+				compress2(buf_out, &size_out, buf,
+					  info->page_size,
+					  Z_BEST_SPEED) == Z_OK)
+			    && (size_out < info->page_size)) {
+				page_data_buf[index].flags =
+							DUMP_DH_COMPRESSED_ZLIB;
+				page_data_buf[index].size  = size_out;
+				memcpy(page_data_buf[index].buf, buf_out,
+				       size_out);
+#ifdef USELZO
+			} else if (info->flag_lzo_support
+				   && (info->flag_compress
+				       & DUMP_DH_COMPRESSED_LZO)
+				   && ((size_out = info->page_size),
+				       lzo1x_1_compress(buf, info->page_size,
+							buf_out, &size_out,
+							wrkmem) == LZO_E_OK)
+				   && (size_out < info->page_size)) {
+				page_data_buf[index].flags =
+							DUMP_DH_COMPRESSED_LZO;
+				page_data_buf[index].size  = size_out;
+				memcpy(page_data_buf[index].buf, buf_out,
+				       size_out);
+#endif
+#ifdef USESNAPPY
+			} else if ((info->flag_compress
+				    & DUMP_DH_COMPRESSED_SNAPPY)
+				   && ((size_out = len_buf_out_snappy),
+				       snappy_compress((char *)buf,
+						       info->page_size,
+						       (char *)buf_out,
+						       (size_t *)&size_out)
+				       == SNAPPY_OK)
+				   && (size_out < info->page_size)) {
+				page_data_buf[index].flags =
+						DUMP_DH_COMPRESSED_SNAPPY;
+				page_data_buf[index].size  = size_out;
+				memcpy(page_data_buf[index].buf, buf_out,
+				       size_out);
+#endif
+			} else {
+				page_data_buf[index].flags = 0;
+				page_data_buf[index].size  = info->page_size;
+				memcpy(page_data_buf[index].buf, buf,
+				       info->page_size);
+			}
+unlock:
+			pthread_mutex_unlock(&page_data_buf[index].mutex);
+		}
+	}
+
+	retval = NULL;
+
+fail:
+	if (bitmap_memory_parallel.fd > 0)
+		close(bitmap_memory_parallel.fd);
+
+	pthread_exit(retval);
+}
+
+/*
+ * Write the page descriptors and page data of a kdump-compressed dump
+ * with info->num_threads producer threads (kdump_thread_function) that
+ * fill info->page_data_buf. This thread consumes the slots strictly in
+ * pfn order.
+ *
+ * Returns TRUE on success and FALSE on failure, including when no new
+ * page is produced for WAIT_TIME seconds.
+ */
+int
+write_kdump_pages_parallel(struct cache_data *cd_header,
+			   struct cache_data *cd_page)
+{
+	int ret = FALSE;
+	int res;
+	int write_failed = FALSE;
+	int num_mutexes_inited = 0;	/* # of page_data_buf[].mutex ready */
+	int num_threads_created = 0;
+	unsigned long len_buf_out;
+	mdf_pfn_t per, num_dumpable;
+	mdf_pfn_t start_pfn, end_pfn;
+	struct disk_dump_header *dh = info->dump_header;
+	struct page_desc pd, pd_zero;
+	off_t offset_data = 0;
+	struct timeval tv_start;
+	struct timeval last, new;
+	unsigned char buf[info->page_size];
+	unsigned long long consuming_pfn;
+	pthread_t **threads = NULL;
+	struct thread_args *kdump_thread_args = NULL;
+	void *thread_result;
+	int page_data_num;
+	struct page_data *page_data_buf = NULL;
+	int index;
+	int i;
+
+	if (info->flag_elf_dumpfile)
+		return ret;
+
+	res = pthread_mutex_init(&info->current_pfn_mutex, NULL);
+	if (res != 0) {
+		ERRMSG("Can't initialize current_pfn_mutex. %s\n",
+				strerror(res));
+		return ret;
+	}
+
+	res = pthread_mutex_init(&info->consumed_pfn_mutex, NULL);
+	if (res != 0) {
+		ERRMSG("Can't initialize consumed_pfn_mutex. %s\n",
+				strerror(res));
+		goto out_current_pfn_mutex;
+	}
+
+	res = pthread_mutex_init(&info->filter_mutex, NULL);
+	if (res != 0) {
+		ERRMSG("Can't initialize filter_mutex. %s\n", strerror(res));
+		goto out_consumed_pfn_mutex;
+	}
+
+	res = pthread_rwlock_init(&info->usemmap_rwlock, NULL);
+	if (res != 0) {
+		ERRMSG("Can't initialize usemmap_rwlock. %s\n", strerror(res));
+		goto out_filter_mutex;
+	}
+
+	len_buf_out = calculate_len_buf_out(info->page_size);
+
+	num_dumpable = get_num_dumpable();
+	per = num_dumpable / 10000;
+	per = per ? per : 1;
+
+	/*
+	 * Calculate the offset of the page data.
+	 */
+	cd_header->offset
+		= (DISKDUMP_HEADER_BLOCKS + dh->sub_hdr_size + dh->bitmap_blocks)
+		* dh->block_size;
+	cd_page->offset = cd_header->offset + sizeof(page_desc_t)*num_dumpable;
+	offset_data = cd_page->offset;
+
+	/*
+	 * Write the data of zero-filled page.
+	 */
+	gettimeofday(&tv_start, NULL);
+	if (info->dump_level & DL_EXCLUDE_ZERO) {
+		pd_zero.size = info->page_size;
+		pd_zero.flags = 0;
+		pd_zero.offset = offset_data;
+		pd_zero.page_flags = 0;
+		memset(buf, 0, pd_zero.size);
+		if (!write_cache(cd_page, buf, pd_zero.size))
+			goto out;
+		offset_data += pd_zero.size;
+	}
+
+	start_pfn = 0;
+	end_pfn = info->max_mapnr;
+
+	info->current_pfn = start_pfn;
+	info->consumed_pfn = start_pfn - 1;
+
+	threads = info->threads;
+	kdump_thread_args = info->kdump_thread_args;
+
+	page_data_num = info->num_buffers;
+	page_data_buf = info->page_data_buf;
+
+	for (i = 0; i < page_data_num; i++) {
+		/*
+		 * producer will use pfn in page_data_buf to decide the
+		 * consumed pfn
+		 */
+		page_data_buf[i].pfn = start_pfn - 1;
+		page_data_buf[i].ready = 0;
+		res = pthread_mutex_init(&page_data_buf[i].mutex, NULL);
+		if (res != 0) {
+			ERRMSG("Can't initialize mutex of page_data_buf. %s\n",
+					strerror(res));
+			goto out;
+		}
+		num_mutexes_inited++;
+	}
+
+	for (i = 0; i < info->num_threads; i++) {
+		kdump_thread_args[i].thread_num = i;
+		kdump_thread_args[i].len_buf_out = len_buf_out;
+		kdump_thread_args[i].start_pfn = start_pfn;
+		kdump_thread_args[i].end_pfn = end_pfn;
+		kdump_thread_args[i].page_data_num = page_data_num;
+		kdump_thread_args[i].page_data_buf = page_data_buf;
+
+		res = pthread_create(threads[i], NULL,
+				     kdump_thread_function,
+				     (void *)&kdump_thread_args[i]);
+		if (res != 0) {
+			ERRMSG("Can't create thread %d. %s\n",
+					i, strerror(res));
+			goto out;
+		}
+		num_threads_created++;
+	}
+
+	consuming_pfn = start_pfn;
+	index = -1;
+
+	gettimeofday(&last, NULL);
+
+	while (consuming_pfn < end_pfn) {
+		index = (index + 1) % page_data_num;
+
+		gettimeofday(&new, NULL);
+		if (new.tv_sec - last.tv_sec > WAIT_TIME) {
+			ERRMSG("Can't get data of pfn %llx.\n", consuming_pfn);
+			goto out;
+		}
+
+		/*
+		 * check pfn first without mutex locked to reduce the time
+		 * trying to lock the mutex
+		 */
+		if (page_data_buf[index].pfn != consuming_pfn)
+			continue;
+
+		pthread_mutex_lock(&page_data_buf[index].mutex);
+
+		/* check whether the found one is ready to be consumed */
+		if (page_data_buf[index].pfn != consuming_pfn ||
+		    page_data_buf[index].ready != 1) {
+			goto unlock;
+		}
+
+		if ((num_dumped % per) == 0)
+			print_progress(PROGRESS_COPY, num_dumped, num_dumpable);
+
+		/* next pfn is found, refresh last here */
+		last = new;
+		consuming_pfn++;
+		page_data_buf[index].ready = 0;
+
+		if (page_data_buf[index].dumpable == FALSE)
+			goto unlock;
+
+		num_dumped++;
+
+		if (page_data_buf[index].zero == TRUE) {
+			if (!write_cache(cd_header, &pd_zero, sizeof(page_desc_t)))
+				write_failed = TRUE;
+			else
+				pfn_zero++;
+		} else {
+			pd.flags      = page_data_buf[index].flags;
+			pd.size       = page_data_buf[index].size;
+			pd.page_flags = 0;
+			pd.offset     = offset_data;
+			offset_data  += pd.size;
+			/*
+			 * Write the page header, then the page data.
+			 */
+			if (!write_cache(cd_header, &pd, sizeof(page_desc_t)) ||
+			    !write_cache(cd_page, page_data_buf[index].buf,
+					 pd.size))
+				write_failed = TRUE;
+		}
+unlock:
+		/*
+		 * Always release the slot before leaving the loop: the
+		 * mutex is destroyed at "out", and destroying a locked
+		 * mutex is undefined behavior.
+		 */
+		pthread_mutex_unlock(&page_data_buf[index].mutex);
+		if (write_failed)
+			goto out;
+	}
+
+	/*
+	 * Write the remainder.
+	 */
+	if (!write_cache_bufsz(cd_page))
+		goto out;
+	if (!write_cache_bufsz(cd_header))
+		goto out;
+
+	ret = TRUE;
+	/*
+	 * print [100 %]
+	 */
+	print_progress(PROGRESS_COPY, num_dumped, num_dumpable);
+	print_execution_time(PROGRESS_COPY, &tv_start);
+	PROGRESS_MSG("\n");
+
+out:
+	/*
+	 * Only touch threads that were actually created: the pthread_t
+	 * of a failed pthread_create() is unspecified.
+	 */
+	for (i = 0; i < num_threads_created; i++) {
+		res = pthread_cancel(*threads[i]);
+		if (res != 0 && res != ESRCH)
+			ERRMSG("Can't cancel thread %d. %s\n",
+					i, strerror(res));
+	}
+
+	for (i = 0; i < num_threads_created; i++) {
+		res = pthread_join(*threads[i], &thread_result);
+		if (res != 0) {
+			ERRMSG("Can't join with thread %d. %s\n",
+					i, strerror(res));
+			continue;
+		}
+
+		if (thread_result == PTHREAD_CANCELED)
+			DEBUG_MSG("Thread %d is cancelled.\n", i);
+		else if (thread_result == PTHREAD_FAIL)
+			DEBUG_MSG("Thread %d fails.\n", i);
+		else
+			DEBUG_MSG("Thread %d finishes.\n", i);
+	}
+
+	/* only the mutexes that were successfully initialized */
+	for (i = 0; i < num_mutexes_inited; i++)
+		pthread_mutex_destroy(&page_data_buf[i].mutex);
+
+	pthread_rwlock_destroy(&info->usemmap_rwlock);
+out_filter_mutex:
+	pthread_mutex_destroy(&info->filter_mutex);
+out_consumed_pfn_mutex:
+	pthread_mutex_destroy(&info->consumed_pfn_mutex);
+out_current_pfn_mutex:
+	pthread_mutex_destroy(&info->current_pfn_mutex);
+
+	return ret;
+}
+
 int
 write_kdump_pages(struct cache_data *cd_header, struct cache_data *cd_page)
 {
diff --git a/makedumpfile.h b/makedumpfile.h
index b1ff561..bca3d56 100644
--- a/makedumpfile.h
+++ b/makedumpfile.h
@@ -431,8 +431,15 @@ do { \
 /*
  * Macro for getting parallel info.
  */
+#define FD_MEMORY_PARALLEL(i)		info->parallel_info[i].fd_memory
 #define FD_BITMAP_MEMORY_PARALLEL(i)	info->parallel_info[i].fd_bitmap_memory
 #define FD_BITMAP_PARALLEL(i)		info->parallel_info[i].fd_bitmap
+#define BUF_PARALLEL(i)			info->parallel_info[i].buf
+#define BUF_OUT_PARALLEL(i)		info->parallel_info[i].buf_out
+#define MMAP_CACHE_PARALLEL(i)		info->parallel_info[i].mmap_cache
+#ifdef USELZO
+#define WRKMEM_PARALLEL(i)		info->parallel_info[i].wrkmem
+#endif
 /*
  * kernel version
  *
@@ -921,12 +928,39 @@ typedef unsigned long long int ulonglong;
 /*
  * for parallel process
  */
+
+#define WAIT_TIME	(60 * 10)
+#define PTHREAD_FAIL	((void *)-2)
+
 struct mmap_cache {
 	char	*mmap_buf;
 	off_t	mmap_start_offset;
 	off_t	mmap_end_offset;
 };
 
+struct page_data
+{
+	mdf_pfn_t pfn;
+	int dumpable;
+	int zero;
+	unsigned int flags;
+	long size;
+	unsigned char *buf;
+	pthread_mutex_t mutex;
+	/*
+	 * whether the page_data is ready to be consumed
+	 */
+	int ready;
+};
+
+struct thread_args {
+	int thread_num;
+	unsigned long len_buf_out;
+	mdf_pfn_t start_pfn, end_pfn;
+	int page_data_num;
+	struct page_data *page_data_buf;
+};
+
 /*
  * makedumpfile header
  * For re-arranging the dump data on different architecture, all the
@@ -1208,7 +1242,17 @@ struct DumpInfo {
 	/*
 	 * for parallel process
 	 */
+	int num_threads;
+	int num_buffers;
+	pthread_t **threads;
+	struct thread_args *kdump_thread_args;
+	struct page_data *page_data_buf;
 	pthread_rwlock_t usemmap_rwlock;
+	mdf_pfn_t current_pfn;
+	pthread_mutex_t current_pfn_mutex;
+	mdf_pfn_t consumed_pfn;
+	pthread_mutex_t consumed_pfn_mutex;
+	pthread_mutex_t filter_mutex;
 };
 
 extern struct DumpInfo *info;
-- 
1.7.1