[PATCH RFC 2/5] libext2fs: add threading support to the I/O manager abstraction

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Add initial implementation support for the unix_io manager.
Applications which want to use threading should pass in
IO_FLAG_THREADS when opening the channel.  Channels which support
threading (which as of this commit is unix_io and test_io if the
backing io_manager supports threading) will set the
CHANNEL_FLAGS_THREADS bit in io->flags.  Library code or applications
can test if threading is enabled by checking this flag.

Applications using libext2fs can pass in EXT2_FLAG_THREADS to
ext2fs_open() or ext2fs_open2() to request threading support.

Signed-off-by: Theodore Ts'o <tytso@xxxxxxx>
---
 lib/ext2fs/ext2_io.h |   2 +
 lib/ext2fs/ext2fs.h  |   1 +
 lib/ext2fs/openfs.c  |   2 +
 lib/ext2fs/test_io.c |   6 +-
 lib/ext2fs/undo_io.c |   2 +
 lib/ext2fs/unix_io.c | 137 ++++++++++++++++++++++++++++++++++++++-----
 6 files changed, 133 insertions(+), 17 deletions(-)

diff --git a/lib/ext2fs/ext2_io.h b/lib/ext2fs/ext2_io.h
index 5540900a5..2e0da5a53 100644
--- a/lib/ext2fs/ext2_io.h
+++ b/lib/ext2fs/ext2_io.h
@@ -33,6 +33,7 @@ typedef struct struct_io_stats *io_stats;
 #define CHANNEL_FLAGS_WRITETHROUGH	0x01
 #define CHANNEL_FLAGS_DISCARD_ZEROES	0x02
 #define CHANNEL_FLAGS_BLOCK_DEVICE	0x04
+#define CHANNEL_FLAGS_THREADS		0x08
 
 #define io_channel_discard_zeroes_data(i) (i->flags & CHANNEL_FLAGS_DISCARD_ZEROES)
 
