[RFC][PATCH v3 2/6] mm, directio: fix fork vs direct-io race (read(2) side IOW gup(write) side)

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Subject: [PATCH] mm, directio: fix fork vs direct-io race


ChangeLog:
V2 -> V3
   o remove early decow logic

V1 -> V2
   o add dio+aio logic

===============================================

Currently, following testcase is failed.

& dma_thread -a 512 -w 40

========== dma_thread.c =======
/* compile with 'gcc -g -o dma_thread dma_thread.c -lpthread' */

#define _GNU_SOURCE 1

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <memory.h>
#include <pthread.h>
#include <getopt.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>

#define FILESIZE (12*1024*1024) 
#define READSIZE  (1024*1024)

#define FILENAME    "test_%.04d.tmp"
#define FILECOUNT   100
#define MIN_WORKERS 2
#define MAX_WORKERS 256
#define PAGE_SIZE   4096

#define true	1
#define false	0

typedef int bool;

bool	done	= false;
int	workers = 2;

#define PATTERN (0xfa)

static void
usage (void)
{
    fprintf(stderr, "\nUsage: dma_thread [-h | -a <alignment> [ -w <workers>]\n"
		    "\nWith no arguments, generate test files and exit.\n"
		    "-h Display this help and exit.\n"
		    "-a align read buffer to offset <alignment>.\n"
		    "-w number of worker threads, 2 (default) to 256,\n"
		    "   defaults to number of cores.\n\n"

		    "Run first with no arguments to generate files.\n"
		    "Then run with -a <alignment> = 512  or 0. \n");
}

typedef struct {
    pthread_t	    tid;
    int		    worker_number;
    int		    fd;
    int		    offset;
    int		    length;
    int		    pattern;
    unsigned char  *buffer;
} worker_t;


void *worker_thread(void * arg)
{
    int		    bytes_read;
    int		    i,k;
    worker_t	   *worker  = (worker_t *) arg;
    int		    offset  = worker->offset;
    int		    fd	    = worker->fd;
    unsigned char  *buffer  = worker->buffer;
    int		    pattern = worker->pattern;
    int		    length  = worker->length;
    
    if (lseek(fd, offset, SEEK_SET) < 0) {
	fprintf(stderr, "Failed to lseek to %d on fd %d: %s.\n", 
			offset, fd, strerror(errno));
	exit(1);
    }

    bytes_read = read(fd, buffer, length);
    if (bytes_read != length) {
	fprintf(stderr, "read failed on fd %d: bytes_read %d, %s\n", 
			fd, bytes_read, strerror(errno));
	exit(1);
    }

    /* Corruption check */
    for (i = 0; i < length; i++) {
	if (buffer[i] != pattern) {
	    printf("Bad data at 0x%.06x: %p, \n", i, buffer + i);
	    printf("Data dump starting at 0x%.06x:\n", i - 8);
	    printf("Expect 0x%x followed by 0x%x:\n",
		    pattern, PATTERN);

	    for (k = 0; k < 16; k++) {
		printf("%02x ", buffer[i - 8 + k]);
		if (k == 7) {
		    printf("\n");
		}       
	    }

	    printf("\n");
	    abort();
	}
    }

    return 0;
}

void *fork_thread (void *arg) 
{
    pid_t pid;

    while (!done) {
	pid = fork();
	if (pid == 0) {
	    exit(0);
	} else if (pid < 0) {
	    fprintf(stderr, "Failed to fork child.\n");
	    exit(1);
	} 
	waitpid(pid, NULL, 0 );
	usleep(100);
    }

    return NULL;

}

