Re: concurrent direct IO write in xfs

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hello

On Mon, Jan 23, 2012 at 2:34 PM, Zheng Da <zhengda1936@xxxxxxxxx> wrote:
> I build XFS on the top of ramdisk. So yes, there is a lot of small
> concurrent writes in a second.
> I create a file of 4GB in XFS (the ramdisk has 5GB of space). My test
> program overwrites 4G of data to the file and each time writes a page of
> data randomly to the file. It's always overwriting, and no appending. The
> offset of each write is always aligned to the page size. There is no
> overlapping between writes.

> Why are you using XFS for this? tmpfs was designed to do this sort
> of stuff as efficiently as possible....
OK, I can try that. 
tmpfs doesn't support direct IO. 

> So the test case is pretty simple and I think it's easy to reproduce it.
> It'll be great if you can try the test case.

> Can you post your test code so I know what I test is exactly what
> you are running?
I can do that. My test code gets very complicated now. I need to simplify it.
Here is the code. It's still a bit long. I hope it's OK.
You can run the code like "rand-read file option=direct pages=1048576 threads=8 access=write/read".

Thanks,
Da
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <stdlib.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <assert.h>
#include <google/profiler.h>

#include <iostream>
#include <string>
#include <deque>

#define PAGE_SIZE 4096
/*
 * Round an offset or address down to a PAGE_SIZE boundary.
 * The argument is parenthesized so the (long) cast applies to the whole
 * expression, e.g. ROUND_PAGE(p - q), not just its first operand.
 */
#define ROUND_PAGE(off) (((long) (off)) & (~(PAGE_SIZE - 1)))

#define NUM_PAGES 16384
#define NUM_THREADS 32

/* Kind of I/O issued by the workers (selected with "access=read|write"). */
enum {
	READ = 0,	/* served with pread() */
	WRITE = 1	/* served with pwrite() */
};

/* Number of pages to access; set from the "pages=" argument. */
int npages = 0;
/* Number of worker threads; set from the "threads=" argument. */
int nthreads = 1;
/* Timestamp taken right before the worker threads are created. */
struct timeval global_start;
/* NOTE(review): static_buf and first are not referenced in the visible code. */
char static_buf[PAGE_SIZE * 8] __attribute__((aligned(PAGE_SIZE)));
volatile int first[NUM_THREADS];
/* READ or WRITE; set from the "access=" argument. */
int access_method = READ;

/*
 * Source of file offsets for a worker thread. Implementations in this
 * file return -1 from next_offset() once has_next() becomes false.
 */
class workload_gen
{
public:
	/* Virtual so deleting a generator through this base pointer runs the
	 * derived destructor (otherwise undefined behavior). */
	virtual ~workload_gen() {}
	virtual off_t next_offset() = 0;
	virtual bool has_next() = 0;
};

/*
 * A random permutation of the offsets 0, stride, 2*stride, ...,
 * (num-1)*stride, built once at construction and read-only afterwards.
 */
class rand_permute
{
	off_t *offset;
	long num;

	/* `offset' is owned by this object; a copy would double-free it. */
	rand_permute(const rand_permute &);
	rand_permute &operator=(const rand_permute &);
public:
	rand_permute(long num, int stride) {
		this->num = num;
		offset = (off_t *) valloc(num * sizeof(off_t));
		if (offset == NULL && num > 0) {
			fprintf(stderr, "can't allocate the permutation\n");
			exit(1);
		}
		/* long index: num is a long and may exceed INT_MAX */
		for (long i = 0; i < num; i++)
			offset[i] = ((off_t) i) * stride;

		/*
		 * Fisher-Yates shuffle. j must be drawn from [0, i] inclusive;
		 * drawing from [0, i) is Sattolo's algorithm, which only produces
		 * single-cycle permutations (no element ever stays in place).
		 */
		for (long i = num - 1; i >= 1; i--) {
			long j = random() % (i + 1);
			off_t tmp = offset[j];
			offset[j] = offset[i];
			offset[i] = tmp;
		}
	}

	~rand_permute() {
		free(offset);
	}

	/* Returns the idx-th permuted offset; idx must be in [0, num). */
	off_t get_offset(long idx) const {
		return offset[idx];
	}
};

/*
 * Hands out the entries [start, end) of one random permutation that is
 * shared by every instance, so generators with disjoint ranges never
 * produce the same offset.
 */
class local_rand_permute_workload: public workload_gen
{
	long start;
	long end;
	/* Created by the first instance; later instances reuse it regardless of
	 * the num/stride they are given. */
	static const rand_permute *permute;

