On Thu, Jan 16, 2020 at 01:59:07PM +0100, Jack Wang wrote:
> From: Jack Wang <jinpu.wang@xxxxxxxxxxxxxxx>
>
> This is the main functionality of the rnbd-client module, which provides
> an interface to map a remote device as a local block device /dev/rnbd<N>
> and feeds RTRS with IO requests.
>
> Signed-off-by: Danil Kipnis <danil.kipnis@xxxxxxxxxxxxxxx>
> Signed-off-by: Jack Wang <jinpu.wang@xxxxxxxxxxxxxxx>
> ---
>  drivers/block/rnbd/rnbd-clt.c | 1730 +++++++++++++++++++++++++++++++++
>  1 file changed, 1730 insertions(+)
>  create mode 100644 drivers/block/rnbd/rnbd-clt.c
>
> diff --git a/drivers/block/rnbd/rnbd-clt.c b/drivers/block/rnbd/rnbd-clt.c
> new file mode 100644
> index 000000000000..7d8cb38d3969
> --- /dev/null
> +++ b/drivers/block/rnbd/rnbd-clt.c
> @@ -0,0 +1,1730 @@
> +// SPDX-License-Identifier: GPL-2.0-or-later
> +/*
> + * RDMA Network Block Driver
> + *
> + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
> + *
> + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
> + *
> + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> + */
> +
> +#undef pr_fmt
> +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
> +
> +#include <linux/module.h>
> +#include <linux/blkdev.h>
> +#include <linux/hdreg.h>
> +#include <linux/scatterlist.h>
> +#include <linux/idr.h>
> +
> +#include "rnbd-clt.h"
> +
> +MODULE_DESCRIPTION("RDMA Network Block Device Client");
> +MODULE_LICENSE("GPL");
> +
> +static int rnbd_client_major;
> +static DEFINE_IDA(index_ida);
> +static DEFINE_MUTEX(ida_lock);
> +static DEFINE_MUTEX(sess_lock);
> +static LIST_HEAD(sess_list);
> +
> +/*
> + * Maximum number of partitions an instance can have.
> + * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
> + */
> +#define RNBD_PART_BITS 6
> +
> +static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
> +{
> +	return refcount_inc_not_zero(&sess->refcount);
> +}
> +
> +static void free_sess(struct rnbd_clt_session *sess);
> +
> +static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
> +{
> +	might_sleep();
> +
> +	if (refcount_dec_and_test(&sess->refcount))
> +		free_sess(sess);
> +}

I see that this code is for drivers/block and maybe it is a way to do it
there, but in RDMA, we don't like abstraction of general and well-known
kernel APIs. It looks like kref to me.
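For illustration, a rough sketch of what the kref-based variant could look
like (assuming free_sess() keeps its current signature; the release callback
name here is made up):

#include <linux/kref.h>

/* in struct rnbd_clt_session: struct kref refcount; */

static void free_sess_release(struct kref *kref)
{
	struct rnbd_clt_session *sess;

	sess = container_of(kref, struct rnbd_clt_session, refcount);
	free_sess(sess);
}

static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
{
	return kref_get_unless_zero(&sess->refcount);
}

static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
{
	might_sleep();

	/* last put frees the session, same as the refcount_t version */
	kref_put(&sess->refcount, free_sess_release);
}

That way the get/put pair is the well-known kernel idiom instead of a
driver-private wrapper.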
> +
> +static inline bool rnbd_clt_dev_is_mapped(struct rnbd_clt_dev *dev)
> +{
> +	return dev->dev_state == DEV_STATE_MAPPED;
> +}
> +
> +static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
> +{
> +	might_sleep();
> +
> +	if (refcount_dec_and_test(&dev->refcount)) {
> +		mutex_lock(&ida_lock);
> +		ida_simple_remove(&index_ida, dev->clt_device_id);
> +		mutex_unlock(&ida_lock);
> +		kfree(dev->hw_queues);
> +		rnbd_clt_put_sess(dev->sess);
> +		kfree(dev);
> +	}
> +}
> +
> +static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
> +{
> +	return refcount_inc_not_zero(&dev->refcount);
> +}
> +
> +static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
> +				 const struct rnbd_msg_open_rsp *rsp)
> +{
> +	struct rnbd_clt_session *sess = dev->sess;
> +
> +	if (unlikely(!rsp->logical_block_size))
> +		return -EINVAL;

unlikely() again.
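Same remark as above: this check runs once per open response, not on a hot
path, so the branch hint buys nothing and only adds noise. The plain form
reads better:

	if (!rsp->logical_block_size)
		return -EINVAL;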
> +
> +	dev->device_id = le32_to_cpu(rsp->device_id);
> +	dev->nsectors = le64_to_cpu(rsp->nsectors);
> +	dev->logical_block_size = le16_to_cpu(rsp->logical_block_size);
> +	dev->physical_block_size = le16_to_cpu(rsp->physical_block_size);
> +	dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors);
> +	dev->max_discard_sectors = le32_to_cpu(rsp->max_discard_sectors);
> +	dev->discard_granularity = le32_to_cpu(rsp->discard_granularity);
> +	dev->discard_alignment = le32_to_cpu(rsp->discard_alignment);
> +	dev->secure_discard = le16_to_cpu(rsp->secure_discard);
> +	dev->rotational = rsp->rotational;
> +
> +	dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
> +	dev->max_segments = BMAX_SEGMENTS;
> +
> +	dev->max_hw_sectors = min_t(u32, dev->max_hw_sectors,
> +				    le32_to_cpu(rsp->max_hw_sectors));
> +	dev->max_segments = min_t(u16, dev->max_segments,
> +				  le16_to_cpu(rsp->max_segments));
> +
> +	return 0;
> +}
> +
> +static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
> +				    size_t new_nsectors)
> +{
> +	int err = 0;
> +
> +	rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
> +		      dev->nsectors, new_nsectors);
> +	dev->nsectors = new_nsectors;
> +	set_capacity(dev->gd, dev->nsectors);
> +	err = revalidate_disk(dev->gd);
> +	if (err)
> +		rnbd_clt_err(dev,
> +			     "Failed to change device size from %zu to %zu, err: %d\n",
> +			     dev->nsectors, new_nsectors, err);
> +	return err;
> +}
> +
> +static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
> +				struct rnbd_msg_open_rsp *rsp)
> +{
> +	int err = 0;
> +
> +	mutex_lock(&dev->lock);
> +	if (dev->dev_state == DEV_STATE_UNMAPPED) {
> +		rnbd_clt_info(dev,
> +			      "Ignoring Open-Response message from server for unmapped device\n");
> +		err = -ENOENT;
> +		goto out;
> +	}
> +	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
> +		u64 nsectors = le64_to_cpu(rsp->nsectors);
> +
> +		/*
> +		 * If the device was remapped and the size changed in the
> +		 * meantime we need to revalidate it
> +		 */
> +		if (dev->nsectors != nsectors)
> +			rnbd_clt_change_capacity(dev, nsectors);
> +		rnbd_clt_info(dev, "Device online, device remapped successfully\n");
> +	}
> +	err = rnbd_clt_set_dev_attr(dev, rsp);
> +	if (unlikely(err))
> +		goto out;
> +	dev->dev_state = DEV_STATE_MAPPED;
> +
> +out:
> +	mutex_unlock(&dev->lock);
> +
> +	return err;
> +}
> +
> +int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
> +{
> +	int ret = 0;
> +
> +	mutex_lock(&dev->lock);
> +	if (dev->dev_state != DEV_STATE_MAPPED) {
> +		pr_err("Failed to set new size of the device, device is not opened\n");
> +		ret = -ENOENT;
> +		goto out;
> +	}
> +	ret = rnbd_clt_change_capacity(dev, newsize);
> +
> +out:
> +	mutex_unlock(&dev->lock);
> +
> +	return ret;
> +}
> +
> +static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
> +{
> +	if (WARN_ON(!q->hctx))
> +		return;
> +
> +	/* We can come here from interrupt, thus async=true */
> +	blk_mq_run_hw_queue(q->hctx, true);
> +}
> +
> +enum {
> +	RNBD_DELAY_10ms = 10,
> +	RNBD_DELAY_IFBUSY = -1,
> +};
> +
> +/**
> + * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
> + * @sess:	Session to find a queue for
> + * @cpu:	Cpu to start the search from
> + *
> + * Description:
> + *     Each CPU has a list of HW queues that need to be rerun. If a list
> + *     is not empty, it is marked with a bit. This function finds the first
> + *     set bit in the bitmap and returns the corresponding CPU list.
> + */
> +static struct rnbd_cpu_qlist *
> +rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
> +{
> +	int bit;
> +
> +	/* First half */
> +	bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);

Is it protected by any lock?

> +	if (bit < nr_cpu_ids) {
> +		return per_cpu_ptr(sess->cpu_queues, bit);
> +	} else if (cpu != 0) {
> +		/* Second half */
> +		bit = find_next_bit(sess->cpu_queues_bm, cpu, 0);
> +		if (bit < cpu)
> +			return per_cpu_ptr(sess->cpu_queues, bit);
> +	}
> +
> +	return NULL;
> +}
> +
> +static inline int nxt_cpu(int cpu)
> +{
> +	return (cpu + 1) % nr_cpu_ids;
> +}
> +
> +/**
> + * rnbd_rerun_if_needed() - rerun next queue marked as stopped
> + * @sess:	Session to rerun a queue on
> + *
> + * Description:
> + *     Each CPU has its own list of HW queues, which should be rerun.
> + *     The function finds such a list with HW queues, takes the list lock,
> + *     picks up the first HW queue out of the list and requeues it.
> + *
> + * Return:
> + *     True if the queue was requeued, false otherwise.
> + *
> + * Context:
> + *     Does not matter.
> + */
> +static inline bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)

No inline function in C files.

> +{
> +	struct rnbd_queue *q = NULL;
> +	struct rnbd_cpu_qlist *cpu_q;
> +	unsigned long flags;
> +	int *cpup;
> +
> +	/*
> +	 * To keep fairness and not to let other queues starve we always
> +	 * try to wake up someone else in round-robin manner. That of course
> +	 * increases latency but queues always have a chance to be executed.
> +	 */
> +	cpup = get_cpu_ptr(sess->cpu_rr);
> +	for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
> +	     cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
> +		if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
> +			continue;
> +		if (likely(test_bit(cpu_q->cpu, sess->cpu_queues_bm))) {

Success oriented approach please.
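I.e., test for the failure case and bail out early, so the success path
stays at one indentation level. A rough, untested sketch of how the loop
body could read (the label names are made up):

		if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
			continue;
		if (!test_bit(cpu_q->cpu, sess->cpu_queues_bm))
			goto unlock;
		q = list_first_entry_or_null(&cpu_q->requeue_list,
					     typeof(*q), requeue_list);
		if (WARN_ON(!q))
			goto clear_bit;
		list_del_init(&q->requeue_list);
		clear_bit_unlock(0, &q->in_list);
		if (list_empty(&cpu_q->requeue_list)) {
			/* clear the bit if nothing is left */
clear_bit:
			clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
		}
unlock:
		spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
		if (q)
			break;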
> +			q = list_first_entry_or_null(&cpu_q->requeue_list,
> +						     typeof(*q), requeue_list);
> +			if (WARN_ON(!q))
> +				goto clear_bit;
> +			list_del_init(&q->requeue_list);
> +			clear_bit_unlock(0, &q->in_list);
> +
> +			if (list_empty(&cpu_q->requeue_list)) {
> +				/* Clear bit if nothing is left */
> +clear_bit:
> +				clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
> +			}
> +		}
> +		spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
> +
> +		if (q)
> +			break;
> +	}
> +
> +	/**
> +	 * Saves the CPU that is going to be requeued on the per-cpu var. Just
> +	 * incrementing it doesn't work because rnbd_get_cpu_qlist() will
> +	 * always return the first CPU with something on the queue list when the
> +	 * value stored on the var is greater than the last CPU with something
> +	 * on the list.
> +	 */
> +	if (cpu_q)
> +		*cpup = cpu_q->cpu;
> +	put_cpu_var(sess->cpu_rr);
> +
> +	if (q)
> +		rnbd_clt_dev_requeue(q);
> +
> +	return !!q;
> +}
> +
> +/**
> + * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
> + *			      session is idling (there are no requests
> + *			      in-flight).
> + * @sess:	Session to rerun the queues on
> + *
> + * Description:
> + *     This function tries to rerun all stopped queues if there are no
> + *     requests in-flight anymore. It tries to solve an obvious problem,
> + *     when the number of tags is less than the number of queues (hctxs)
> + *     that are stopped and put to sleep. If the last permit, which has
> + *     just been put, does not wake up all remaining queues (hctxs), IO
> + *     requests hang forever.
> + *
> + *     That can happen when the total number of permits, say N, has been
> + *     exhausted from one CPU, and we have many block devices per session,
> + *     say M. Each block device has its own queue (hctx) for each CPU, so
> + *     eventually we can put that number of queues (hctxs) to sleep:
> + *     M x nr_cpu_ids. If the number of permits N < M x nr_cpu_ids, we
> + *     finally get an IO hang.
> + *
> + *     To avoid this hang, the last caller of rnbd_put_permit() (the last
> + *     caller is the one who observes sess->busy == 0) must wake up all
> + *     remaining queues.
> + *
> + * Context:
> + *     Does not matter.
> + */
> +static inline void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
> +{
> +	bool requeued;
> +
> +	do {
> +		requeued = rnbd_rerun_if_needed(sess);
> +	} while (atomic_read(&sess->busy) == 0 && requeued);
> +}
> +
> +static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
> +					   enum rtrs_clt_con_type con_type,
> +					   int wait)
> +{
> +	struct rtrs_permit *permit;
> +
> +	permit = rtrs_clt_get_permit(sess->rtrs, con_type,
> +				     wait ? RTRS_PERMIT_WAIT :
> +				     RTRS_PERMIT_NOWAIT);
> +	if (likely(permit))
> +		/* We have a subtle rare case here, when all permits can be
> +		 * consumed before busy counter increased. This is safe,
> +		 * because loser will get NULL as a permit, observe 0 busy
> +		 * counter and immediately restart the queue himself.
> +		 */
> +		atomic_inc(&sess->busy);
> +
> +	return permit;
> +}
> +
> +static void rnbd_put_permit(struct rnbd_clt_session *sess,
> +			    struct rtrs_permit *permit)
> +{
> +	rtrs_clt_put_permit(sess->rtrs, permit);
> +	atomic_dec(&sess->busy);
> +	/* Paired with rnbd_clt_dev_add_to_requeue(). Decrement first
> +	 * and then check queue bits.
> +	 */
> +	smp_mb__after_atomic();
> +	rnbd_rerun_all_if_idle(sess);
> +}
> +
> +static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
> +				   enum rtrs_clt_con_type con_type,
> +				   int wait)
> +{
> +	struct rnbd_iu *iu;
> +	struct rtrs_permit *permit;
> +
> +	permit = rnbd_get_permit(sess, con_type,
> +				 wait ? RTRS_PERMIT_WAIT :
> +				 RTRS_PERMIT_NOWAIT);
> +	if (unlikely(!permit))
> +		return NULL;
> +	iu = rtrs_permit_to_pdu(permit);
> +	iu->permit = permit;
> +	/* yes, rtrs_permit_from_pdu() can be nice here,
> +	 * but also we have to think about MQ mode
> +	 */
> +	/*
> +	 * 1st reference is dropped after finishing sending a "user" message,
> +	 * 2nd reference is dropped after confirmation with the response is
> +	 * returned.
> +	 * 1st and 2nd can happen in any order, so the rnbd_iu should be
> +	 * released (rtrs_permit returned to rtrs) only after both
> +	 * are finished.
> +	 */
> +	atomic_set(&iu->refcount, 2);
> +	init_waitqueue_head(&iu->comp.wait);
> +	iu->comp.errno = INT_MAX;
> +
> +	return iu;
> +}
> +
> +static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
> +{
> +	if (atomic_dec_and_test(&iu->refcount))
> +		rnbd_put_permit(sess, iu->permit);
> +}
> +
> +static void rnbd_softirq_done_fn(struct request *rq)
> +{
> +	struct rnbd_clt_dev *dev	= rq->rq_disk->private_data;
> +	struct rnbd_clt_session *sess	= dev->sess;

Please no vertical alignment in new code, it adds a lot of churn if such
a line is changed later and creates difficulties for backports.
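E.g. simply:

	struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
	struct rnbd_clt_session *sess = dev->sess;

so that a later change to one declaration touches one line only.

Thanks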