int main(int argc, char *argv[])
{
    unsigned char  *buffer = NULL;
    char	    filename[1024];
    int		    fd;
    bool	    dowrite = true;
    pthread_t	    fork_tid;
    int		    c, n, j;
    worker_t	   *worker;
    int		    align = 0;
    int		    offset, rc;

    workers = sysconf(_SC_NPROCESSORS_ONLN);

    while ((c = getopt(argc, argv, "a:hw:")) != -1) {
	switch (c) {
	case 'a':
	    align = atoi(optarg);
	    if (align < 0 || align > PAGE_SIZE) {
		printf("Bad alignment %d.\n", align);
		exit(1);
	    }
	    dowrite = false;
	    break;

	case 'h':
	    usage();
	    exit(0);
	    break;

	case 'w':
	    workers = atoi(optarg);
	    if (workers < MIN_WORKERS || workers > MAX_WORKERS) {
		fprintf(stderr, "Worker count %d not between "
				"%d and %d, inclusive.\n",
				workers, MIN_WORKERS, MAX_WORKERS);
		usage();
		exit(1);
	    }
	    dowrite = false;
	    break;

	default:
	    usage();
	    exit(1);
	}
    }

    if (argc > 1 && (optind < argc)) {
	fprintf(stderr, "Bad command line.\n");
	usage();
	exit(1);
    }

    if (dowrite) {

	buffer = malloc(FILESIZE);
	if (buffer == NULL) {
	    fprintf(stderr, "Failed to malloc write buffer.\n");
	    exit(1);
	}

	for (n = 1; n <= FILECOUNT; n++) {
	    sprintf(filename, FILENAME, n);
	    fd = open(filename, O_RDWR|O_CREAT|O_TRUNC, 0666);
	    if (fd < 0) {
		printf("create failed(%s): %s.\n", filename, strerror(errno));
		exit(1);
	    }
	    memset(buffer, n, FILESIZE);
	    printf("Writing file %s.\n", filename);
	    if (write(fd, buffer, FILESIZE) != FILESIZE) {
		printf("write failed (%s)\n", filename);
	    }

	    close(fd);
	    fd = -1;
	}

	free(buffer);
	buffer = NULL;

	printf("done\n");
	exit(0);
    }

    printf("Using %d workers.\n", workers);

    worker = malloc(workers * sizeof(worker_t));
    if (worker == NULL) {
	fprintf(stderr, "Failed to malloc worker array.\n");
	exit(1);
    }

    for (j = 0; j < workers; j++) {
	worker[j].worker_number = j;
    }

    printf("Using alignment %d.\n", align);
    
    posix_memalign((void *)&buffer, PAGE_SIZE, READSIZE+ align);
    printf("Read buffer: %p.\n", buffer);
    for (n = 1; n <= FILECOUNT; n++) {

	sprintf(filename, FILENAME, n);
	for (j = 0; j < workers; j++) {
	    if ((worker[j].fd = open(filename,  O_RDONLY|O_DIRECT)) < 0) {
		fprintf(stderr, "Failed to open %s: %s.\n",
				filename, strerror(errno));
		exit(1);
	    }

	    worker[j].pattern = n;
	}

	printf("Reading file %d.\n", n);

	for (offset = 0; offset < FILESIZE; offset += READSIZE) {
	    memset(buffer, PATTERN, READSIZE + align);
	    for (j = 0; j < workers; j++) {
		worker[j].offset = offset + j * PAGE_SIZE;
		worker[j].buffer = buffer + align + j * PAGE_SIZE;
		worker[j].length = PAGE_SIZE;
	    }
	    /* The final worker reads whatever is left over. */
	    worker[workers - 1].length = READSIZE - PAGE_SIZE * (workers - 1);

	    done = 0;

	    rc = pthread_create(&fork_tid, NULL, fork_thread, NULL);
	    if (rc != 0) {
		fprintf(stderr, "Can't create fork thread: %s.\n", 
				strerror(rc));
		exit(1);
	    }

	    for (j = 0; j < workers; j++) {
		rc = pthread_create(&worker[j].tid, 
				    NULL, 
				    worker_thread, 
				    worker + j);
		if (rc != 0) {
		    fprintf(stderr, "Can't create worker thread %d: %s.\n", 
				    j, strerror(rc));
		    exit(1);
		}
	    }

	    for (j = 0; j < workers; j++) {
		rc = pthread_join(worker[j].tid, NULL);
		if (rc != 0) {
		    fprintf(stderr, "Failed to join worker thread %d: %s.\n",
				    j, strerror(rc));
		    exit(1);
		}
	    }

	    /* Let the fork thread know it's ok to exit */
	    done = 1;

	    rc = pthread_join(fork_tid, NULL);
	    if (rc != 0) {
		fprintf(stderr, "Failed to join fork thread: %s.\n",
				strerror(rc));
		exit(1);
	    }
	}

	/* Close the fd's for the next file. */
	for (j = 0; j < workers; j++) {
	    close(worker[j].fd);
	}
    }

    return 0;
}
========== dma_thread.c =======

Because following scenario happend.

   CPU0            CPU1                       CPU2                note
  (fork thread)    (worker thread1)           (worker thread2)