@@ -104,6 +105,7 @@ struct struct_io_manager {
 #define IO_FLAG_EXCLUSIVE	0x0002
 #define IO_FLAG_DIRECT_IO	0x0004
 #define IO_FLAG_FORCE_BOUNCE	0x0008
+#define IO_FLAG_THREADS		0x0010
 
 /*
  * Convenience functions....
diff --git a/lib/ext2fs/ext2fs.h b/lib/ext2fs/ext2fs.h
index 69c8a3ff0..5955c3ae9 100644
--- a/lib/ext2fs/ext2fs.h
+++ b/lib/ext2fs/ext2fs.h
@@ -206,6 +206,7 @@ typedef struct ext2_file *ext2_file_t;
 #define EXT2_FLAG_IGNORE_SB_ERRORS	0x800000
 #define EXT2_FLAG_BBITMAP_TAIL_PROBLEM	0x1000000
 #define EXT2_FLAG_IBITMAP_TAIL_PROBLEM	0x2000000
+#define EXT2_FLAG_THREADS		0x4000000
 
 /*
  * Special flag in the ext2 inode i_flag field that means that this is
diff --git a/lib/ext2fs/openfs.c b/lib/ext2fs/openfs.c
index 3ed1e25cd..5ec8ed5c1 100644
--- a/lib/ext2fs/openfs.c
+++ b/lib/ext2fs/openfs.c
@@ -170,6 +170,8 @@ errcode_t ext2fs_open2(const char *name, const char *io_options,
 		io_flags |= IO_FLAG_EXCLUSIVE;
 	if (flags & EXT2_FLAG_DIRECT_IO)
 		io_flags |= IO_FLAG_DIRECT_IO;
+	if (flags & EXT2_FLAG_THREADS)
+		io_flags |= IO_FLAG_THREADS;
 	retval = manager->open(fs->device_name, io_flags, &fs->io);
 	if (retval)
 		goto cleanup;
diff --git a/lib/ext2fs/test_io.c b/lib/ext2fs/test_io.c
index ee828be7a..480e68fcc 100644
--- a/lib/ext2fs/test_io.c
+++ b/lib/ext2fs/test_io.c
@@ -197,6 +197,7 @@ static errcode_t test_open(const char *name, int flags, io_channel *channel)
 	io->read_error = 0;
 	io->write_error = 0;
 	io->refcount = 1;
+	io->flags = 0;
 
 	memset(data, 0, sizeof(struct test_private_data));
 	data->magic = EXT2_ET_MAGIC_TEST_IO_CHANNEL;
@@ -237,8 +238,11 @@ static errcode_t test_open(const char *name, int flags, io_channel *channel)
 	if ((value = safe_getenv("TEST_IO_WRITE_ABORT")) != NULL)
 		data->write_abort_count = strtoul(value, NULL, 0);
 
-	if (data->real)
+	if (data->real) {
 		io->align = data->real->align;
+		if (data->real->flags & CHANNEL_FLAGS_THREADS)
+			io->flags |= CHANNEL_FLAGS_THREADS;
+	}
 
 	*channel = io;
 	return 0;
diff --git a/lib/ext2fs/undo_io.c b/lib/ext2fs/undo_io.c
index 198624145..eb56f53d5 100644
--- a/lib/ext2fs/undo_io.c
+++ b/lib/ext2fs/undo_io.c
@@ -698,6 +698,8 @@ static errcode_t undo_open(const char *name, int flags, io_channel *channel)
 	int		undo_fd = -1;
 	errcode_t	retval;
 
+	/* We don't support multi-threading, at least for now */
+	flags &= ~IO_FLAG_THREADS;
 	if (name == 0)
 		return EXT2_ET_BAD_DEVICE_NAME;
 	retval = ext2fs_get_mem(sizeof(struct struct_io_channel), &io);
diff --git a/lib/ext2fs/unix_io.c b/lib/ext2fs/unix_io.c
index 628e60c39..9385487d9 100644
--- a/lib/ext2fs/unix_io.c
+++ b/lib/ext2fs/unix_io.c
@@ -67,6 +67,9 @@
 #if HAVE_LINUX_FALLOC_H
 #include <linux/falloc.h>
 #endif
+#ifdef HAVE_PTHREAD
+#include <pthread.h>
+#endif
 
 #if defined(__linux__) && defined(_IO) && !defined(BLKROGET)
 #define BLKROGET   _IO(0x12, 94) /* Get read-only status (0 = read_write).  */
@@ -107,11 +110,58 @@ struct unix_private_data {
 	struct unix_cache cache[CACHE_SIZE];
 	void	*bounce;
 	struct struct_io_stats io_stats;
+#ifdef HAVE_PTHREAD
+	pthread_mutex_t cache_mutex;
+	pthread_mutex_t bounce_mutex;
+	pthread_mutex_t stats_mutex;
+#endif
 };
 
 #define IS_ALIGNED(n, align) ((((uintptr_t) n) & \
 			       ((uintptr_t) ((align)-1))) == 0)
 
+typedef enum lock_kind {
+	CACHE_MTX, BOUNCE_MTX, STATS_MTX
+} kind_t;
+
+#ifdef HAVE_PTHREAD
+static inline pthread_mutex_t *get_mutex(struct unix_private_data *data,
+					 kind_t kind)
+{
+	if (data->flags & IO_FLAG_THREADS) {
+		switch (kind) {
+		case CACHE_MTX:
+			return &data->cache_mutex;
+		case BOUNCE_MTX:
+			return &data->bounce_mutex;
+		case STATS_MTX:
+			return &data->stats_mutex;
+		}
+	}
+	return NULL;
+}
+#endif
+
+static inline void mutex_lock(struct unix_private_data *data, kind_t kind)
+{
+#ifdef HAVE_PTHREAD
+	pthread_mutex_t *mtx = get_mutex(data,kind);
+
+	if (mtx)
+		pthread_mutex_lock(mtx);
+#endif
+}
+
+static inline void mutex_unlock(struct unix_private_data *data, kind_t kind)
+{
+#ifdef HAVE_PTHREAD
+	pthread_mutex_t *mtx = get_mutex(data,kind);
+
+	if (mtx)
+		pthread_mutex_unlock(mtx);
+#endif
+}
+
 static errcode_t unix_get_stats(io_channel channel, io_stats *stats)
 {
 	errcode_t	retval = 0;
@@ -122,8 +172,11 @@ static errcode_t unix_get_stats(io_channel channel, io_stats *stats)
 	data = (struct unix_private_data *) channel->private_data;
 	EXT2_CHECK_MAGIC(data, EXT2_ET_MAGIC_UNIX_IO_CHANNEL);
 
-	if (stats)
+	if (stats) {
+		mutex_lock(data, STATS_MTX);
 		*stats = &data->io_stats;
+		mutex_unlock(data, STATS_MTX);
+	}
 
 	return retval;
 }