	/* Number of live instances sharing `permute'. A function-local static
	 * avoids needing another out-of-class definition. Instances are expected
	 * to be created/destroyed from one thread (no locking here). */
	static int &num_users() {
		static int users = 0;
		return users;
	}

	/* A copy would not be counted in num_users(). */
	local_rand_permute_workload(const local_rand_permute_workload &);
	local_rand_permute_workload &operator=(const local_rand_permute_workload &);
public:
	local_rand_permute_workload(long num, int stride, long start, long end) {
		if (permute == NULL) {
			permute = new rand_permute(num, stride);
		}
		num_users()++;
		this->start = start;
		this->end = end;
	}

	~local_rand_permute_workload() {
		/* Only the last user frees the shared permutation; freeing it
		 * earlier would leave the other instances with a dangling pointer. */
		if (--num_users() == 0) {
			delete permute;
			permute = NULL;
		}
	}

	/* Next offset of this instance's range, or -1 once it is exhausted. */
	off_t next_offset() {
		if (start >= end)
			return -1;
		return permute->get_offset(start++);
	}

	bool has_next() {
		return start < end;
	}
};

/* Seconds elapsed from time1 to time2 (negative if time2 is earlier). */
float time_diff(struct timeval time1, struct timeval time2)
{
	time_t whole_sec = time2.tv_sec - time1.tv_sec;
	float frac_sec = ((float) (time2.tv_usec - time1.tv_usec)) / 1000000;
	return whole_sec + frac_sec;
}

/*
 * A memory buffer split into fixed-size entries that are handed out in a
 * random (permuted) order, wrapping around after every entry was used.
 */
class rand_buf
{
	/* where the data read from the disk is stored */
	char *buf;
	/* the locations in the array where data has to be stored */
	rand_permute buf_offset;
	int entry_size;
	int num_entries;

	int current;

	/* `buf' is owned by this object; a copy would double-free it. */
	rand_buf(const rand_buf &);
	rand_buf &operator=(const rand_buf &);

	/* Number of entries; rejects geometries that leave no entry at all,
	 * which would make next_entry() divide by zero. */
	static int count_entries(int buf_size, int entry_size) {
		if (entry_size <= 0 || buf_size < entry_size) {
			fprintf(stderr, "invalid buffer: %d bytes with %d-byte entries\n",
					buf_size, entry_size);
			exit(1);
		}
		return buf_size / entry_size;
	}
public:
	rand_buf(int buf_size, int entry_size): buf_offset(count_entries(buf_size, entry_size), entry_size) {
		this->entry_size = entry_size;
		num_entries = buf_size / entry_size;
		buf = (char *) valloc(buf_size);

		if (buf == NULL){
			fprintf(stderr, "can't allocate buffer\n");
			exit(1);
		}
		/* trigger page faults and bring pages to memory. */
		for (int i = 0; i < buf_size / PAGE_SIZE; i++)
			buf[i * PAGE_SIZE] = 0;

		current = 0;
	}

	~rand_buf() {
		free(buf);
	}

	/* Returns the next entry in permuted order, cycling forever. */
	char *next_entry() {
		off_t off = buf_offset.get_offset(current);
		current = (current + 1) % num_entries;
		return &buf[off];
	}

	int get_entry_size() {
		return entry_size;
	}
};

/* this data structure stores the thread-private info. */
class thread_private
{
public:
	pthread_t id;
	/* the location in the thread descriptor array. */
	int idx;
	rand_buf buf;
	workload_gen *gen;
	ssize_t read_bytes;
	struct timeval start_time;
	struct timeval end_time;

	virtual ssize_t access(char *, off_t, ssize_t, int) = 0;
	virtual int thread_init() = 0;

	thread_private(int idx, int entry_size): buf(NUM_PAGES / nthreads * PAGE_SIZE, entry_size) {
		this->idx = idx;
		read_bytes = 0;
	}

};

/* Worker that accesses the file with plain pread()/pwrite(). */
class read_private: public thread_private
{
	const char *file_name;
	int fd;
	int flags;
protected:
	int get_fd() {
		return fd;
	}

public:
	read_private(const char *name, int idx, int entry_size,
			int flags = O_RDWR): thread_private(idx, entry_size), file_name(name) {
		this->flags = flags;
		/* not opened until thread_init() */
		fd = -1;
	}

	~read_private() {
		if (fd >= 0)
			close(fd);
	}

	/* Opens the file; exits the process on failure. Returns 0. */
	int thread_init() {
		fd = open(file_name, flags);
		if (fd < 0) {
			perror("open");
			exit (1);
		}
		/* posix_fadvise() returns an error number and does not set errno,
		 * so neither `ret < 0' nor perror() would report a failure. */
		int ret = posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM);
		if (ret != 0) {
			fprintf(stderr, "posix_fadvise: %s\n", strerror(ret));
			exit(1);
		}
		return 0;
	}

	/* Transfers `size' bytes at `offset'; returns the pread/pwrite result. */
	ssize_t access(char *buf, off_t offset, ssize_t size, int access_method) {
		/* the benchmark only targets files smaller than 4GB */
		assert(offset < 0x100000000L);
		ssize_t ret;
		if (access_method == WRITE)
			ret = pwrite(fd, buf, size, offset);
		else
			ret = pread(fd, buf, size, offset);
		return ret;
	}
};

/*
 * Worker that opens the file with O_DIRECT. Page-aligned, page-sized
 * requests go straight to the file; other reads go through a bounce page.
 */
class direct_private: public read_private
{
	enum { NUM_BOUNCE_PAGES = 4096 };
	char *pages;
	int buf_idx;

	/* `pages' is owned by this object; a copy would double-free it. */
	direct_private(const direct_private &);
	direct_private &operator=(const direct_private &);
public:
	direct_private(const char *name, int idx, int entry_size): read_private(name, idx,
			entry_size, O_DIRECT | O_RDWR) {
		pages = (char *) valloc(PAGE_SIZE * NUM_BOUNCE_PAGES);
		if (pages == NULL) {
			fprintf(stderr, "can't allocate bounce pages\n");
			exit(1);
		}
		buf_idx = 0;
	}

	~direct_private() {
		free(pages);
	}

	/*
	 * Returns the number of bytes transferred, or a negative value on error.
	 * For bounce-page reads this may be less than `size' when the read is
	 * short (near EOF) or the request crosses the end of its page.
	 */
	ssize_t access(char *buf, off_t offset, ssize_t size, int access_method) {
		/* for simplicity, I assume all request sizes are smaller than a page size */
		assert(size <= PAGE_SIZE);
		off_t page_off = ROUND_PAGE(offset);
		if (page_off == offset
				&& (long) buf == ROUND_PAGE(buf)
				&& size == PAGE_SIZE)
			return read_private::access(buf, offset, size, access_method);

		/* unaligned requests are only supported for reads */
		assert(access_method == READ);
		buf_idx = (buf_idx + 1) % NUM_BOUNCE_PAGES;
		char *page = pages + buf_idx * PAGE_SIZE;
		ssize_t ret = read_private::access(page, page_off, PAGE_SIZE, access_method);
		if (ret < 0)
			return ret;

		/* Copy only bytes that were actually read into this page; the rest
		 * of the bounce page holds stale data from an earlier request. */
		ssize_t in_page = offset - page_off;
		ssize_t avail = ret > in_page ? ret - in_page : 0;
		ssize_t n = size < avail ? size : avail;
		memcpy(buf, page + in_page, n);
		return n;
	}
};

/* Per-thread descriptors indexed by thread number; filled in by main(). */
thread_private *threads[NUM_THREADS] = { NULL };

/*
 * Worker thread body. `arg' is the thread's index into threads[].
 * Issues one request per offset from the thread's generator and exits
 * the thread with the number of bytes transferred; any I/O error
 * terminates the whole process.
 */
void *rand_read(void *arg)
{
	thread_private *priv = threads[(long) arg];
	rand_buf *buf = &priv->buf;

	priv->thread_init();

	gettimeofday(&priv->start_time, NULL);
	while (priv->gen->has_next()) {
		char *entry = buf->next_entry();
		off_t off = priv->gen->next_offset();

		ssize_t ret = priv->access(entry, off, buf->get_entry_size(), access_method);
		if (ret < 0) {
			perror("access");
			exit(1);
		}
		/* A zero-byte transfer is tolerated and not counted, as before. */
		if (ret > 0) {
			assert(ret == buf->get_entry_size());
			priv->read_bytes += ret;
		}
	}
	/* An empty range is not an error: the thread simply reports 0 bytes
	 * (previously the initial ret = -1 made it exit the process). */
	gettimeofday(&priv->end_time, NULL);

	pthread_exit((void *) priv->read_bytes);
}

/*
 * Parses a size such as "512", "4k", "16M" or "2g" (binary multiples,
 * suffix case-insensitive). Returns 0 for an empty or non-numeric string.
 */