==========================================================================================
                   read()
                   | get_user_pages()
                   |
  fork             |                                              inc map_count and wprotect
                   |
                   |                         read()
                   |                         | get_user_pages()   COW break, CPU2 get copyed page,
                   |                         |                    but CPU1 still point to original page.
                   |                         |                    then the result of CPU1 transfer will be lost.
                   v                         |
                                             |
                                             |
                                             v


Actually, get_user_pages() (and get_user_pages_fast()) don't provide any pinning operation.
Caller must prevent fork in critical section.
access_process_vm() explain standard fork protection way, it use mmap_sem.

but, mmap_sem is very easy contended lock. it cause large performance regression to DirectIO.
Then, this patch introduce new lock for another fork prevent mechanism.
Almost application don't fork while DirectIO in progress, then mm_pinned_sem doesn't contend in almost case.


Also, this patch fix following aio+dio testcase.

========== forkscrew.c ========
/*
 * Copyright 2009, Red Hat, Inc.
 *
 * Author: Jeff Moyer <jmoyer@xxxxxxxxxx>
 *
 * This program attempts to expose a race between O_DIRECT I/O and the fork()
 * path in a multi-threaded program.  In order to reliably reproduce the
 * problem, it is best to perform a dd from the device under test to /dev/null
 * as this makes the read I/O slow enough to orchestrate the problem.
 *
 * Running:  ./forkscrew
 *
 * It is expected that a file name "data" exists in the current working
 * directory, and that its contents are something other than 0x2a.  A simple
 * dd if=/dev/zero of=data bs=1M count=1 should be sufficient.
 */
#define _GNU_SOURCE 1

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>

#include <pthread.h>
#include <libaio.h>

pthread_cond_t worker_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t worker_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t fork_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t fork_mutex = PTHREAD_MUTEX_INITIALIZER;

char *buffer;
int fd;

/* pattern filled into the in-memory buffer */
#define PATTERN		0x2a  // '*'

void
usage(void)
{
	fprintf(stderr,
		"\nUsage: forkscrew\n"
		"it is expected that a file named \"data\" is the current\n"
		"working directory.  It should be at least 3*pagesize in size\n"
		);
}

void
dump_buffer(char *buf, int len)
{
	int i;
	int last_off, last_val;

	last_off = -1;
	last_val = -1;

	for (i = 0; i < len; i++) {
		if (last_off < 0) {
			last_off = i;
			last_val = buf[i];
			continue;
		}

		if (buf[i] != last_val) {
			printf("%d - %d: %d\n", last_off, i - 1, last_val);
			last_off = i;
			last_val = buf[i];
		}
	}

	if (last_off != len - 1)
		printf("%d - %d: %d\n", last_off, i-1, last_val);
}

int
check_buffer(char *bufp, int len, int pattern)
{
	int i;

	for (i = 0; i < len; i++) {
		if (bufp[i] == pattern)
			return 1;
	}
	return 0;
}

void *
forker_thread(void *arg)
{
	pthread_mutex_lock(&fork_mutex);
	pthread_cond_signal(&fork_cond);
	pthread_cond_wait(&fork_cond, &fork_mutex);
	switch (fork()) {
	case 0:
		sleep(1);
		printf("child dumping buffer:\n");
		dump_buffer(buffer + 512, 2*getpagesize());
		exit(0);
	case -1:
		perror("fork");
		exit(1);
	default:
		break;
	}
	pthread_cond_signal(&fork_cond);
	pthread_mutex_unlock(&fork_mutex);

	wait(NULL);
	return (void *)0;
}