@@ -167,7 +220,9 @@ static errcode_t raw_read_blk(io_channel channel,
 	ssize_t		really_read = 0;
 
 	size = (count < 0) ? -count : (ext2_loff_t) count * channel->block_size;
+	mutex_lock(data, STATS_MTX);
 	data->io_stats.bytes_read += size;
+	mutex_unlock(data, STATS_MTX);
 	location = ((ext2_loff_t) block * channel->block_size) + data->offset;
 
 	if (data->flags & IO_FLAG_FORCE_BOUNCE) {
@@ -232,8 +287,10 @@ static errcode_t raw_read_blk(io_channel channel,
 	 */
 bounce_read:
 	while (size > 0) {
+		mutex_lock(data, BOUNCE_MTX);
 		actual = read(data->dev, data->bounce, channel->block_size);
 		if (actual != channel->block_size) {
+			mutex_unlock(data, BOUNCE_MTX);
 			actual = really_read;
 			buf -= really_read;
 			size += really_read;
@@ -246,6 +303,7 @@ bounce_read:
 		really_read += actual;
 		size -= actual;
 		buf += actual;
+		mutex_unlock(data, BOUNCE_MTX);
 	}
 	return 0;
 
@@ -277,7 +335,9 @@ static errcode_t raw_write_blk(io_channel channel,
 		else
 			size = (ext2_loff_t) count * channel->block_size;
 	}
+	mutex_lock(data, STATS_MTX);
 	data->io_stats.bytes_written += size;
+	mutex_unlock(data, STATS_MTX);
 
 	location = ((ext2_loff_t) block * channel->block_size) + data->offset;
 
@@ -341,11 +401,13 @@ static errcode_t raw_write_blk(io_channel channel,
 	 */
 bounce_write:
 	while (size > 0) {
+		mutex_lock(data, BOUNCE_MTX);
 		if (size < channel->block_size) {
 			actual = read(data->dev, data->bounce,
 				      channel->block_size);
 			if (actual != channel->block_size) {
 				if (actual < 0) {
+					mutex_unlock(data, BOUNCE_MTX);
 					retval = errno;
 					goto error_out;
 				}
@@ -362,6 +424,7 @@ bounce_write:
 			goto error_out;
 		}
 		actual = write(data->dev, data->bounce, channel->block_size);
+		mutex_unlock(data, BOUNCE_MTX);
 		if (actual < 0) {
 			retval = errno;
 			goto error_out;
@@ -481,24 +544,28 @@ static void reuse_cache(io_channel channel, struct unix_private_data *data,
 	cache->access_time = ++data->access_time;
 }
 
+#define FLUSH_INVALIDATE	0x01
+#define FLUSH_NOLOCK		0x02
+
 /*
  * Flush all of the blocks in the cache
  */
 static errcode_t flush_cached_blocks(io_channel channel,
 				     struct unix_private_data *data,
-				     int invalidate)
-
+				     int flags)
 {
 	struct unix_cache	*cache;
 	errcode_t		retval, retval2;
 	int			i;
 
 	retval2 = 0;
+	if ((flags & FLUSH_NOLOCK) == 0)
+		mutex_lock(data, CACHE_MTX);
 	for (i=0, cache = data->cache; i < CACHE_SIZE; i++, cache++) {
 		if (!cache->in_use)
 			continue;
 
-		if (invalidate)
+		if (flags & FLUSH_INVALIDATE)
 			cache->in_use = 0;
 
 		if (!cache->dirty)
@@ -511,6 +578,8 @@ static errcode_t flush_cached_blocks(io_channel channel,
 		else
 			cache->dirty = 0;
 	}
+	if ((flags & FLUSH_NOLOCK) == 0)
+		mutex_unlock(data, CACHE_MTX);
 	return retval2;
 }
 #endif /* NO_IO_CACHE */
@@ -597,6 +666,7 @@ static errcode_t unix_open_channel(const char *name, int fd,
 	io->read_error = 0;
 	io->write_error = 0;
 	io->refcount = 1;
+	io->flags = 0;
 
 	memset(data, 0, sizeof(struct unix_private_data));
 	data->magic = EXT2_ET_MAGIC_UNIX_IO_CHANNEL;
@@ -703,6 +773,25 @@ static errcode_t unix_open_channel(const char *name, int fd,
 			setrlimit(RLIMIT_FSIZE, &rlim);
 		}
 	}
+#endif
+#ifdef HAVE_PTHREAD
+	if (flags & IO_FLAG_THREADS) {
+		io->flags |= CHANNEL_FLAGS_THREADS;
+		retval = pthread_mutex_init(&data->cache_mutex, NULL);
+		if (retval)
+			goto cleanup;
+		retval = pthread_mutex_init(&data->bounce_mutex, NULL);
+		if (retval) {
+			pthread_mutex_destroy(&data->cache_mutex);
+			goto cleanup;
+		}
+		retval = pthread_mutex_init(&data->stats_mutex, NULL);
+		if (retval) {
+			pthread_mutex_destroy(&data->cache_mutex);
+			pthread_mutex_destroy(&data->bounce_mutex);
+			goto cleanup;
+		}
+	}
 #endif
 	*channel = io;
 	return 0;
@@ -796,6 +885,13 @@ static errcode_t unix_close(io_channel channel)
 	if (close(data->dev) < 0)
 		retval = errno;
 	free_cache(data);
+#ifdef HAVE_PTHREAD
+	if (data->flags & IO_FLAG_THREADS) {
+		pthread_mutex_destroy(&data->cache_mutex);
+		pthread_mutex_destroy(&data->bounce_mutex);
+		pthread_mutex_destroy(&data->stats_mutex);
+	}
+#endif
 
 	ext2fs_free_mem(&channel->private_data);
 	if (channel->name)
@@ -807,24 +903,27 @@ static errcode_t unix_close(io_channel channel)
 static errcode_t unix_set_blksize(io_channel channel, int blksize)
 {
 	struct unix_private_data *data;
-	errcode_t		retval;
+	errcode_t		retval = 0;
 
 	EXT2_CHECK_MAGIC(channel, EXT2_ET_MAGIC_IO_CHANNEL);
 	data = (struct unix_private_data *) channel->private_data;
 	EXT2_CHECK_MAGIC(data, EXT2_ET_MAGIC_UNIX_IO_CHANNEL);
 
 	if (channel->block_size != blksize) {
+		mutex_lock(data, CACHE_MTX);
+		mutex_lock(data, BOUNCE_MTX);
 #ifndef NO_IO_CACHE
-		if ((retval = flush_cached_blocks(channel, data, 0)))
+		if ((retval = flush_cached_blocks(channel, data, FLUSH_NOLOCK)))
 			return retval;
 #endif
 
 		channel->block_size = blksize;
 		free_cache(data);
-		if ((retval = alloc_cache(channel, data)))
-			return retval;
+		retval = alloc_cache(channel, data);
+		mutex_unlock(data, BOUNCE_MTX);
+		mutex_unlock(data, CACHE_MTX);
 	}
-	return 0;
+	return retval;
 }
 
 static errcode_t unix_read_blk64(io_channel channel, unsigned long long block,
@@ -832,7 +931,7 @@ static errcode_t unix_read_blk64(io_channel channel, unsigned long long block,
 {
 	struct unix_private_data *data;
 	struct unix_cache *cache, *reuse[READ_DIRECT_SIZE];
-	errcode_t	retval;
+	errcode_t	retval = 0;
 	char		*cp;
 	int		i, j;
 
@@ -854,6 +953,7 @@ static errcode_t unix_read_blk64(io_channel channel, unsigned long long block,
 	}
 
 	cp = buf;
+	mutex_lock(data, CACHE_MTX);
 	while (count > 0) {
 		/* If it's in the cache, use it! */
 		if ((cache = find_cached_block(data, block, &reuse[0]))) {
@@ -876,10 +976,11 @@ static errcode_t unix_read_blk64(io_channel channel, unsigned long long block,
 			if ((retval = raw_read_blk(channel, data, block, 1,
 						   cache->buf))) {
 				cache->in_use = 0;
-				return retval;
+				break;
 			}
 			memcpy(cp, cache->buf, channel->block_size);
-			return 0;
+			retval = 0;
+			break;
 		}
 
 		/*
@@ -893,7 +994,7 @@ static errcode_t unix_read_blk64(io_channel channel, unsigned long long block,
 		printf("Reading %d blocks starting at %lu\n", i, block);
 #endif
 		if ((retval = raw_read_blk(channel, data, block, i, cp)))
-			return retval;
+			break;
 
 		/* Save the results in the cache */
 		for (j=0; j < i; j++) {
@@ -904,7 +1005,8 @@ static errcode_t unix_read_blk64(io_channel channel, unsigned long long block,
 			cp += channel->block_size;
 		}
 	}
-	return 0;
+	mutex_unlock(data, CACHE_MTX);
+	return retval;
 #endif /* NO_IO_CACHE */
 }
 
@@ -935,7 +1037,8 @@ static errcode_t unix_write_blk64(io_channel channel, unsigned long long block,
 	 * flush out the cache completely and then do a direct write.
 	 */
 	if (count < 0 || count > WRITE_DIRECT_SIZE) {
-		if ((retval = flush_cached_blocks(channel, data, 1)))
+		if ((retval = flush_cached_blocks(channel, data,
+						  FLUSH_INVALIDATE)))
 			return retval;
 		return raw_write_blk(channel, data, block, count, buf);
 	}
@@ -950,6 +1053,7 @@ static errcode_t unix_write_blk64(io_channel channel, unsigned long long block,
 		retval = raw_write_blk(channel, data, block, count, buf);
 
 	cp = buf;
+	mutex_lock(data, CACHE_MTX);
 	while (count > 0) {
 		cache = find_cached_block(data, block, &reuse);
 		if (!cache) {
@@ -963,6 +1067,7 @@ static errcode_t unix_write_blk64(io_channel channel, unsigned long long block,
 		block++;
 		cp += channel->block_size;
 	}
+	mutex_unlock(data, CACHE_MTX);
 	return retval;
 #endif /* NO_IO_CACHE */
 }
@@ -1013,7 +1118,7 @@ static errcode_t unix_write_byte(io_channel channel, unsigned long offset,
 	/*
 	 * Flush out the cache completely
 	 */
-	if ((retval = flush_cached_blocks(channel, data, 1)))
+	if ((retval = flush_cached_blocks(channel, data, FLUSH_INVALIDATE)))
 		return retval;
 #endif
 
-- 
2.28.0




[Index of Archives]     [Reiser Filesystem Development]     [Ceph FS]     [Kernel Newbies]     [Security]     [Netfilter]     [Bugtraq]     [Linux FS]     [Yosemite National Park]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Samba]     [Device Mapper]     [Linux Media]

  Powered by Linux