[PATCH 1/2] ceph: add a new blkcg policy for cephfs

From: Xuehan Xu <xuxuehan@xxxxxx>

This policy facilitates limiting the rate of metadata ops and data ops
that cephfs issues to the underlying cluster, using per-cgroup token
bucket throttles.
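
Each throttle is configured through a pair of cgroup files: the
per-throttle file takes "tick_interval_ms,timeout" (the timeout is
passed through to the waiters as-is, i.e. in jiffies as the code
stands), and the per-bucket files take "target_throughput,burst".
A hypothetical session (the exact paths depend on how the io
controller exposes these cftypes; the names below assume the default
"io." prefix and an example cgroup "test"):

  # enable the data-ops throttle: 100ms tick, wait indefinitely
  echo "100,0" > /sys/fs/cgroup/test/io.cephfs_data_ops
  # allow 1000 data ops per second, with bursts of up to 2000
  echo "1000,2000" > /sys/fs/cgroup/test/io.cephfs_data_ops.iops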

Signed-off-by: Xuehan Xu <xuxuehan@xxxxxx>
---
 fs/ceph/Kconfig                     |   8 +
 fs/ceph/Makefile                    |   1 +
 fs/ceph/ceph_io_policy.c            | 463 ++++++++++++++++++++++++++++
 include/linux/ceph/ceph_io_policy.h |  75 +++++
 include/linux/ceph/osd_client.h     |   7 +
 net/ceph/ceph_common.c              |  13 +
 6 files changed, 567 insertions(+)
 create mode 100644 fs/ceph/ceph_io_policy.c
 create mode 100644 include/linux/ceph/ceph_io_policy.h
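
Refill arithmetic, for reference: on every tick, put_token() adds
target_throughput * tick_interval / HZ tokens to each configured
bucket, capped at the bucket's max (the burst).  With HZ=250, for
example, a 100ms tick (msecs_to_jiffies(100) == 25 jiffies) and
target_throughput=1000 add 1000 * 25 / 250 = 100 tokens per tick,
i.e. 1000 tokens per second, as intended.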

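A minimal consumer-side sketch of the API (the helper below is
hypothetical and not part of this patch; real call sites would live
in the mds_client/osd_client submission paths):

    /*
     * Charge one data op of 'len' bytes against the group's data-ops
     * throttle.  Blocks until tokens are granted, the timeout fires,
     * or the task is killed.
     */
    static int ceph_throttle_data_op(struct ceph_group_data *cephgd_p,
                                     struct queue_item *qitem, u64 len)
    {
        int ret;

        ret = queue_item_init(qitem, &cephgd_p->data_ops_throttle,
                              DATA_OPS_TB_NUM);
        if (ret)
            return ret;

        qitem->tokens_requested[DATA_OPS_IOPS_IDX] = 1;   /* one op */
        qitem->tokens_requested[DATA_OPS_BAND_IDX] = len; /* bytes */

        ret = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
                                        qitem);

        queue_item_free(qitem);
        return ret;
    }
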
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 7f7d92d6b024..0bbd39a66e11 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -36,3 +36,11 @@ config CEPH_FS_POSIX_ACL
 	  groups beyond the owner/group/world scheme.
 
 	  If you don't know what Access Control Lists are, say N
+
+config CEPH_LIB_IO_POLICY
+	bool "Use the ceph-specific blkcg policy to limit io reqs"
+	depends on CEPH_LIB
+	default n
+	help
+	  If you say Y here, a ceph-specific blkcg policy will be
+	  registered when the libceph module is loaded.
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index a699e320393f..bb5d468abc9e 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -12,3 +12,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
 
 ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
 ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
+ceph-$(CONFIG_CEPH_LIB_IO_POLICY) += ceph_io_policy.o
diff --git a/fs/ceph/ceph_io_policy.c b/fs/ceph/ceph_io_policy.c
new file mode 100644
index 000000000000..e7edc07de11a
--- /dev/null
+++ b/fs/ceph/ceph_io_policy.c
@@ -0,0 +1,463 @@
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/ceph/ceph_io_policy.h>
+#include <linux/slab.h>
+
+enum {
+	CEPH_MDS_META_OPS,
+	CEPH_MDS_META_OPS_IOPS,
+	CEPH_OSD_DATA_OPS,
+	CEPH_OSD_DATA_OPS_IOPS,
+	CEPH_OSD_DATA_OPS_BANDWIDTH,
+};
+
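+/*
+ * Each throttle owns an array of token buckets.  On every tick,
+ * put_token() refills each configured bucket at its target_throughput
+ * rate, capped at 'max' (the burst size); requests consume tokens and
+ * are queued on reqs_blocked whenever any bucket runs dry.
+ */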
+static void put_token(struct token_bucket_throttle* ptbt, u64 tick_interval)
+{
+    struct token_bucket* ptb = NULL;
+    u64 tokens_to_put = 0;
+    int i = 0;
+
+    for (i = 0; i < ptbt->tb_num; i++) {
+        ptb = &ptbt->tb[i];
+
+        if (!ptb->max)
+            continue;
+
+        tokens_to_put = ptb->target_throughput * tick_interval / HZ;
+
+        if (ptb->remain + tokens_to_put >= ptb->max)
+            ptb->remain = ptb->max;
+        else
+            ptb->remain += tokens_to_put;
+        pr_debug("%s: put_token: token bucket remain: %lld\n", __func__, ptb->remain);
+    }
+}
+
+static bool should_wait(struct token_bucket_throttle* ptbt, struct queue_item* qitem)
+{
+    struct token_bucket* ptb = NULL;
+    int i = 0;
+
+    BUG_ON(ptbt->tb_num != qitem->tb_item_num);
+    for (i = 0; i < ptbt->tb_num; i++) {
+        ptb = &ptbt->tb[i];
+
+        if (!ptb->max)
+            continue;
+
+        if (ptb->remain < qitem->tokens_requested[i])
+            return true;
+    }
+    return false;
+}
+
+static void get_token(struct token_bucket_throttle* ptbt, struct queue_item* qitem)
+{
+    struct token_bucket* ptb = NULL;
+    int i = 0;
+    BUG_ON(should_wait(ptbt, qitem));
+
+    for (i = 0; i < ptbt->tb_num; i++) {
+        ptb = &ptbt->tb[i];
+        if (!ptb->max)
+            continue;
+        ptb->remain -= qitem->tokens_requested[i];
+    }
+}
+
+void schedule_token_bucket_throttle_tick(struct token_bucket_throttle* ptbt, u64 tick_interval)
+{
+    if (tick_interval)
+        schedule_delayed_work(&ptbt->tick_work, tick_interval);
+}
+EXPORT_SYMBOL(schedule_token_bucket_throttle_tick);
+
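+/*
+ * Periodic worker: refill the buckets, then release queued requests
+ * in FIFO order until the head request's token demand cannot be met.
+ * A zero tick_interval releases all waiters and stops rescheduling.
+ */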
+void token_bucket_throttle_tick(struct work_struct* work)
+{
+    struct token_bucket_throttle* ptbt = 
+        container_of(work, struct token_bucket_throttle, tick_work.work);
+    struct queue_item* req = NULL, *tmp = NULL;
+    LIST_HEAD(reqs_to_go);
+    u64 tick_interval = ptbt->tick_interval;
+
+    mutex_lock(&ptbt->bucket_lock);
+    put_token(ptbt, tick_interval);
+    if (!tick_interval)
+        pr_debug("%s: tick_interval set to 0, turning off the throttle\n", __func__);
+
+    list_for_each_entry_safe(req, tmp, &ptbt->reqs_blocked, token_bucket_throttle_item) {
+        pr_debug("%s: waiting item: %p\n", __func__, req);
+        if (tick_interval) {
+            if (should_wait(ptbt, req))
+                break;
+            get_token(ptbt, req);
+        }
+        list_del_init(&req->token_bucket_throttle_item);
+        list_add_tail(&req->token_bucket_throttle_item, &reqs_to_go);
+        pr_debug("%s: tokens got for req: %p\n", __func__, req);
+    }
+    mutex_unlock(&ptbt->bucket_lock);
+
+    list_for_each_entry_safe(req, tmp, &reqs_to_go, token_bucket_throttle_item) {
+        pr_debug("%s: notifying req: %p, list head: %p\n", __func__, req, &reqs_to_go);
+        complete_all(&req->throttled);
+        list_del_init(&req->token_bucket_throttle_item);
+    }
+
+    if (tick_interval)
+        schedule_token_bucket_throttle_tick(ptbt, tick_interval);
+}
+EXPORT_SYMBOL(token_bucket_throttle_tick);
+
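+/*
+ * Consume tokens for @req.  If any bucket is short, queue the request
+ * and sleep (killable, with optional timeout) until the tick worker
+ * grants the tokens.  Returns 0 on success, -EIO on timeout, or
+ * -ERESTARTSYS if killed while waiting.
+ */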
+int get_token_bucket_throttle(struct token_bucket_throttle* ptbt, struct queue_item* req)
+{
+    int ret = 0;
+    long timeleft = 0;
+
+    mutex_lock(&ptbt->bucket_lock);
+    if (should_wait(ptbt, req)) {
+        pr_debug("%s: wait for tokens, req: %p\n", __func__, req);
+        list_add_tail(&req->token_bucket_throttle_item, &ptbt->reqs_blocked);
+        mutex_unlock(&ptbt->bucket_lock);
+        timeleft = wait_for_completion_killable_timeout(&req->throttled, req->tbt_timeout ?: MAX_SCHEDULE_TIMEOUT);
+        if (timeleft > 0)
+            ret = 0;
+        else {
+            if (!timeleft)
+                ret = -EIO; /* timed out */
+            else {
+                /* killed */
+                pr_debug("%s: killed, req: %p\n", __func__, req);
+                ret = timeleft;
+            }
+            mutex_lock(&ptbt->bucket_lock);
+            if (!list_empty(&req->token_bucket_throttle_item))
+                list_del_init(&req->token_bucket_throttle_item);
+            mutex_unlock(&ptbt->bucket_lock);
+        }
+    } else {
+        pr_debug("%s: no need to wait for tokens, going ahead, req: %p\n", __func__, req);
+        get_token(ptbt, req);
+        mutex_unlock(&ptbt->bucket_lock);
+    }
+    return ret;
+}
+EXPORT_SYMBOL(get_token_bucket_throttle);
+
+int queue_item_init(struct queue_item* qitem, struct token_bucket_throttle* ptbt, int tb_item_num)
+{
+    qitem->tokens_requested = kcalloc(tb_item_num, sizeof(*qitem->tokens_requested), GFP_KERNEL);
+    if (!qitem->tokens_requested)
+        return -ENOMEM;
+
+    qitem->tb_item_num = tb_item_num;
+    INIT_LIST_HEAD(&qitem->token_bucket_throttle_item);
+    init_completion(&qitem->throttled);
+    qitem->tbt_timeout = ptbt->tbt_timeout;
+
+    return 0;
+}
+EXPORT_SYMBOL(queue_item_init);
+
+void queue_item_free(struct queue_item* qitem)
+{
+	kfree(qitem->tokens_requested);
+}
+EXPORT_SYMBOL(queue_item_free);
+
+int token_bucket_throttle_init(struct token_bucket_throttle* ptbt,
+        int token_bucket_num)
+{
+    int i = 0;
+
+    INIT_LIST_HEAD(&ptbt->reqs_blocked);
+    mutex_init(&ptbt->bucket_lock);
+    ptbt->tb_num = token_bucket_num;
+    ptbt->tb = kcalloc(ptbt->tb_num, sizeof(*ptbt->tb), GFP_KERNEL);
+    if (!ptbt->tb) {
+        return -ENOMEM;
+    }
+
+    for (i = 0; i < ptbt->tb_num; i++) {
+        ptbt->tb[i].target_throughput = 0;
+        ptbt->tb[i].max = 0;
+    }
+    ptbt->tick_interval = 0;
+    ptbt->tbt_timeout = 0;
+    INIT_DELAYED_WORK(&ptbt->tick_work, token_bucket_throttle_tick);
+
+    return 0;
+}
+EXPORT_SYMBOL(token_bucket_throttle_init);
+
+static int set_throttle_params(struct token_bucket_throttle* ptbt, char* param_list)
+{
+    char* options = strstrip(param_list);
+    char* val = NULL;
+    int res = 0;
+    unsigned long interval = 0, timeout = 0, last_interval = ptbt->tick_interval;
+
+    val = strsep(&options, ",");
+    if (!val)
+        return -EINVAL;
+
+    res = kstrtoul(val, 0, &interval);
+    if (res)
+        return res;
+
+    val = strsep(&options, ",");
+    if (!val)
+        return -EINVAL;
+
+    res = kstrtoul(val, 0, &timeout);
+    if (res)
+        return res;
+
+    if (last_interval && !interval) {
+        int i = 0;
+
+        for (i = 0; i < ptbt->tb_num; i++) {
+            if (ptbt->tb[i].max) {
+                /* all token buckets must be unset
+                 * before turning off the throttle */
+                return -EINVAL;
+            }
+        }
+    }
+    ptbt->tick_interval = msecs_to_jiffies(interval);
+    ptbt->tbt_timeout = timeout;
+
+    if (ptbt->tick_interval && !last_interval) {
+        schedule_token_bucket_throttle_tick(ptbt, ptbt->tick_interval);
+    }
+
+    return 0;
+}
+
+static int set_tb_params(struct token_bucket_throttle* ptbt, int tb_idx, char* param_list)
+{
+    char* options = strstrip(param_list);
+    char* val = NULL;
+    int res = 0;
+    unsigned long throughput = 0, burst = 0;
+
+    val = strsep(&options, ",");
+    if (!val)
+        return -EINVAL;
+
+    res = kstrtoul(val, 0, &throughput);
+    if (res)
+        return res;
+
+    val = strsep(&options, ",");
+    if (!val)
+        return -EINVAL;
+
+    res = kstrtoul(val, 0, &burst);
+    if (res)
+        return res;
+
+    if (!(throughput && burst) && (throughput || burst)) {
+        /* either both or neither of throughput and burst must be set */
+        return -EINVAL;
+    }
+    if (throughput && !ptbt->tick_interval) {
+        /* the throttle's tick must be enabled
+         * before a token bucket can be set */
+        return -EINVAL;
+    }
+    ptbt->tb[tb_idx].target_throughput = throughput;
+    ptbt->tb[tb_idx].max = burst;
+
+    return 0;
+}
+
+static ssize_t ceph_set_throttle_params(struct kernfs_open_file *of,
+        char *buf, size_t nbytes, loff_t off)
+{
+	int ret = 0;
+	struct blkcg* blkcg = css_to_blkcg(of_css(of));
+	struct ceph_group_data* cephgd_p = blkcg_to_cephgd(blkcg);
+	int index = of_cft(of)->private;
+
+	switch (index) {
+	case CEPH_MDS_META_OPS:
+		ret = set_throttle_params(&cephgd_p->meta_ops_throttle, buf);
+		break;
+	case CEPH_OSD_DATA_OPS:
+		ret = set_throttle_params(&cephgd_p->data_ops_throttle, buf);
+		break;
+	case CEPH_MDS_META_OPS_IOPS:
+		ret = set_tb_params(&cephgd_p->meta_ops_throttle,
+				    META_OPS_IOPS_IDX, buf);
+		break;
+	case CEPH_OSD_DATA_OPS_IOPS:
+		ret = set_tb_params(&cephgd_p->data_ops_throttle,
+				    DATA_OPS_IOPS_IDX, buf);
+		break;
+	case CEPH_OSD_DATA_OPS_BANDWIDTH:
+		ret = set_tb_params(&cephgd_p->data_ops_throttle,
+				    DATA_OPS_BAND_IDX, buf);
+		break;
+	default:
+		BUG();
+	}
+
+	return ret ?: nbytes;
+}
+
+static int ceph_throttle_params_read(struct seq_file *sf, void *v)
+{
+	struct blkcg* blkcg = css_to_blkcg(seq_css(sf));
+	struct ceph_group_data* cephgd_p = blkcg_to_cephgd(blkcg);
+	int index = seq_cft(sf)->private;
+
+	switch (index) {
+	case CEPH_MDS_META_OPS:
+		seq_printf(sf, "%llu,%lu\n",
+			   cephgd_p->meta_ops_throttle.tick_interval,
+			   cephgd_p->meta_ops_throttle.tbt_timeout);
+		break;
+	case CEPH_OSD_DATA_OPS:
+		seq_printf(sf, "%llu,%lu\n",
+			   cephgd_p->data_ops_throttle.tick_interval,
+			   cephgd_p->data_ops_throttle.tbt_timeout);
+		break;
+	case CEPH_MDS_META_OPS_IOPS:
+		seq_printf(sf, "%llu,%llu\n",
+			   cephgd_p->meta_ops_throttle.tb[META_OPS_IOPS_IDX].target_throughput,
+			   cephgd_p->meta_ops_throttle.tb[META_OPS_IOPS_IDX].max);
+		break;
+	case CEPH_OSD_DATA_OPS_IOPS:
+		seq_printf(sf, "%llu,%llu\n",
+			   cephgd_p->data_ops_throttle.tb[DATA_OPS_IOPS_IDX].target_throughput,
+			   cephgd_p->data_ops_throttle.tb[DATA_OPS_IOPS_IDX].max);
+		break;
+	case CEPH_OSD_DATA_OPS_BANDWIDTH:
+		seq_printf(sf, "%llu,%llu\n",
+			   cephgd_p->data_ops_throttle.tb[DATA_OPS_BAND_IDX].target_throughput,
+			   cephgd_p->data_ops_throttle.tb[DATA_OPS_BAND_IDX].max);
+		break;
+	default:
+		BUG();
+	}
+
+	return 0;
+}
+
+static struct cftype cephgd_files[] = {
+    {
+        .name = "cephfs_meta_ops.iops",
+        .write = ceph_set_throttle_params,
+        .seq_show = ceph_throttle_params_read,
+        .private = CEPH_MDS_META_OPS_IOPS,
+    },
+    {
+        .name = "cephfs_meta_ops",
+        .write = ceph_set_throttle_params,
+        .seq_show = ceph_throttle_params_read,
+        .private = CEPH_MDS_META_OPS,
+    },
+    {
+        .name = "cephfs_data_ops.iops",
+        .write = ceph_set_throttle_params,
+        .seq_show = ceph_throttle_params_read,
+        .private = CEPH_OSD_DATA_OPS_IOPS,
+    },
+    {
+        .name = "cephfs_data_ops.bandwidth",
+        .write = ceph_set_throttle_params,
+        .seq_show = ceph_throttle_params_read,
+        .private = CEPH_OSD_DATA_OPS_BANDWIDTH,
+    },
+    {
+        .name = "cephfs_data_ops",
+        .write = ceph_set_throttle_params,
+        .seq_show = ceph_throttle_params_read,
+        .private = CEPH_OSD_DATA_OPS,
+    },
+    { }
+};
+
+static struct blkcg_policy_data *ceph_cpd_alloc(gfp_t gfp)
+{
+    struct ceph_group_data* cephgd_p = NULL;
+    int r = 0;
+
+    cephgd_p = kzalloc(sizeof(*cephgd_p), gfp);
+    if (!cephgd_p)
+        return ERR_PTR(-ENOMEM);
+
+    r = token_bucket_throttle_init(&cephgd_p->meta_ops_throttle,
+				   META_OPS_TB_NUM);
+    if (r)
+        goto err_free_cephgd;
+
+    r = token_bucket_throttle_init(&cephgd_p->data_ops_throttle,
+				   DATA_OPS_TB_NUM);
+    if (r)
+        goto err_free_meta_tb;
+
+    return &cephgd_p->cpd;
+
+err_free_meta_tb:
+    /* undo the first init so its bucket array is not leaked */
+    kfree(cephgd_p->meta_ops_throttle.tb);
+err_free_cephgd:
+    kfree(cephgd_p);
+    return ERR_PTR(r);
+}
+
+static void ceph_cpd_init(struct blkcg_policy_data *cpd)
+{
+}
+
+static void ceph_cpd_free(struct blkcg_policy_data *cpd)
+{
+    struct ceph_group_data* cephgd_p = cpd_to_cephgd(cpd);
+
+    cancel_delayed_work_sync(&cephgd_p->meta_ops_throttle.tick_work);
+    cancel_delayed_work_sync(&cephgd_p->data_ops_throttle.tick_work);
+
+    kfree(cephgd_p->meta_ops_throttle.tb);
+    kfree(cephgd_p->data_ops_throttle.tb);
+
+    kfree(cephgd_p);
+}
+
+struct blkcg_policy io_policy_ceph = {
+	.dfl_cftypes = cephgd_files,
+
+	.cpd_alloc_fn = ceph_cpd_alloc,
+	.cpd_init_fn = ceph_cpd_init,
+	.cpd_free_fn = ceph_cpd_free,
+};
+EXPORT_SYMBOL_GPL(io_policy_ceph);
+
+int ceph_io_policy_init(void)
+{
+	return blkcg_policy_register(&io_policy_ceph);
+}
+EXPORT_SYMBOL(ceph_io_policy_init);
+
+void ceph_io_policy_release(void)
+{
+	blkcg_policy_unregister(&io_policy_ceph);
+}
+EXPORT_SYMBOL(ceph_io_policy_release);
diff --git a/include/linux/ceph/ceph_io_policy.h b/include/linux/ceph/ceph_io_policy.h
new file mode 100644
index 000000000000..32b452ec358f
--- /dev/null
+++ b/include/linux/ceph/ceph_io_policy.h
@@ -0,0 +1,75 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _CEPH_IO_POLICY_H
+#define _CEPH_IO_POLICY_H
+
+#include <linux/blk-cgroup.h>
+
+#define META_OPS_IOPS_IDX 0
+#define DATA_OPS_IOPS_IDX 0
+#define DATA_OPS_BAND_IDX 1
+#define META_OPS_TB_NUM 1
+#define DATA_OPS_TB_NUM 2
+
+/*
+ * token bucket throttle
+ */
+struct token_bucket {
+	u64 remain;		/* tokens currently available */
+	u64 max;		/* burst size; 0 means bucket unconfigured */
+	u64 target_throughput;	/* refill rate, in tokens per second */
+};
+
+struct token_bucket_throttle {
+	struct token_bucket* tb;	/* array of tb_num buckets */
+	u64 tick_interval;		/* refill period, in jiffies; 0 = off */
+	int tb_num;
+	struct list_head reqs_blocked;	/* queue_items waiting for tokens */
+	struct mutex bucket_lock;	/* protects tb[] and reqs_blocked */
+	struct delayed_work tick_work;
+	unsigned long tbt_timeout;	/* max wait per request; 0 = no limit */
+};
+
+struct queue_item {
+	struct list_head token_bucket_throttle_item;
+	u64* tokens_requested;
+	int tb_item_num;
+	struct completion throttled;
+	unsigned long tbt_timeout;
+};
+
+struct ceph_group_data {
+	struct blkcg_policy_data cpd;
+
+	struct token_bucket_throttle meta_ops_throttle;
+	struct token_bucket_throttle data_ops_throttle;
+};
+
+extern struct blkcg_policy io_policy_ceph;
+
+static inline struct ceph_group_data *cpd_to_cephgd(struct blkcg_policy_data *cpd)
+{
+	return cpd ? container_of(cpd, struct ceph_group_data, cpd) : NULL;
+}
+
+static inline struct ceph_group_data* blkcg_to_cephgd(struct blkcg* blkcg)
+{
+	return cpd_to_cephgd(blkcg_to_cpd(blkcg, &io_policy_ceph));
+}
+
+extern void schedule_token_bucket_throttle_tick(struct token_bucket_throttle* ptbt, u64 tick_interval);
+
+extern void token_bucket_throttle_tick(struct work_struct* work);
+
+extern int get_token_bucket_throttle(struct token_bucket_throttle* ptbt, struct queue_item* req);
+
+extern int queue_item_init(struct queue_item* qitem, struct token_bucket_throttle* ptbt, int tb_item_num);
+
+extern int token_bucket_throttle_init(struct token_bucket_throttle* ptbt, int token_bucket_num);
+
+extern int ceph_io_policy_init(void);
+
+extern void ceph_io_policy_release(void);
+
+extern void queue_item_free(struct queue_item* qitem);
+
+#endif /* _CEPH_IO_POLICY_H */
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2294f963dab7..c80c96368679 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -15,6 +15,9 @@
 #include <linux/ceph/msgpool.h>
 #include <linux/ceph/auth.h>
 #include <linux/ceph/pagelist.h>
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+#include <linux/ceph/ceph_io_policy.h>
+#endif
 
 struct ceph_msg;
 struct ceph_snap_context;
@@ -193,6 +196,10 @@ struct ceph_osd_request {
 
 	int               r_result;
 
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+	/* token bucket throttle item */
+	struct queue_item qitem;
+#endif
 	struct ceph_osd_client *r_osdc;
 	struct kref       r_kref;
 	bool              r_mempool;
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 1c811c74bfc0..05c6e7b89c42 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -26,6 +26,9 @@
 #include <linux/ceph/decode.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/auth.h>
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+#include <linux/ceph/ceph_io_policy.h>
+#endif
 #include "crypto.h"
 
 
@@ -776,6 +779,12 @@ static int __init init_ceph_lib(void)
 {
 	int ret = 0;
 
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+	ret = ceph_io_policy_init();
+	if (ret < 0)
+		goto out;
+#endif
+
 	ret = ceph_debugfs_init();
 	if (ret < 0)
 		goto out;
@@ -812,6 +821,10 @@ static void __exit exit_ceph_lib(void)
 	dout("exit_ceph_lib\n");
 	WARN_ON(!ceph_strings_empty());
 
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+	ceph_io_policy_release();
+#endif
+
 	ceph_osdc_cleanup();
 	ceph_msgr_exit();
 	ceph_crypto_shutdown();
-- 
2.21.0