void *
worker(void *arg)
{
	int first = (int)arg;
	char *bufp;
	int pagesize = getpagesize();
	int ret;
	int corrupted = 0;

	if (first) {
		io_context_t aioctx;
		struct io_event event;
		struct iocb *iocb = malloc(sizeof *iocb);
		if (!iocb) {
			perror("malloc");
			exit(1);
		}
		memset(&aioctx, 0, sizeof(aioctx));
		ret = io_setup(1, &aioctx);
		if (ret != 0) {
			errno = -ret;
			perror("io_setup");
			exit(1);
		}
		bufp = buffer + 512;
		io_prep_pread(iocb, fd, bufp, pagesize, 0);

		/* submit the I/O */
		io_submit(aioctx, 1, &iocb);

		/* tell the fork thread to run */
		pthread_mutex_lock(&fork_mutex);
		pthread_cond_signal(&fork_cond);

		/* wait for the fork to happen */
		pthread_cond_wait(&fork_cond, &fork_mutex);
		pthread_mutex_unlock(&fork_mutex);

		/* release the other worker to issue I/O */
		pthread_mutex_lock(&worker_mutex);
		pthread_cond_signal(&worker_cond);
		pthread_mutex_unlock(&worker_mutex);

		ret = io_getevents(aioctx, 1, 1, &event, NULL);
		if (ret != 1) {
			errno = -ret;
			perror("io_getevents");
			exit(1);
		}
		if (event.res != pagesize) {
			errno = -event.res;
			perror("read error");
			exit(1);
		}

		io_destroy(aioctx);

		/* check buffer, should be corrupt */
		if (check_buffer(bufp, pagesize, PATTERN)) {
			printf("worker 0 failed check\n");
			dump_buffer(bufp, pagesize);
			corrupted = 1;
		}

	} else {

		bufp = buffer + 512 + pagesize;

		pthread_mutex_lock(&worker_mutex);
		pthread_cond_signal(&worker_cond); /* tell main we're ready */
		/* wait for the first I/O and the fork */
		pthread_cond_wait(&worker_cond, &worker_mutex);
		pthread_mutex_unlock(&worker_mutex);

		/* submit overlapping I/O */
		ret = read(fd, bufp, pagesize);
		if (ret != pagesize) {
			perror("read");
			exit(1);
		}
		/* check buffer, should be fine */
		if (check_buffer(bufp, pagesize, PATTERN)) {
			printf("worker 1 failed check -- abnormal\n");
			dump_buffer(bufp, pagesize);
			corrupted = 1;
		}
	}

	return (void *)corrupted;
}

int
main(int argc, char **argv)
{
	pthread_t workers[2];
	pthread_t forker;
	int ret, rc = 0;
	void *thread_ret;
	int pagesize = getpagesize();

	fd = open("data", O_DIRECT|O_RDONLY);
	if (fd < 0) {
		perror("open");
		exit(1);
	}

	ret = posix_memalign(&buffer, pagesize, 3 * pagesize);
	if (ret != 0) {
		errno = ret;
		perror("posix_memalign");
		exit(1);
	}
	memset(buffer, PATTERN, 3*pagesize);

	pthread_mutex_lock(&fork_mutex);
	ret = pthread_create(&forker, NULL, forker_thread, NULL);
	pthread_cond_wait(&fork_cond, &fork_mutex);
	pthread_mutex_unlock(&fork_mutex);

	pthread_mutex_lock(&worker_mutex);
	ret |= pthread_create(&workers[0], NULL, worker, (void *)0);
	if (ret) {
		perror("pthread_create");
		exit(1);
	}
	pthread_cond_wait(&worker_cond, &worker_mutex);
	pthread_mutex_unlock(&worker_mutex);

	ret = pthread_create(&workers[1], NULL, worker, (void *)1);
	if (ret != 0) {
		perror("pthread_create");
		exit(1);
	}

	pthread_join(forker, NULL);
	pthread_join(workers[0], &thread_ret);
	if (thread_ret != 0)
		rc = 1;
	pthread_join(workers[1], &thread_ret);
	if (thread_ret != 0)
		rc = 1;

	if (rc != 0) {
		printf("parent dumping full buffer\n");
		dump_buffer(buffer + 512, 2 * pagesize);
	}

	close(fd);
	free(buffer);
	exit(rc);
}
========== forkscrew.c ========


Signed-off-by: KOSAKI Motohiro <kosaki.motohiro@xxxxxxxxxxxxxx>
Sugessted-by: Linus Torvalds <torvalds@xxxxxxxx>
Cc: Hugh Dickins <hugh@xxxxxxxxxxx>
Cc: Andrew Morton <akpm@xxxxxxxx>
Cc: Nick Piggin <nickpiggin@xxxxxxxxxxxx>
Cc: Andrea Arcangeli <aarcange@xxxxxxxxxx>
Cc: Jeff Moyer <jmoyer@xxxxxxxxxx>
Cc: Zach Brown <zach.brown@xxxxxxxxxx>
Cc: Andy Grover <andy.grover@xxxxxxxxxx>
Cc: linux-fsdevel@xxxxxxxxxxxxxxx
Cc: linux-mm@xxxxxxxxx
---
 fs/direct-io.c            |   16 ++++++++++++++++
 include/linux/init_task.h |    1 +
 include/linux/mm_types.h  |    6 ++++++
 kernel/fork.c             |    3 +++
 4 files changed, 26 insertions(+)