long str2size(std::string str)
{
	/* the original indexed str[-1] for an empty string */
	if (str.empty())
		return 0;

	long multiply = 1;
	switch (str[str.length() - 1]) {
	case 'K':
	case 'k':
		multiply = 1024L;
		break;
	case 'M':
	case 'm':
		multiply = 1024L * 1024;
		break;
	case 'G':
	case 'g':
		multiply = 1024L * 1024 * 1024;
		break;
	default:
		break;
	}
	if (multiply != 1)
		str.erase(str.length() - 1);
	return atol(str.c_str()) * multiply;
}

int main(int argc, char *argv[])
{
	int entry_size = 4096;
	std::string access_option;
	int ret;
	int i, j;
	struct timeval start_time, end_time;
	ssize_t read_bytes = 0;
	int num_files = 0;
	std::string file_names[NUM_THREADS];

	if (argc < 5) {
		fprintf(stderr, "there are %d argments\n", argc);
		fprintf(stderr, "read files option pages threads\n");
		exit(1);
	}

	for (int i = 1; i < argc; i++) {
		std::string str = argv[i];
		size_t found = str.find("=");
		/* if there isn't `=', I assume it's a file name*/
		if (found == std::string::npos) {
			file_names[num_files++] = str;
			continue;
		}

		std::string value = str.substr(found + 1);
		std::string key = str.substr(0, found);
		if (key.compare("option") == 0) {
			access_option = value;
		}
		else if(key.compare("pages") == 0) {
			npages = atoi(value.c_str());
		}
		else if(key.compare("threads") == 0) {
			nthreads = atoi(value.c_str());
		}
		else if(key.compare("access") == 0) {
			if(value.compare("read") == 0)
				access_method = READ;
			else if(value.compare("write") == 0)
				access_method = WRITE;
			else {
				fprintf(stderr, "wrong access method\n");
				exit(1);
			}
		}
		else {
			fprintf(stderr, "wrong option\n");
			exit(1);
		}
	}

	int num_entries = npages * (PAGE_SIZE / entry_size);

	if (nthreads > NUM_THREADS) {
		fprintf(stderr, "too many threads\n");
		exit(1);
	}
	if (num_files > 1 && num_files != nthreads) {
		fprintf(stderr, "if there are multiple files, \
				the number of files must be the same as the number of threads\n");
		exit(1);
	}

	/* initialize the threads' private data. */
	for (j = 0; j < nthreads; j++) {
		const char *file_name;
		if (num_files > 1) {
			file_name = file_names[j].c_str();
		}
		else {
			file_name = file_names[0].c_str();
		}
		if (access_option.compare("normal") == 0)
			threads[j] = new read_private(file_name, j, entry_size);
		else if (access_option.compare("direct") == 0)
			threads[j] = new direct_private(file_name, j, entry_size);
		else {
			fprintf(stderr, "wrong access option\n");
			exit(1);
		}
		
		long start, end;
		if (num_files > 1) {
			start = 0;
			end = npages * PAGE_SIZE / entry_size;
		}
		else {
			start = (long) npages / nthreads * PAGE_SIZE / entry_size * j;
			end = start + (long) npages / nthreads * PAGE_SIZE / entry_size;
		}
		printf("thread %d starts %ld ends %ld\n", j, start, end);
		threads[j]->gen = new local_rand_permute_workload(num_entries,
						entry_size, start, end);
	}

	ret = setpriority(PRIO_PROCESS, getpid(), -20);
	if (ret < 0) {
		perror("setpriority");
		exit(1);
	}

	gettimeofday(&start_time, NULL);
	global_start = start_time;
	for (i = 0; i < nthreads; i++) {
		ret = pthread_create(&threads[i]->id, NULL, rand_read, (void *) i);
		if (ret) {
			perror("pthread_create");
			exit(1);
		}
	}

	for (i = 0; i < nthreads; i++) {
		ssize_t size;
		ret = pthread_join(threads[i]->id, (void **) &size);
		if (ret) {
			perror("pthread_join");
			exit(1);
		}
		read_bytes += size;
	}
	gettimeofday(&end_time, NULL);
	printf("read %ld bytes, takes %f seconds\n",
			read_bytes, end_time.tv_sec - start_time.tv_sec
			+ ((float)(end_time.tv_usec - start_time.tv_usec))/1000000);
}

/* Shared permutation; starts out NULL and is created by the first workload. */
const rand_permute *local_rand_permute_workload::permute = NULL;
_______________________________________________
xfs mailing list
xfs@xxxxxxxxxxx
http://oss.sgi.com/mailman/listinfo/xfs

[Index of Archives]     [Linux XFS Devel]     [Linux Filesystem Development]     [Filesystem Testing]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux