[PATCH RFC 07/11] Add write_kdump_pages_parallel to allow parallel process

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Qiao Nuohan <qiaonuohan@xxxxxxxxxxxxxx>

Use several threads to read and compress pages and one thread to write
the produced pages into dumpfile. The produced pages will be stored in
a buffer, then the consumer thread will get pages from this buffer.

Signed-off-by: Qiao Nuohan <qiaonuohan at cn.fujitsu.com>
---
 makedumpfile.c |  450 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 makedumpfile.h |   44 ++++++
 2 files changed, 494 insertions(+), 0 deletions(-)

diff --git a/makedumpfile.c b/makedumpfile.c
index 05859a3..bce6dc3 100644
--- a/makedumpfile.c
+++ b/makedumpfile.c
@@ -235,6 +235,31 @@ is_in_same_page(unsigned long vaddr1, unsigned long vaddr2)
 	return FALSE;
 }
 
+static inline unsigned long
+calculate_len_buf_out(long page_size)
+{
+	unsigned long len_buf_out_zlib, len_buf_out_lzo, len_buf_out_snappy;
+	unsigned long len_buf_out;
+
+	len_buf_out_zlib = len_buf_out_lzo = len_buf_out_snappy = 0;
+
+#ifdef USELZO
+	len_buf_out_lzo = page_size + page_size / 16 + 64 + 3;
+#endif
+
+#ifdef USESNAPPY
+	len_buf_out_snappy = snappy_max_compressed_length(page_size);
+#endif
+
+	len_buf_out_zlib = compressBound(page_size);
+
+	len_buf_out = MAX(len_buf_out_zlib,
+			  MAX(len_buf_out_lzo,
+			      len_buf_out_snappy));
+
+	return len_buf_out;
+}
+
 #define BITMAP_SECT_LEN 4096
 static inline int is_dumpable(struct dump_bitmap *, mdf_pfn_t);
 static inline int is_dumpable_cyclic(char *bitmap, mdf_pfn_t, struct cycle *cycle);
@@ -7016,6 +7041,431 @@ write_elf_pages_cyclic(struct cache_data *cd_header, struct cache_data *cd_page)
 	return TRUE;
 }
 