Index: b/fs/direct-io.c
===================================================================
--- a/fs/direct-io.c	2009-04-13 00:24:01.000000000 +0900
+++ b/fs/direct-io.c	2009-04-13 01:36:37.000000000 +0900
@@ -131,6 +131,9 @@ struct dio {
 	int is_async;			/* is IO async ? */
 	int io_error;			/* IO error in completion path */
 	ssize_t result;                 /* IO result */
+
+	/* fork exclusive stuff */
+	struct mm_struct *mm;
 };
 
 /*
@@ -244,6 +247,12 @@ static int dio_complete(struct dio *dio,
 		/* lockdep: non-owner release */
 		up_read_non_owner(&dio->inode->i_alloc_sem);
 
+	if (dio->rw == READ) {
+		BUG_ON(!dio->mm);
+		up_read_non_owner(&dio->mm->mm_pinned_sem);
+		mmdrop(dio->mm);
+	}
+
 	if (ret == 0)
 		ret = dio->page_errors;
 	if (ret == 0)
@@ -942,6 +951,7 @@ direct_io_worker(int rw, struct kiocb *i
 	ssize_t ret = 0;
 	ssize_t ret2;
 	size_t bytes;
+	struct mm_struct *mm;
 
 	dio->inode = inode;
 	dio->rw = rw;
@@ -960,6 +970,12 @@ direct_io_worker(int rw, struct kiocb *i
 	spin_lock_init(&dio->bio_lock);
 	dio->refcount = 1;
 
+	if (rw == READ) {
+		mm = dio->mm = current->mm;
+		atomic_inc(&mm->mm_count);
+		down_read_non_owner(&mm->mm_pinned_sem);
+	}
+
 	/*
 	 * In case of non-aligned buffers, we may need 2 more
 	 * pages since we need to zero out first and last block.
Index: b/include/linux/init_task.h
===================================================================
--- a/include/linux/init_task.h	2009-04-13 00:24:01.000000000 +0900
+++ b/include/linux/init_task.h	2009-04-13 00:24:32.000000000 +0900
@@ -37,6 +37,7 @@ extern struct fs_struct init_fs;
 	.page_table_lock =  __SPIN_LOCK_UNLOCKED(name.page_table_lock),	\
 	.mmlist		= LIST_HEAD_INIT(name.mmlist),		\
 	.cpu_vm_mask	= CPU_MASK_ALL,				\
+	.mm_pinned_sem	= __RWSEM_INITIALIZER(name.mm_pinned_sem), \
 }
 
 #define INIT_SIGNALS(sig) {						\
Index: b/include/linux/mm_types.h
===================================================================
--- a/include/linux/mm_types.h	2009-04-13 00:24:01.000000000 +0900
+++ b/include/linux/mm_types.h	2009-04-13 00:24:32.000000000 +0900
@@ -274,6 +274,12 @@ struct mm_struct {
 #ifdef CONFIG_MMU_NOTIFIER
 	struct mmu_notifier_mm *mmu_notifier_mm;
 #endif
+
+	/*
+	 * if there are on-flight directio or similar pinning action,
+	 * COW cause memory corruption. the sem protect it by preventing fork.
+	 */
+	struct rw_semaphore mm_pinned_sem;
 };
 
 /* Future-safe accessor for struct mm_struct's cpu_vm_mask. */
Index: b/kernel/fork.c
===================================================================
--- a/kernel/fork.c	2009-04-13 00:24:01.000000000 +0900
+++ b/kernel/fork.c	2009-04-13 00:24:32.000000000 +0900
@@ -266,6 +266,7 @@ static int dup_mmap(struct mm_struct *mm
 	unsigned long charge;
 	struct mempolicy *pol;
 
+	down_write(&oldmm->mm_pinned_sem);
 	down_write(&oldmm->mmap_sem);
 	flush_cache_dup_mm(oldmm);
 	/*
@@ -368,6 +369,7 @@ out:
 	up_write(&mm->mmap_sem);
 	flush_tlb_mm(oldmm);
 	up_write(&oldmm->mmap_sem);
+	up_write(&oldmm->mm_pinned_sem);
 	return retval;
 fail_nomem_policy:
 	kmem_cache_free(vm_area_cachep, tmp);
@@ -431,6 +433,7 @@ static struct mm_struct * mm_init(struct
 	mm->free_area_cache = TASK_UNMAPPED_BASE;
 	mm->cached_hole_size = ~0UL;
 	mm_init_owner(mm, p);
+	init_rwsem(&mm->mm_pinned_sem);
 
 	if (likely(!mm_alloc_pgd(mm))) {
 		mm->def_flags = 0;


--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux