Re: [PATCH v9 06/25] RDMA/rtrs: client: main functionality

On 2020-02-21 02:47, Jack Wang wrote:
> +	rcu_read_lock();
> +	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
> +		connected |= (READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);
> +	rcu_read_unlock();

Are the parentheses around the comparison necessary?

> +static struct rtrs_permit *
> +__rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
> +{
> +	size_t max_depth = clt->queue_depth;
> +	struct rtrs_permit *permit;
> +	int cpu, bit;
> +
> +	/* Combined with cq_vector, we pin the IO to the the cpu it comes */

This comment is confusing; please clarify it. All I see below is that
preemption is disabled, and I don't see any pinning of I/O to the CPU of
the caller.

> +	cpu = get_cpu();
> +	do {
> +		bit = find_first_zero_bit(clt->permits_map, max_depth);
> +		if (unlikely(bit >= max_depth)) {
> +			put_cpu();
> +			return NULL;
> +		}
> +
> +	} while (unlikely(test_and_set_bit_lock(bit, clt->permits_map)));
> +	put_cpu();
> +
> +	permit = GET_PERMIT(clt, bit);
> +	WARN_ON(permit->mem_id != bit);
> +	permit->cpu_id = cpu;
> +	permit->con_type = con_type;
> +
> +	return permit;
> +}

Please remove the blank line before "} while (...)".

> +void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
> +{
> +	if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
> +		return;
> +
> +	__rtrs_put_permit(clt, permit);
> +
> +	/*
> +	 * Putting a permit is a barrier, so we will observe
> +	 * new entry in the wait list, no worries.
> +	 */
> +	if (waitqueue_active(&clt->permits_wait))
> +		wake_up(&clt->permits_wait);
> +}
> +EXPORT_SYMBOL(rtrs_clt_put_permit);

The comment in the above function is not only confusing but also fails
to explain why it is safe to guard wake_up() with a waitqueue_active()
check. How about changing that comment into the following:

rtrs_clt_get_permit() adds itself to the &clt->permits_wait list before
calling schedule(). So if rtrs_clt_get_permit() is sleeping it must have
added itself to &clt->permits_wait before __rtrs_put_permit() finished.
Hence it is safe to guard wake_up() with a waitqueue_active() test.
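
For reference, the matching waiter side could look roughly like the
sketch below (hand-written here to illustrate the ordering, not copied
from the patch): wait_event() puts the task on &clt->permits_wait and
re-checks the condition before sleeping, so it cannot miss a permit
freed by __rtrs_put_permit().

	permit = __rtrs_get_permit(clt, con_type);
	if (!permit)
		/* Adds the task to &clt->permits_wait before sleeping. */
		wait_event(clt->permits_wait,
			   (permit = __rtrs_get_permit(clt, con_type)) != NULL);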

> +static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
> +				struct rtrs_clt_io_req *req,
> +				struct rtrs_rbuf *rbuf, u32 off,
> +				u32 imm, struct ib_send_wr *wr)
> +{
> +	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
> +	enum ib_send_flags flags;
> +	struct ib_sge sge;
> +
> +	if (unlikely(!req->sg_size)) {
> +		rtrs_wrn(con->c.sess,
> +			 "Doing RDMA Write failed, no data supplied\n");
> +		return -EINVAL;
> +	}
> +
> +	/* user data and user message in the first list element */
> +	sge.addr   = req->iu->dma_addr;
> +	sge.length = req->sg_size;
> +	sge.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
> +
> +	/*
> +	 * From time to time we have to post signalled sends,
> +	 * or send queue will fill up and only QP reset can help.
> +	 */
> +	flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
> +			0 : IB_SEND_SIGNALED;
> +
> +	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
> +				      req->sg_size, DMA_TO_DEVICE);
> +
> +	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
> +					    rbuf->rkey, rbuf->addr + off,
> +					    imm, flags, wr);
> +}

I don't think that posting a signalled send from time to time is
sufficient to prevent send queue overflow. Please address Jason's
comment from January 7th: "Not quite. If the SQ depth is 16 and you post
16 things and then signal the last one, you *cannot* post new work until
you see the completion. More SQ space *ONLY* becomes available upon
receipt of a completion. This is why you can't have an unsignaled SQ."

See also https://lore.kernel.org/linux-rdma/20200107182528.GB26174@xxxxxxxx/
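
To make the concern concrete, one common way to avoid SQ overflow (a
rough sketch only; the 'sq_avail' counter is a made-up field, not
something in this patch) is to keep an explicit budget of free
send-queue slots that is only replenished from the send completion
handler:

	/*
	 * 'sq_avail' (hypothetical) counts free SQ slots; it is
	 * initialized to the SQ depth and incremented only from the send
	 * CQ handler, i.e. once a completion has actually been received.
	 */
	if (atomic_dec_if_positive(&con->sq_avail) < 0)
		return -EAGAIN;	/* SQ full, retry after the next completion */

	err = rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
					  rbuf->rkey, rbuf->addr + off,
					  imm, IB_SEND_SIGNALED, wr);
	if (err)
		atomic_inc(&con->sq_avail);	/* nothing was posted */
	return err;

With such a scheme it is still possible to signal only every Nth WR, but
then the N slots can only be handed back once that signalled WR has
completed.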

> +static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
> +				     struct rtrs_clt_sess *sess,
> +				     rtrs_conf_fn *conf,
> +				     struct rtrs_permit *permit, void *priv,
> +				     const struct kvec *vec, size_t usr_len,
> +				     struct scatterlist *sg, size_t sg_cnt,
> +				     size_t data_len, int dir)
> +{
> +	struct iov_iter iter;
> +	size_t len;
> +
> +	req->permit = permit;
> +	req->in_use = true;
> +	req->usr_len = usr_len;
> +	req->data_len = data_len;
> +	req->sglist = sg;
> +	req->sg_cnt = sg_cnt;
> +	req->priv = priv;
> +	req->dir = dir;
> +	req->con = rtrs_permit_to_clt_con(sess, permit);
> +	req->conf = conf;
> +	req->need_inv = false;
> +	req->need_inv_comp = false;
> +	req->inv_errno = 0;
> +
> +	iov_iter_kvec(&iter, READ, vec, 1, usr_len);
> +	len = _copy_from_iter(req->iu->buf, usr_len, &iter);
> +	WARN_ON(len != usr_len);
> +
> +	reinit_completion(&req->inv_comp);
> +}

It is hard to verify whether the above function initializes all fields
of 'req' or not. Consider changing the 'req' member assignments into
something like *req = (struct rtrs_clt_io_req){ .permit = permit, ... };
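
A minimal sketch of that syntax (field list abbreviated; note that any
members set up at allocation time, e.g. req->iu and the
already-initialized inv_comp completion, would have to be preserved or
re-established, so this only illustrates the idea):

	*req = (struct rtrs_clt_io_req) {
		.iu		= req->iu,	/* keep allocation-time fields */
		.permit		= permit,
		.in_use		= true,
		.usr_len	= usr_len,
		.data_len	= data_len,
		.sglist		= sg,
		.sg_cnt		= sg_cnt,
		.priv		= priv,
		.dir		= dir,
		.con		= rtrs_permit_to_clt_con(sess, permit),
		.conf		= conf,
		...
	};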

> +static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
> +				    struct rtrs_clt_io_req *req,
> +				    struct rtrs_rbuf *rbuf,
> +				    u32 size, u32 imm)
> +{
> +	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
> +	struct ib_sge *sge = req->sge;
> +	enum ib_send_flags flags;
> +	struct scatterlist *sg;
> +	size_t num_sge;
> +	int i;
> +
> +	for_each_sg(req->sglist, sg, req->sg_cnt, i) {
> +		sge[i].addr   = sg_dma_address(sg);
> +		sge[i].length = sg_dma_len(sg);
> +		sge[i].lkey   = sess->s.dev->ib_pd->local_dma_lkey;
> +	}
> +	sge[i].addr   = req->iu->dma_addr;
> +	sge[i].length = size;
> +	sge[i].lkey   = sess->s.dev->ib_pd->local_dma_lkey;
> +
> +	num_sge = 1 + req->sg_cnt;
> +
> +	/*
> +	 * From time to time we have to post signalled sends,
> +	 * or send queue will fill up and only QP reset can help.
> +	 */
> +	flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
> +			0 : IB_SEND_SIGNALED;
> +
> +	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
> +				      size, DMA_TO_DEVICE);
> +
> +	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
> +					    rbuf->rkey, rbuf->addr, imm,
> +					    flags, NULL);
> +}

Same comment here. Posting a signalled send from time to time is not
sufficient to prevent send queue overflow.

> +		memset(&rwr, 0, sizeof(rwr));
> +		rwr.wr.next = NULL;
> +		rwr.wr.opcode = IB_WR_REG_MR;
> +		rwr.wr.wr_cqe = &fast_reg_cqe;
> +		rwr.wr.num_sge = 0;
> +		rwr.mr = req->mr;
> +		rwr.key = req->mr->rkey;
> +		rwr.access = (IB_ACCESS_LOCAL_WRITE |
> +			      IB_ACCESS_REMOTE_WRITE);

How about changing the above code into rwr = (struct ib_reg_wr){.wr = {
... }, ... };?
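
I.e. something like this (sketch; members not mentioned, e.g. wr.next
and wr.num_sge, are implicitly zeroed by the compound literal just as
they were by the memset()):

		rwr = (struct ib_reg_wr) {
			.wr = {
				.opcode	= IB_WR_REG_MR,
				.wr_cqe	= &fast_reg_cqe,
			},
			.mr	= req->mr,
			.key	= req->mr->rkey,
			.access	= IB_ACCESS_LOCAL_WRITE |
				  IB_ACCESS_REMOTE_WRITE,
		};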

> +static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
> +{
> +	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
> +	struct rtrs_clt *clt = sess->clt;
> +	struct rtrs_msg_conn_req msg;
> +	struct rdma_conn_param param;
> +
> +	int err;
> +
> +	memset(&param, 0, sizeof(param));
> +	param.retry_count = 7;
> +	param.rnr_retry_count = 7;
> +	param.private_data = &msg;
> +	param.private_data_len = sizeof(msg);
> +
> +	/*
> +	 * Those two are the part of struct cma_hdr which is shared
> +	 * with private_data in case of AF_IB, so put zeroes to avoid
> +	 * wrong validation inside cma.c on receiver side.
> +	 */
> +	msg.__cma_version = 0;
> +	msg.__ip_version = 0;
> +	msg.magic = cpu_to_le16(RTRS_MAGIC);
> +	msg.version = cpu_to_le16(RTRS_PROTO_VER);
> +	msg.cid = cpu_to_le16(con->c.cid);
> +	msg.cid_num = cpu_to_le16(sess->s.con_num);
> +	msg.recon_cnt = cpu_to_le16(sess->s.recon_cnt);
> +	uuid_copy(&msg.sess_uuid, &sess->s.uuid);
> +	uuid_copy(&msg.paths_uuid, &clt->paths_uuid);
> +
> +	err = rdma_connect(con->c.cm_id, &param);
> +	if (err)
> +		rtrs_err(clt, "rdma_connect(): %d\n", err);
> +
> +	return err;
> +}

Please use structure assignment instead of memset() followed by a series
of member assignments.
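
E.g. a designated initializer at the point of declaration (sketch)
would replace both the memset() and the four member assignments:

	struct rdma_conn_param param = {
		.retry_count		= 7,
		.rnr_retry_count	= 7,
		.private_data		= &msg,
		.private_data_len	= sizeof(msg),
	};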

> +static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
> +				 struct rtrs_clt_sess *sess,
> +				 struct rtrs_clt_sess *next)
> +{
> +	struct rtrs_clt_sess **ppcpu_path;
> +
> +	/* Call cmpxchg() without sparse warnings */
> +	ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
> +	return (sess == cmpxchg(ppcpu_path, sess, next));
> +}

Did checkpatch report "return is not a function" for the above code?
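
Whether it did or not, the parentheses can simply be dropped:

	return sess == cmpxchg(ppcpu_path, sess, next);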

> +static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess,
> +				      struct rtrs_addr *addr)
> +{
> +	struct rtrs_clt *clt = sess->clt;
> +
> +	mutex_lock(&clt->paths_mutex);
> +	clt->paths_num++;
> +
> +	/*
> +	 * Firstly increase paths_num, wait for GP and then
> +	 * add path to the list.  Why?  Since we add path with
> +	 * !CONNECTED state explanation is similar to what has
> +	 * been written in rtrs_clt_remove_path_from_arr().
> +	 */
> +	synchronize_rcu();
> +
> +	list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
> +	mutex_unlock(&clt->paths_mutex);
> +}

At least in the Linux kernel, keeping track of the number of elements in
a list is considered bad practice. What prevents removal of the
'paths_num' counter?

> +static void rtrs_clt_dev_release(struct device *dev)
> +{
> +	struct rtrs_clt *clt  = container_of(dev, struct rtrs_clt, dev);
> +
> +	kfree(clt);
> +}

Please surround the assignment operator with a single space on each
side.

> +int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
> +				     const struct attribute *sysfs_self)
> +{
> +	struct rtrs_clt *clt = sess->clt;
> +	enum rtrs_clt_state old_state;
> +	bool changed;
> +
> +	/*
> +	 * That can happen only when userspace tries to remove path
> +	 * very early, when rtrs_clt_open() is not yet finished.
> +	 */
> +	if (!clt->opened)
> +		return -EBUSY;
> +
> +	/*
> +	 * Continue stopping path till state was changed to DEAD or
> +	 * state was observed as DEAD:
> +	 * 1. State was changed to DEAD - we were fast and nobody
> +	 *    invoked rtrs_clt_reconnect(), which can again start
> +	 *    reconnecting.
> +	 * 2. State was observed as DEAD - we have someone in parallel
> +	 *    removing the path.
> +	 */
> +	do {
> +		rtrs_clt_close_conns(sess, true);
> +	} while (!(changed = rtrs_clt_change_state_get_old(sess,
> +							    RTRS_CLT_DEAD,
> +							    &old_state)) &&
> +		   old_state != RTRS_CLT_DEAD);

Did checkpatch complain about the assignment in the while-loop condition?
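
If so, moving the assignment into the loop body (a sketch with, as far
as I can tell, identical behavior) would silence it:

	do {
		rtrs_clt_close_conns(sess, true);
		changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_DEAD,
							&old_state);
	} while (!changed && old_state != RTRS_CLT_DEAD);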

Thanks,

Bart.