+void *
+kdump_thread_function(void *arg) {
+	void *retval = PTHREAD_FAIL;
+	struct thread_args *kdump_thread_args = (struct thread_args *)arg;
+	struct page_data *page_data_buf = kdump_thread_args->page_data_buf;
+	int page_data_num = kdump_thread_args->page_data_num;
+	mdf_pfn_t pfn;
+	mdf_pfn_t consumed_pfn;
+	int index;
+	int found;
+	int fd_memory = 0;
+	struct dump_bitmap bitmap_parallel, bitmap_memory_parallel;
+	unsigned char *buf = NULL, *buf_out = NULL;
+	struct mmap_cache *mmap_cache =
+			MMAP_CACHE_PARALLEL(kdump_thread_args->thread_num);
+	unsigned long size_out;
+#ifdef USELZO
+	lzo_bytep wrkmem = WRKMEM_PARALLEL(kdump_thread_args->thread_num);
+#endif
+#ifdef USESNAPPY
+	unsigned long len_buf_out_snappy =
+				snappy_max_compressed_length(info->page_size);
+#endif
+
+	buf = BUF_PARALLEL(kdump_thread_args->thread_num);
+	buf_out = BUF_OUT_PARALLEL(kdump_thread_args->thread_num);
+
+	fd_memory = FD_MEMORY_PARALLEL(kdump_thread_args->thread_num);
+
+	initialize_2nd_bitmap_parallel(&bitmap_parallel, kdump_thread_args->thread_num);
+
+	if (info->flag_refiltering) {
+		initialize_bitmap_memory_parallel(&bitmap_memory_parallel,
+						kdump_thread_args->thread_num);
+	}
+
+	while (1) {
+		/* get next pfn */
+		pthread_mutex_lock(&info->current_pfn_mutex);
+		pfn = info->current_pfn;
+		info->current_pfn++;
+		pthread_mutex_unlock(&info->current_pfn_mutex);
+
+		if (pfn >= kdump_thread_args->end_pfn)
+			break;
+
+		index = -1;
+		found = FALSE;
+
+		while (found == FALSE) {
+			/*
+			 * need a cancellation point here
+			 */
+			sleep(0);
+
+			index = (index + 1) % page_data_num;
+
+			if (pthread_mutex_trylock(&page_data_buf[index].mutex) != 0)
+				continue;
+
+			if (page_data_buf[index].ready != 0)
+				goto unlock;
+
+			pthread_mutex_lock(&info->consumed_pfn_mutex);
+			if ((long)page_data_buf[index].pfn >
+						(long)info->consumed_pfn)
+				info->consumed_pfn = page_data_buf[index].pfn;
+			consumed_pfn = info->consumed_pfn;
+			pthread_mutex_unlock(&info->consumed_pfn_mutex);
+
+			/*
+			 * leave space for slow producer
+			 */
+			if ((long)pfn - (long)consumed_pfn > page_data_num)
+				goto unlock;
+
+			found = TRUE;
+
+			page_data_buf[index].pfn = pfn;
+			page_data_buf[index].ready = 1;
+
+			if (!is_dumpable(&bitmap_parallel, pfn)) {
+				page_data_buf[index].dumpable = FALSE;
+				goto unlock;
+			}
+
+			page_data_buf[index].dumpable = TRUE;
+
+			if (!read_pfn_parallel(fd_memory, pfn, buf,
+					       &bitmap_memory_parallel,
+					       mmap_cache))
+				goto fail;
+
+			filter_data_buffer_parallel(buf, pfn_to_paddr(pfn),
+							info->page_size,
+							&info->filter_mutex);
+
+			if ((info->dump_level & DL_EXCLUDE_ZERO)
+			    && is_zero_page(buf, info->page_size)) {
+				page_data_buf[index].zero = TRUE;
+				goto unlock;
+			}
+
+			page_data_buf[index].zero = FALSE;
+
+			/*
+			 * Compress the page data.
+			 */
+			size_out = kdump_thread_args->len_buf_out;
+			if ((info->flag_compress & DUMP_DH_COMPRESSED_ZLIB)
+			    && ((size_out = kdump_thread_args->len_buf_out),
+				compress2(buf_out, &size_out, buf,
+					  info->page_size,
+					  Z_BEST_SPEED)	== Z_OK)
+			    && (size_out < info->page_size)) {
+				page_data_buf[index].flags =
+							DUMP_DH_COMPRESSED_ZLIB;
+				page_data_buf[index].size  = size_out;
+				memcpy(page_data_buf[index].buf, buf_out,
+								size_out);
+#ifdef USELZO
+			} else if (info->flag_lzo_support
+				   && (info->flag_compress
+				       & DUMP_DH_COMPRESSED_LZO)
+				   && ((size_out = info->page_size),
+				       lzo1x_1_compress(buf, info->page_size,
+							buf_out, &size_out,
+							wrkmem) == LZO_E_OK)
+				   && (size_out < info->page_size)) {
+				page_data_buf[index].flags =
+							DUMP_DH_COMPRESSED_LZO;
+				page_data_buf[index].size  = size_out;
+				memcpy(page_data_buf[index].buf, buf_out,
+								size_out);
+#endif
+#ifdef USESNAPPY
+			} else if ((info->flag_compress
+				    & DUMP_DH_COMPRESSED_SNAPPY)
+				   && ((size_out = len_buf_out_snappy),
+				       snappy_compress((char *)buf,
+						       info->page_size,
+						       (char *)buf_out,
+						       (size_t *)&size_out)
+				       == SNAPPY_OK)
+				   && (size_out < info->page_size)) {
+				page_data_buf[index].flags =
+						DUMP_DH_COMPRESSED_SNAPPY;
+				page_data_buf[index].size  = size_out;
+				memcpy(page_data_buf[index].buf, buf_out,
+								size_out);
+#endif
+			} else {
+				page_data_buf[index].flags = 0;
+				page_data_buf[index].size  = info->page_size;
+				memcpy(page_data_buf[index].buf, buf,
+							info->page_size);
+			}
+unlock:
+			pthread_mutex_unlock(&page_data_buf[index].mutex);
+		}
+	}
+
+	retval = NULL;
+
+fail:
+	if (bitmap_memory_parallel.fd > 0)
+		close(bitmap_memory_parallel.fd);
+
+	pthread_exit(retval);
+}
+
+int
+write_kdump_pages_parallel(struct cache_data *cd_header,
+			   struct cache_data *cd_page)
+{
+	int ret = FALSE;
+	int res;
+	unsigned long len_buf_out;
+	mdf_pfn_t per, num_dumpable;
+	mdf_pfn_t start_pfn, end_pfn;
+	struct disk_dump_header *dh = info->dump_header;
+	struct page_desc pd, pd_zero;
+	off_t offset_data = 0;
+	struct timeval tv_start;
+	struct timeval last, new;
+	unsigned char buf[info->page_size];
+	unsigned long long consuming_pfn;
+	pthread_t **threads = NULL;
+	struct thread_args *kdump_thread_args = NULL;
+	void *thread_result;
+	int page_data_num;
+	struct page_data *page_data_buf = NULL;
+	int index;
+	int i;
+
+	if (info->flag_elf_dumpfile)
+		return ret;
+
+	res = pthread_mutex_init(&info->current_pfn_mutex, NULL);
+	if (res != 0) {
+		ERRMSG("Can't initialize current_pfn_mutex. %s\n",
+				strerror(res));
+		goto out;
+	}
+
+	res = pthread_mutex_init(&info->consumed_pfn_mutex, NULL);
+	if (res != 0) {
+		ERRMSG("Can't initialize consumed_pfn_mutex. %s\n",
+				strerror(res));
+		goto out;
+	}
+
+	res = pthread_mutex_init(&info->filter_mutex, NULL);
+	if (res != 0) {
+		ERRMSG("Can't initialize filter_mutex. %s\n", strerror(res));
+		goto out;
+	}
+
+	res = pthread_rwlock_init(&info->usemmap_rwlock, NULL);
+	if (res != 0) {
+		ERRMSG("Can't initialize usemmap_rwlock. %s\n", strerror(res));
+		goto out;
+	}
+
+	len_buf_out = calculate_len_buf_out(info->page_size);
+
+	num_dumpable = get_num_dumpable();
+	per = num_dumpable / 10000;
+	per = per ? per : 1;
+
+	/*
+	 * Calculate the offset of the page data.
+	 */
+	cd_header->offset
+	    = (DISKDUMP_HEADER_BLOCKS + dh->sub_hdr_size + dh->bitmap_blocks)
+		* dh->block_size;
+	cd_page->offset = cd_header->offset + sizeof(page_desc_t)*num_dumpable;
+	offset_data  = cd_page->offset;
+
+	/*
+	 * Write the data of zero-filled page.
+	 */
+	gettimeofday(&tv_start, NULL);
+	if (info->dump_level & DL_EXCLUDE_ZERO) {
+		pd_zero.size = info->page_size;
+		pd_zero.flags = 0;
+		pd_zero.offset = offset_data;
+		pd_zero.page_flags = 0;
+		memset(buf, 0, pd_zero.size);
+		if (!write_cache(cd_page, buf, pd_zero.size))
+			goto out;
+		offset_data  += pd_zero.size;
+	}
+
+	start_pfn = 0;
+	end_pfn   = info->max_mapnr;
+
+	info->current_pfn = start_pfn;
+	info->consumed_pfn = start_pfn - 1;
+
+	threads = info->threads;
+	kdump_thread_args = info->kdump_thread_args;
+
+	page_data_num = info->num_buffers;
+	page_data_buf = info->page_data_buf;
+
+	for (i = 0; i < page_data_num; i++) {
+		/*
+		 * producer will use pfn in page_data_buf to decide the
+		 * consumed pfn
+		 */
+		page_data_buf[i].pfn = start_pfn - 1;
+		page_data_buf[i].ready = 0;
+		res = pthread_mutex_init(&page_data_buf[i].mutex, NULL);
+		if (res != 0) {
+			ERRMSG("Can't initialize mutex of page_data_buf. %s\n",
+					strerror(res));
+			goto out;
+		}
+	}
+
+	for (i = 0; i < info->num_threads; i++) {
+		kdump_thread_args[i].thread_num = i;
+		kdump_thread_args[i].len_buf_out = len_buf_out;
+		kdump_thread_args[i].start_pfn = start_pfn;
+		kdump_thread_args[i].end_pfn = end_pfn;
+		kdump_thread_args[i].page_data_num = page_data_num;
+		kdump_thread_args[i].page_data_buf = page_data_buf;
+
+		res = pthread_create(threads[i], NULL,
+				     kdump_thread_function,
+				     (void *)&kdump_thread_args[i]);
+		if (res != 0) {
+			ERRMSG("Can't create thread %d. %s\n",
+					i, strerror(res));
+			goto out;
+		}
+	}
+
+	consuming_pfn = start_pfn;
+	index = -1;
+
+	gettimeofday(&last, NULL);
+
+	while (consuming_pfn < end_pfn) {
+		index = (index + 1) % page_data_num;
+
+		gettimeofday(&new, NULL);
+		if (new.tv_sec - last.tv_sec > WAIT_TIME) {
+			ERRMSG("Can't get data of pfn %llx.\n", consuming_pfn);
+			goto out;
+		}
+
+		/*
+		 * check pfn first without mutex locked to reduce the time
+		 * trying to lock the mutex
+		 */
+		if (page_data_buf[index].pfn != consuming_pfn)
+			continue;
+
+		pthread_mutex_lock(&page_data_buf[index].mutex);
+
+		/* check whether the found one is ready to be consumed */
+		if (page_data_buf[index].pfn != consuming_pfn ||
+		    page_data_buf[index].ready != 1) {
+			goto unlock;
+		}
+
+		if ((num_dumped % per) == 0)
+			print_progress(PROGRESS_COPY, num_dumped, num_dumpable);
+
+		/* next pfn is found, refresh last here */
+		last = new;
+		consuming_pfn++;
+		page_data_buf[index].ready = 0;
+
+		if (page_data_buf[index].dumpable == FALSE)
+			goto unlock;
+
+		num_dumped++;
+
+		if (page_data_buf[index].zero == TRUE) {
+			if (!write_cache(cd_header, &pd_zero, sizeof(page_desc_t)))
+				goto out;
+			pfn_zero++;
+		} else {
+			pd.flags      = page_data_buf[index].flags;
+			pd.size       = page_data_buf[index].size;
+			pd.page_flags = 0;
+			pd.offset     = offset_data;
+			offset_data  += pd.size;
+			/*
+			 * Write the page header.
+			 */
+			if (!write_cache(cd_header, &pd, sizeof(page_desc_t)))
+				goto out;
+			/*
+			 * Write the page data.
+			 */
+			if (!write_cache(cd_page, page_data_buf[index].buf, pd.size))
+				goto out;
+		}
+unlock:
+		pthread_mutex_unlock(&page_data_buf[index].mutex);
+	}
+
+	/*
+	 * Write the remainder.
+	 */
+	if (!write_cache_bufsz(cd_page))
+		goto out;
+	if (!write_cache_bufsz(cd_header))
+		goto out;
+
+	ret = TRUE;
+	/*
+	 * print [100 %]
+	 */
+	print_progress(PROGRESS_COPY, num_dumped, num_dumpable);
+	print_execution_time(PROGRESS_COPY, &tv_start);
+	PROGRESS_MSG("\n");
+
+out:
+	if (threads != NULL) {
+		for (i = 0; i < info->num_threads; i++) {
+			if (threads[i] != NULL) {
+				res = pthread_cancel(*threads[i]);
+				if (res != 0 && res != ESRCH)
+					ERRMSG("Can't cancel thread %d. %s\n",
+							i, strerror(res));
+			}
+		}
+
+		for (i = 0; i < info->num_threads; i++) {
+			if (threads[i] != NULL) {
+				res = pthread_join(*threads[i], &thread_result);
+				if (res != 0)
+					ERRMSG("Can't join with thread %d. %s\n",
+							i, strerror(res));
+
+				if (thread_result == PTHREAD_CANCELED)
+					DEBUG_MSG("Thread %d is cancelled.\n", i);
+				else if (thread_result == PTHREAD_FAIL)
+					DEBUG_MSG("Thread %d fails.\n", i);
+				else
+					DEBUG_MSG("Thread %d finishes.\n", i);
+
+			}
+		}
+	}
+
+	if (page_data_buf != NULL) {
+		for (i = 0; i < page_data_num; i++) {
+			pthread_mutex_destroy(&page_data_buf[i].mutex);
+		}
+	}
+
+	pthread_rwlock_destroy(&info->usemmap_rwlock);
+	pthread_mutex_destroy(&info->filter_mutex);
+	pthread_mutex_destroy(&info->consumed_pfn_mutex);
+	pthread_mutex_destroy(&info->current_pfn_mutex);
+
+	return ret;
+}
+
 int
 write_kdump_pages(struct cache_data *cd_header, struct cache_data *cd_page)
 {
diff --git a/makedumpfile.h b/makedumpfile.h
index b1ff561..bca3d56 100644
--- a/makedumpfile.h
+++ b/makedumpfile.h
@@ -431,8 +431,15 @@ do { \
 /*
  * Macro for getting parallel info.
  */
+#define FD_MEMORY_PARALLEL(i)		info->parallel_info[i].fd_memory
 #define FD_BITMAP_MEMORY_PARALLEL(i)	info->parallel_info[i].fd_bitmap_memory
 #define FD_BITMAP_PARALLEL(i)		info->parallel_info[i].fd_bitmap
+#define BUF_PARALLEL(i)			info->parallel_info[i].buf
+#define BUF_OUT_PARALLEL(i)		info->parallel_info[i].buf_out
+#define MMAP_CACHE_PARALLEL(i)		info->parallel_info[i].mmap_cache
+#ifdef USELZO
+#define WRKMEM_PARALLEL(i)		info->parallel_info[i].wrkmem
+#endif
 /*
  * kernel version
  *
@@ -921,12 +928,39 @@ typedef unsigned long long int ulonglong;
 /*
  * for parallel process
  */
+
+#define WAIT_TIME	(60 * 10)
+#define PTHREAD_FAIL	((void *)-2)
+
 struct mmap_cache {
 	char	*mmap_buf;
 	off_t	mmap_start_offset;
 	off_t   mmap_end_offset;
 };
 
+struct page_data
+{
+	mdf_pfn_t pfn;
+	int dumpable;
+	int zero;
+	unsigned int flags;
+	long size;
+	unsigned char *buf;
+	pthread_mutex_t mutex;
+	/*
+	 * whether the page_data is ready to be consumed
+	 */
+	int ready;
+};
+
+struct thread_args {
+	int thread_num;
+	unsigned long len_buf_out;
+	mdf_pfn_t start_pfn, end_pfn;
+	int page_data_num;
+	struct page_data *page_data_buf;
+};
+
 /*
  * makedumpfile header
  *   For re-arranging the dump data on different architecture, all the
@@ -1208,7 +1242,17 @@ struct DumpInfo {
 	/*
 	 * for parallel process
 	 */
+	int num_threads;
+	int num_buffers;
+	pthread_t **threads;
+	struct thread_args *kdump_thread_args;
+	struct page_data *page_data_buf;
 	pthread_rwlock_t usemmap_rwlock;
+	mdf_pfn_t current_pfn;
+	pthread_mutex_t current_pfn_mutex;
+	mdf_pfn_t consumed_pfn;
+	pthread_mutex_t consumed_pfn_mutex;
+	pthread_mutex_t filter_mutex;
 };
 extern struct DumpInfo		*info;
 
-- 
1.7.1




[Index of Archives]     [LM Sensors]     [Linux Sound]     [ALSA Users]     [ALSA Devel]     [Linux Audio Users]     [Linux Media]     [Kernel]     [Gimp]     [Yosemite News]     [Linux Media]

  Powered by Linux