[PATCH 10/28] ibtrs_srv: add main functionality for ibtrs_server

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Jack Wang <jinpu.wang@xxxxxxxxxxxxxxxx>

Service accept connection requests from clients and reserve memory
for them.

It excutes rdma transfers, hands over received data to ibnbd_server.

Signed-off-by: Jack Wang <jinpu.wang@xxxxxxxxxxxxxxxx>
Signed-off-by: Kleber Souza <kleber.souza@xxxxxxxxxxxxxxxx>
Signed-off-by: Danil Kipnis <danil.kipnis@xxxxxxxxxxxxxxxx>
Signed-off-by: Roman Pen <roman.penyaev@xxxxxxxxxxxxxxxx>
---
 drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c | 3744 +++++++++++++++++++++++
 1 file changed, 3744 insertions(+)
 create mode 100644 drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c

diff --git a/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c b/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c
new file mode 100644
index 0000000..513e90a
--- /dev/null
+++ b/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c
@@ -0,0 +1,3744 @@
+/*
+ * InfiniBand Transport Layer
+ *
+ * Copyright (c) 2014 - 2017 ProfitBricks GmbH. All rights reserved.
+ * Authors: Fabian Holler < mail@xxxxxxxxxx>
+ *          Jack Wang <jinpu.wang@xxxxxxxxxxxxxxxx>
+ *   	    Kleber Souza <kleber.souza@xxxxxxxxxxxxxxxx>
+ * 	    Danil Kipnis <danil.kipnis@xxxxxxxxxxxxxxxx>
+ *   	    Roman Pen <roman.penyaev@xxxxxxxxxxxxxxxx>
+ *          Milind Dumbare <Milind.dumbare@xxxxxxxxx>
+ *
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions, and the following disclaimer,
+ *    without modification.
+ * 2. Redistributions in binary form must reproduce at minimum a disclaimer
+ *    substantially similar to the "NO WARRANTY" disclaimer below
+ *    ("Disclaimer") and any redistribution must be conditioned upon
+ *    including a substantially similar Disclaimer requirement for further
+ *    binary redistribution.
+ * 3. Neither the names of the above-listed copyright holders nor the names
+ *    of any contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * NO WARRANTY
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+ * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGES.
+ *
+ */
+
+#include <linux/module.h>
+#include <linux/sizes.h>
+#include <linux/utsname.h>
+#include <linux/cpumask.h>
+#include <linux/debugfs.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <rdma/ib.h>
+
+#include <rdma/ibtrs_srv.h>
+#include "ibtrs_srv_sysfs.h"
+#include "ibtrs_srv_internal.h"
+#include <rdma/ibtrs.h>
+#include <rdma/ibtrs_log.h>
+
+MODULE_AUTHOR("ibnbd@xxxxxxxxxxxxxxxx");
+MODULE_DESCRIPTION("InfiniBand Transport Server");
+MODULE_VERSION(__stringify(IBTRS_VER));
+MODULE_LICENSE("GPL");
+
+#define DEFAULT_MAX_IO_SIZE_KB 128
+#define DEFAULT_MAX_IO_SIZE (DEFAULT_MAX_IO_SIZE_KB * 1024)
+static int max_io_size = DEFAULT_MAX_IO_SIZE;
+#define MAX_REQ_SIZE PAGE_SIZE
+static int rcv_buf_size = DEFAULT_MAX_IO_SIZE + MAX_REQ_SIZE;
+
+static int max_io_size_set(const char *val, const struct kernel_param *kp)
+{
+	int err, ival;
+
+	err = kstrtoint(val, 0, &ival);
+	if (err)
+		return err;
+
+	if (ival < 4096 || ival + MAX_REQ_SIZE > (4096 * 1024) ||
+	    (ival + MAX_REQ_SIZE) % 512 != 0) {
+		ERR_NP("Invalid max io size value %d, has to be"
+		       " > %d, < %d\n", ival, 4096, 4194304);
+		return -EINVAL;
+	}
+
+	max_io_size = ival;
+	rcv_buf_size = max_io_size + MAX_REQ_SIZE;
+	INFO_NP("max io size changed to %d\n", ival);
+
+	return 0;
+}
+
+static const struct kernel_param_ops max_io_size_ops = {
+	.set		= max_io_size_set,
+	.get		= param_get_int,
+};
+module_param_cb(max_io_size, &max_io_size_ops, &max_io_size, 0444);
+MODULE_PARM_DESC(max_io_size,
+		 "Max size for each IO request, when change the unit is in byte"
+		 " (default: " __stringify(DEFAULT_MAX_IO_SIZE_KB) "KB)");
+
+#define DEFAULT_SESS_QUEUE_DEPTH 512
+static int sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
+module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
+MODULE_PARM_DESC(sess_queue_depth,
+		 "Number of buffers for pending I/O requests to allocate"
+		 " per session. Maximum: " __stringify(MAX_SESS_QUEUE_DEPTH)
+		 " (default: " __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
+
+#define DEFAULT_INIT_POOL_SIZE 10
+static int init_pool_size = DEFAULT_INIT_POOL_SIZE;
+module_param_named(init_pool_size, init_pool_size, int, 0444);
+MODULE_PARM_DESC(init_pool_size,
+		 "Maximum size of the RDMA buffers pool to pre-allocate on"
+		 " module load, in number of sessions. (default: "
+		 __stringify(DEFAULT_INIT_POOL_SIZE) ")");
+
+#define DEFAULT_POOL_SIZE_HI_WM 100
+static int pool_size_hi_wm = DEFAULT_POOL_SIZE_HI_WM;
+module_param_named(pool_size_hi_wm, pool_size_hi_wm, int, 0444);
+MODULE_PARM_DESC(pool_size_hi_wm,
+		 "High watermark value for the size of RDMA buffers pool"
+		 " (in number of sessions). Newly allocated buffers will be"
+		 " added to the pool until pool_size_hi_wm is reached."
+		 " (default: " __stringify(DEFAULT_POOL_SIZE_HI_WM) ")");
+
+static int retry_count = 7;
+
+static int retry_count_set(const char *val, const struct kernel_param *kp)
+{
+	int err, ival;
+
+	err = kstrtoint(val, 0, &ival);
+	if (err)
+		return err;
+
+	if (ival < MIN_RTR_CNT || ival > MAX_RTR_CNT) {
+		ERR_NP("Invalid retry count value %d, has to be"
+		       " > %d, < %d\n", ival, MIN_RTR_CNT, MAX_RTR_CNT);
+		return -EINVAL;
+	}
+
+	retry_count = ival;
+	INFO_NP("QP retry count changed to %d\n", ival);
+
+	return 0;
+}
+
+static const struct kernel_param_ops retry_count_ops = {
+	.set		= retry_count_set,
+	.get		= param_get_int,
+};
+module_param_cb(retry_count, &retry_count_ops, &retry_count, 0644);
+
+MODULE_PARM_DESC(retry_count, "Number of times to send the message if the"
+		 " remote side didn't respond with Ack or Nack (default: 3,"
+		 " min: " __stringify(MIN_RTR_CNT) ", max: "
+		 __stringify(MAX_RTR_CNT) ")");
+
+static int default_heartbeat_timeout_ms = DEFAULT_HEARTBEAT_TIMEOUT_MS;
+
+static int default_heartbeat_timeout_set(const char *val,
+					 const struct kernel_param *kp)
+{
+	int ret, ival;
+
+	ret = kstrtouint(val, 0, &ival);
+	if (ret)
+		return ret;
+
+	ret = ibtrs_heartbeat_timeout_validate(ival);
+	if (ret)
+		return ret;
+
+	default_heartbeat_timeout_ms = ival;
+	INFO_NP("Default heartbeat timeout changed to %d\n", ival);
+
+	return 0;
+}
+
+static const struct kernel_param_ops heartbeat_timeout_ops = {
+	.set		= default_heartbeat_timeout_set,
+	.get		= param_get_int,
+};
+
+module_param_cb(default_heartbeat_timeout_ms, &heartbeat_timeout_ops,
+		&default_heartbeat_timeout_ms, 0644);
+MODULE_PARM_DESC(default_heartbeat_timeout_ms, "default heartbeat timeout,"
+		 " min. " __stringify(MIN_HEARTBEAT_TIMEOUT_MS)
+		 " (default:" __stringify(DEFAULT_HEARTBEAT_TIMEOUT_MS) ")");
+
+static char cq_affinity_list[256] = "";
+static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
+
+static void init_cq_affinity(void)
+{
+	sprintf(cq_affinity_list, "0-%d", nr_cpu_ids - 1);
+}
+
+static int cq_affinity_list_set(const char *val, const struct kernel_param *kp)
+{
+	int ret = 0, len = strlen(val);
+	cpumask_var_t new_value;
+
+	if (!strlen(cq_affinity_list))
+		init_cq_affinity();
+
+	if (len >= sizeof(cq_affinity_list))
+		return -EINVAL;
+	if (!alloc_cpumask_var(&new_value, GFP_KERNEL))
+		return -ENOMEM;
+
+	ret = cpulist_parse(val, new_value);
+	if (ret) {
+		ERR_NP("Can't set cq_affinity_list \"%s\": %d\n", val, ret);
+		goto free_cpumask;
+	}
+
+	strlcpy(cq_affinity_list, val, sizeof(cq_affinity_list));
+	*strchrnul(cq_affinity_list, '\n') = '\0';
+	cpumask_copy(&cq_affinity_mask, new_value);
+
+	INFO_NP("cq_affinity_list changed to %*pbl\n",
+		cpumask_pr_args(&cq_affinity_mask));
+free_cpumask:
+	free_cpumask_var(new_value);
+	return ret;
+}
+
+static struct kparam_string cq_affinity_list_kparam_str = {
+	.maxlen	= sizeof(cq_affinity_list),
+	.string	= cq_affinity_list
+};
+
+static const struct kernel_param_ops cq_affinity_list_ops = {
+	.set	= cq_affinity_list_set,
+	.get	= param_get_string,
+};
+
+module_param_cb(cq_affinity_list, &cq_affinity_list_ops,
+		&cq_affinity_list_kparam_str, 0644);
+MODULE_PARM_DESC(cq_affinity_list, "Sets the list of cpus to use as cq vectors."
+				   "(default: use all possible CPUs)");
+
+static char hostname[MAXHOSTNAMELEN] = "";
+
+static int hostname_set(const char *val, const struct kernel_param *kp)
+{
+	int ret = 0, len = strlen(val);
+
+	if (len >= sizeof(hostname))
+		return -EINVAL;
+	strlcpy(hostname, val, sizeof(hostname));
+	*strchrnul(hostname, '\n') = '\0';
+
+	INFO_NP("hostname changed to %s\n", hostname);
+	return ret;
+}
+
+static struct kparam_string hostname_kparam_str = {
+	.maxlen	= sizeof(hostname),
+	.string	= hostname
+};
+
+static const struct kernel_param_ops hostname_ops = {
+	.set	= hostname_set,
+	.get	= param_get_string,
+};
+
+module_param_cb(hostname, &hostname_ops,
+		&hostname_kparam_str, 0644);
+MODULE_PARM_DESC(hostname, "Sets hostname of local server, will send to the"
+		 " other side if set,  will display togather with addr "
+		 "(default: empty)");
+
+static struct dentry *ibtrs_srv_debugfs_dir;
+static struct dentry *mempool_debugfs_dir;
+
+static struct rdma_cm_id	*cm_id_ip;
+static struct rdma_cm_id	*cm_id_ib;
+static DEFINE_MUTEX(sess_mutex);
+static LIST_HEAD(sess_list);
+static DECLARE_WAIT_QUEUE_HEAD(sess_list_waitq);
+static struct workqueue_struct *destroy_wq;
+
+static LIST_HEAD(device_list);
+static DEFINE_MUTEX(device_list_mutex);
+
+static DEFINE_MUTEX(buf_pool_mutex);
+static LIST_HEAD(free_buf_pool_list);
+static int nr_free_buf_pool;
+static int nr_total_buf_pool;
+static int nr_active_sessions;
+
+static const struct ibtrs_srv_ops *srv_ops;
+enum ssm_ev {
+	SSM_EV_CON_DISCONNECTED,
+	SSM_EV_CON_EST_ERR,
+	SSM_EV_CON_CONNECTED,
+	SSM_EV_SESS_CLOSE,
+	SSM_EV_SYSFS_DISCONNECT
+};
+
+static inline const char *ssm_ev_str(enum ssm_ev ev)
+{
+	switch (ev) {
+	case SSM_EV_CON_DISCONNECTED:
+		return "SSM_EV_CON_DISCONNECTED";
+	case SSM_EV_CON_EST_ERR:
+		return "SSM_EV_CON_EST_ERR";
+	case SSM_EV_CON_CONNECTED:
+		return "SSM_EV_CON_CONNECTED";
+	case SSM_EV_SESS_CLOSE:
+		return "SSM_EV_SESS_CLOSE";
+	case SSM_EV_SYSFS_DISCONNECT:
+		return "SSM_EV_SYSFS_DISCONNECT";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+static const char *ssm_state_str(enum ssm_state state)
+{
+	switch (state) {
+	case SSM_STATE_IDLE:
+		return "SSM_STATE_IDLE";
+	case SSM_STATE_CONNECTED:
+		return "SSM_STATE_CONNECTED";
+	case SSM_STATE_CLOSING:
+		return "SSM_STATE_CLOSING";
+	case SSM_STATE_CLOSED:
+		return "SSM_STATE_CLOSED";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+enum csm_state {
+	CSM_STATE_REQUESTED,
+	CSM_STATE_CONNECTED,
+	CSM_STATE_CLOSING,
+	CSM_STATE_FLUSHING,
+	CSM_STATE_CLOSED
+};
+
+static inline const char *csm_state_str(enum csm_state s)
+{
+	switch (s) {
+	case CSM_STATE_REQUESTED:
+		return "CSM_STATE_REQUESTED";
+	case CSM_STATE_CONNECTED:
+		return "CSM_STATE_CONNECTED";
+	case CSM_STATE_CLOSING:
+		return "CSM_STATE_CLOSING";
+	case CSM_STATE_FLUSHING:
+		return "CSM_STATE_FLUSHING";
+	case CSM_STATE_CLOSED:
+		return "CSM_STATE_CLOSED";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+enum csm_ev {
+	CSM_EV_CON_REQUEST,
+	CSM_EV_CON_ESTABLISHED,
+	CSM_EV_CON_ERROR,
+	CSM_EV_DEVICE_REMOVAL,
+	CSM_EV_SESS_CLOSING,
+	CSM_EV_CON_DISCONNECTED,
+	CSM_EV_BEACON_COMPLETED
+};
+
+static inline const char *csm_ev_str(enum csm_ev ev)
+{
+	switch (ev) {
+	case CSM_EV_CON_REQUEST:
+		return "CSM_EV_CON_REQUEST";
+	case CSM_EV_CON_ESTABLISHED:
+		return "CSM_EV_CON_ESTABLISHED";
+	case CSM_EV_CON_ERROR:
+		return "CSM_EV_CON_ERROR";
+	case CSM_EV_DEVICE_REMOVAL:
+		return "CSM_EV_DEVICE_REMOVAL";
+	case CSM_EV_SESS_CLOSING:
+		return "CSM_EV_SESS_CLOSING";
+	case CSM_EV_CON_DISCONNECTED:
+		return "CSM_EV_CON_DISCONNECTED";
+	case CSM_EV_BEACON_COMPLETED:
+		return "CSM_EV_BEACON_COMPLETED";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+struct sess_put_work {
+	struct ibtrs_session	*sess;
+	struct work_struct	work;
+};
+
+struct ibtrs_srv_sysfs_put_work {
+	struct work_struct	work;
+	struct ibtrs_session	*sess;
+};
+
+struct ssm_create_con_work {
+	struct ibtrs_session	*sess;
+	struct rdma_cm_id	*cm_id;
+	struct work_struct	work;
+	bool			user;/* true if con is for user msg only */
+};
+
+struct ssm_work {
+	struct ibtrs_session	*sess;
+	enum ssm_ev		ev;
+	struct work_struct	work;
+};
+
+struct ibtrs_con {
+	/* list for ibtrs_session->con_list */
+	struct list_head	list;
+	enum csm_state		state;
+	/* true if con is for user msg only */
+	bool			user;
+	bool			failover_enabled;
+	struct ib_con		ib_con;
+	atomic_t		wr_cnt;
+	struct rdma_cm_id	*cm_id;
+	int			cq_vector;
+	struct ibtrs_session	*sess;
+	struct work_struct	cq_work;
+	struct workqueue_struct *cq_wq;
+	struct workqueue_struct *rdma_resp_wq;
+	struct ib_wc		wcs[WC_ARRAY_SIZE];
+	bool			device_being_removed;
+};
+
+struct csm_work {
+	struct ibtrs_con	*con;
+	enum csm_ev		ev;
+	struct work_struct	work;
+};
+
+struct msg_work {
+	struct work_struct	work;
+	struct ibtrs_con	*con;
+	void                    *msg;
+};
+
+struct ibtrs_device {
+	struct list_head	entry;
+	struct ib_device	*device;
+	struct ib_session	ib_sess;
+	struct completion	*ib_sess_destroy_completion;
+	struct kref		ref;
+};
+
+struct ibtrs_ops_id {
+	struct ibtrs_con		*con;
+	u32				msg_id;
+	u8				dir;
+	u64				data_dma_addr;
+	struct ibtrs_msg_req_rdma_write *req;
+	struct ib_rdma_wr		*tx_wr;
+	struct ib_sge			*tx_sg;
+	int				status;
+	struct work_struct		work;
+} ____cacheline_aligned;
+
+static void csm_set_state(struct ibtrs_con *con, enum csm_state s)
+{
+	if (con->state != s) {
+		DEB("changing con %p csm state from %s to %s\n", con,
+		    csm_state_str(con->state), csm_state_str(s));
+		con->state = s;
+	}
+}
+
+static void ssm_set_state(struct ibtrs_session *sess, enum ssm_state state)
+{
+	if (sess->state != state) {
+		DEB("changing sess %p ssm state from %s to %s\n", sess,
+		    ssm_state_str(sess->state), ssm_state_str(state));
+		sess->state = state;
+	}
+}
+
+static struct ibtrs_con *ibtrs_srv_get_user_con(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con;
+
+	if (sess->est_cnt > 0) {
+		list_for_each_entry(con, &sess->con_list, list) {
+			if (con->user && con->state == CSM_STATE_CONNECTED)
+				return con;
+		}
+	}
+	return NULL;
+}
+
+static void csm_init(struct ibtrs_con *con);
+static void csm_schedule_event(struct ibtrs_con *con, enum csm_ev ev);
+static int ssm_init(struct ibtrs_session *sess);
+static int ssm_schedule_event(struct ibtrs_session *sess, enum ssm_ev ev);
+
+static int ibtrs_srv_get_sess_current_port_num(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con, *next;
+	struct ibtrs_con *ucon = ibtrs_srv_get_user_con(sess);
+
+	if (sess->state != SSM_STATE_CONNECTED || !ucon)
+		return -ECOMM;
+
+	mutex_lock(&sess->lock);
+	if (WARN_ON(!sess->cm_id)) {
+		mutex_unlock(&sess->lock);
+		return -ENODEV;
+	}
+	list_for_each_entry_safe(con, next, &sess->con_list, list) {
+		if (unlikely(con->state != CSM_STATE_CONNECTED)) {
+			mutex_unlock(&sess->lock);
+			return -ECOMM;
+		}
+		if (con->cm_id->port_num != sess->cm_id->port_num) {
+			mutex_unlock(&sess->lock);
+			return 0;
+		}
+	}
+	mutex_unlock(&sess->lock);
+	return sess->cm_id->port_num;
+}
+
+int ibtrs_srv_current_hca_port_to_str(struct ibtrs_session *sess,
+				      char *buf, size_t len)
+{
+	if (!ibtrs_srv_get_sess_current_port_num(sess))
+		return scnprintf(buf, len, "migrating\n");
+
+	if (ibtrs_srv_get_sess_current_port_num(sess) < 0)
+		return ibtrs_srv_get_sess_current_port_num(sess);
+
+	return scnprintf(buf, len, "%u\n",
+			 ibtrs_srv_get_sess_current_port_num(sess));
+}
+
+inline const char *ibtrs_srv_get_sess_hca_name(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con = ibtrs_srv_get_user_con(sess);
+
+	if (con)
+		return sess->dev->device->name;
+	return "n/a";
+}
+
+static void ibtrs_srv_update_rdma_stats(struct ibtrs_srv_stats *s,
+					size_t size, bool read)
+{
+	int inflight;
+
+	if (read) {
+		atomic64_inc(&s->rdma_stats.cnt_read);
+		atomic64_add(size, &s->rdma_stats.size_total_read);
+	} else {
+		atomic64_inc(&s->rdma_stats.cnt_write);
+		atomic64_add(size, &s->rdma_stats.size_total_write);
+	}
+
+	inflight = atomic_inc_return(&s->rdma_stats.inflight);
+	atomic64_add(inflight, &s->rdma_stats.inflight_total);
+}
+
+static inline void ibtrs_srv_stats_dec_inflight(struct ibtrs_session *sess)
+{
+	if (!atomic_dec_return(&sess->stats.rdma_stats.inflight))
+		wake_up(&sess->bufs_wait);
+}
+
+int ibtrs_srv_reset_rdma_stats(struct ibtrs_session *sess, bool enable)
+{
+	if (enable) {
+		struct ibtrs_srv_stats_rdma_stats *r = &sess->stats.rdma_stats;
+
+		/*
+		 * TODO: inflight is used for flow control
+		 * we can't memset the whole structure, so reset each member
+		 */
+		atomic64_set(&r->cnt_read, 0);
+		atomic64_set(&r->size_total_read, 0);
+		atomic64_set(&r->cnt_write, 0);
+		atomic64_set(&r->size_total_write, 0);
+		atomic64_set(&r->inflight_total, 0);
+		return 0;
+	} else {
+		return -EINVAL;
+	}
+}
+
+ssize_t ibtrs_srv_stats_rdma_to_str(struct ibtrs_session *sess,
+				    char *page, size_t len)
+{
+	struct ibtrs_srv_stats_rdma_stats *r = &sess->stats.rdma_stats;
+
+	return scnprintf(page, len, "%ld %ld %ld %ld %u %ld\n",
+			 atomic64_read(&r->cnt_read),
+			 atomic64_read(&r->size_total_read),
+			 atomic64_read(&r->cnt_write),
+			 atomic64_read(&r->size_total_write),
+			 atomic_read(&r->inflight),
+			 (atomic64_read(&r->cnt_read) +
+			  atomic64_read(&r->cnt_write)) ?
+			 atomic64_read(&r->inflight_total) /
+			 (atomic64_read(&r->cnt_read) +
+			  atomic64_read(&r->cnt_write)) : 0);
+}
+
+int ibtrs_srv_reset_user_ib_msgs_stats(struct ibtrs_session *sess, bool enable)
+{
+	if (enable) {
+		memset(&sess->stats.user_ib_msgs, 0,
+		       sizeof(sess->stats.user_ib_msgs));
+		return 0;
+	} else {
+		return -EINVAL;
+	}
+}
+
+int ibtrs_srv_stats_user_ib_msgs_to_str(struct ibtrs_session *sess, char *buf,
+					size_t len)
+{
+	return snprintf(buf, len, "%ld %ld %ld %ld\n",
+			atomic64_read(&sess->stats.user_ib_msgs.recv_msg_cnt),
+			atomic64_read(&sess->stats.user_ib_msgs.recv_size),
+			atomic64_read(&sess->stats.user_ib_msgs.sent_msg_cnt),
+			atomic64_read(&sess->stats.user_ib_msgs.sent_size));
+}
+
+int ibtrs_srv_reset_wc_completion_stats(struct ibtrs_session *sess, bool enable)
+{
+	if (enable) {
+		memset(&sess->stats.wc_comp, 0, sizeof(sess->stats.wc_comp));
+		return 0;
+	} else {
+		return -EINVAL;
+	}
+}
+
+int ibtrs_srv_stats_wc_completion_to_str(struct ibtrs_session *sess, char *buf,
+					 size_t len)
+{
+	return snprintf(buf, len, "%d %ld %ld\n",
+			atomic_read(&sess->stats.wc_comp.max_wc_cnt),
+			atomic64_read(&sess->stats.wc_comp.total_wc_cnt),
+			atomic64_read(&sess->stats.wc_comp.calls));
+}
+
+ssize_t ibtrs_srv_reset_all_help(struct ibtrs_session *sess,
+				 char *page, size_t len)
+{
+	return scnprintf(page, PAGE_SIZE, "echo 1 to reset all statistics\n");
+}
+
+int ibtrs_srv_reset_all_stats(struct ibtrs_session *sess, bool enable)
+{
+	if (enable) {
+		ibtrs_srv_reset_wc_completion_stats(sess, enable);
+		ibtrs_srv_reset_user_ib_msgs_stats(sess, enable);
+		ibtrs_srv_reset_rdma_stats(sess, enable);
+		return 0;
+	} else {
+		return -EINVAL;
+	}
+}
+
+static inline bool srv_ops_are_valid(const struct ibtrs_srv_ops *ops)
+{
+	return ops && ops->sess_ev && ops->rdma_ev && ops->recv;
+}
+
+static int ibtrs_srv_sess_ev(struct ibtrs_session *sess,
+			     enum ibtrs_srv_sess_ev ev)
+{
+	if (!sess->session_announced_to_user &&
+	    ev != IBTRS_SRV_SESS_EV_CONNECTED)
+		return 0;
+
+	if (ev == IBTRS_SRV_SESS_EV_CONNECTED)
+		sess->session_announced_to_user = true;
+
+	return srv_ops->sess_ev(sess, ev, sess->priv);
+}
+
+static void free_id(struct ibtrs_ops_id *id)
+{
+	if (!id)
+		return;
+	kfree(id->tx_wr);
+	kfree(id->tx_sg);
+	kvfree(id);
+}
+
+static void free_sess_tx_bufs(struct ibtrs_session *sess)
+{
+	struct ibtrs_iu *e, *next;
+	int i;
+
+	if (sess->rdma_info_iu) {
+		ibtrs_iu_free(sess->rdma_info_iu, DMA_TO_DEVICE,
+			      sess->dev->device);
+		sess->rdma_info_iu = NULL;
+	}
+
+	WARN_ON(sess->tx_bufs_used);
+	list_for_each_entry_safe(e, next, &sess->tx_bufs, list) {
+		list_del(&e->list);
+		ibtrs_iu_free(e, DMA_TO_DEVICE, sess->dev->device);
+	}
+
+	if (sess->ops_ids) {
+		for (i = 0; i < sess->queue_depth; i++)
+			free_id(sess->ops_ids[i]);
+		kfree(sess->ops_ids);
+		sess->ops_ids = NULL;
+	}
+}
+
+static void put_tx_iu(struct ibtrs_session *sess, struct ibtrs_iu *iu)
+{
+	spin_lock(&sess->tx_bufs_lock);
+	ibtrs_iu_put(&sess->tx_bufs, iu);
+	sess->tx_bufs_used--;
+	spin_unlock(&sess->tx_bufs_lock);
+}
+
+static struct ibtrs_iu *get_tx_iu(struct ibtrs_session *sess)
+{
+	struct ibtrs_iu *iu;
+
+	spin_lock(&sess->tx_bufs_lock);
+	iu = ibtrs_iu_get(&sess->tx_bufs);
+	if (iu)
+		sess->tx_bufs_used++;
+	spin_unlock(&sess->tx_bufs_lock);
+
+	return iu;
+}
+
+static int rdma_write_sg(struct ibtrs_ops_id *id)
+{
+	int err, i, offset;
+	struct ib_send_wr *bad_wr;
+	struct ib_rdma_wr *wr = NULL;
+	struct ibtrs_session *sess = id->con->sess;
+
+	if (unlikely(id->req->sg_cnt == 0))
+		return -EINVAL;
+
+	offset = 0;
+	for (i = 0; i < id->req->sg_cnt; i++) {
+		struct ib_sge *list;
+
+		wr		= &id->tx_wr[i];
+		list		= &id->tx_sg[i];
+		list->addr	= id->data_dma_addr + offset;
+		list->length	= id->req->desc[i].len;
+
+		/* WR will fail with length error
+		 * if this is 0
+		 */
+		if (unlikely(list->length == 0)) {
+			ERR(sess, "Invalid RDMA-Write sg list length 0\n");
+			return -EINVAL;
+		}
+
+		list->lkey = sess->dev->ib_sess.pd->local_dma_lkey;
+		offset += list->length;
+
+		wr->wr.wr_id		= (uintptr_t)id;
+		wr->wr.sg_list		= list;
+		wr->wr.num_sge		= 1;
+		wr->remote_addr	= id->req->desc[i].addr;
+		wr->rkey	= id->req->desc[i].key;
+
+		if (i < (id->req->sg_cnt - 1)) {
+			wr->wr.next	= &id->tx_wr[i + 1].wr;
+			wr->wr.opcode	= IB_WR_RDMA_WRITE;
+			wr->wr.ex.imm_data	= 0;
+			wr->wr.send_flags	= 0;
+		}
+	}
+
+	wr->wr.opcode	= IB_WR_RDMA_WRITE_WITH_IMM;
+	wr->wr.next	= NULL;
+	wr->wr.send_flags	= atomic_inc_return(&id->con->wr_cnt) %
+				sess->queue_depth ? 0 : IB_SEND_SIGNALED;
+	wr->wr.ex.imm_data	= cpu_to_be32(id->msg_id << 16);
+
+	err = ib_post_send(id->con->ib_con.qp, &id->tx_wr[0].wr, &bad_wr);
+	if (unlikely(err))
+		ERR(sess,
+		    "Posting RDMA-Write-Request to QP failed, errno: %d\n",
+		    err);
+
+	return err;
+}
+
+static int send_io_resp_imm(struct ibtrs_con *con, int msg_id, s16 errno)
+{
+	int err;
+
+	err = ibtrs_write_empty_imm(con->ib_con.qp, (msg_id << 16) | (u16)errno,
+				    atomic_inc_return(&con->wr_cnt) %
+				    con->sess->queue_depth ? 0 :
+				    IB_SEND_SIGNALED);
+	if (unlikely(err))
+		ERR_RL(con->sess, "Posting RDMA-Write-Request to QP failed,"
+		       " errno: %d\n", err);
+
+	return err;
+}
+
+static int send_heartbeat_raw(struct ibtrs_con *con)
+{
+	int err;
+
+	err = ibtrs_write_empty_imm(con->ib_con.qp, UINT_MAX, IB_SEND_SIGNALED);
+	if (unlikely(err)) {
+		ERR(con->sess,
+		    "Sending heartbeat failed, posting msg to QP failed,"
+		    " errno: %d\n", err);
+		return err;
+	}
+
+	ibtrs_heartbeat_set_send_ts(&con->sess->heartbeat);
+	return err;
+}
+
+static int send_heartbeat(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con;
+
+	if (unlikely(list_empty(&sess->con_list)))
+		return -ENOENT;
+
+	con = list_first_entry(&sess->con_list, struct ibtrs_con, list);
+	WARN_ON(!con->user);
+
+	if (unlikely(con->state != CSM_STATE_CONNECTED))
+		return -ENOTCONN;
+
+	return send_heartbeat_raw(con);
+}
+
+static int ibtrs_srv_queue_resp_rdma(struct ibtrs_ops_id *id)
+{
+	if (unlikely(id->con->state != CSM_STATE_CONNECTED)) {
+		ERR_RL(id->con->sess, "Sending I/O response failed, "
+		       " session is disconnected, sess state %s,"
+		       " con state %s\n", ssm_state_str(id->con->sess->state),
+		       csm_state_str(id->con->state));
+		return -ECOMM;
+	}
+
+	if (WARN_ON(!queue_work(id->con->rdma_resp_wq, &id->work))) {
+		ERR_RL(id->con->sess, "Sending I/O response failed,"
+		       " couldn't queue work\n");
+		return -EPERM;
+	}
+
+	return 0;
+}
+
+static void ibtrs_srv_resp_rdma_worker(struct work_struct *work)
+{
+	struct ibtrs_ops_id *id;
+	int err;
+	struct ibtrs_session *sess;
+
+	id = container_of(work, struct ibtrs_ops_id, work);
+	sess = id->con->sess;
+
+	if (id->status || id->dir == WRITE) {
+		DEB("err or write msg_id=%d, status=%d, sending response\n",
+		    id->msg_id, id->status);
+
+		err = send_io_resp_imm(id->con, id->msg_id, id->status);
+		if (unlikely(err)) {
+			ERR_RL(sess, "Sending imm msg failed, errno: %d\n",
+			       err);
+			if (err == -ENOMEM && !ibtrs_srv_queue_resp_rdma(id))
+				return;
+			csm_schedule_event(id->con, CSM_EV_CON_ERROR);
+		}
+
+		ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+		ibtrs_srv_stats_dec_inflight(sess);
+		return;
+	}
+
+	DEB("read req msg_id=%d completed, sending data\n", id->msg_id);
+	err = rdma_write_sg(id);
+	if (unlikely(err)) {
+		ERR_RL(sess, "Sending I/O read response failed, errno: %d\n",
+		       err);
+		if (err == -ENOMEM && !ibtrs_srv_queue_resp_rdma(id))
+			return;
+		csm_schedule_event(id->con, CSM_EV_CON_ERROR);
+	}
+	ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+	ibtrs_srv_stats_dec_inflight(sess);
+}
+
+/*
+ * This function may be called from an interrupt context, e.g. on bio_endio
+ * callback on the user module. Queue the real work on a workqueue so we don't
+ * need to hold an irq spinlock.
+ */
+int ibtrs_srv_resp_rdma(struct ibtrs_ops_id *id, int status)
+{
+	int err = 0;
+
+	if (unlikely(!id)) {
+		ERR_NP("Sending I/O response failed, I/O ops id NULL\n");
+		return -EINVAL;
+	}
+
+	id->status = status;
+	INIT_WORK(&id->work, ibtrs_srv_resp_rdma_worker);
+
+	err = ibtrs_srv_queue_resp_rdma(id);
+	if (err)
+		ibtrs_srv_stats_dec_inflight(id->con->sess);
+	return err;
+}
+EXPORT_SYMBOL(ibtrs_srv_resp_rdma);
+
+static bool ibtrs_srv_get_usr_msg_buf(struct ibtrs_session *sess)
+{
+	return atomic_dec_if_positive(&sess->peer_usr_msg_bufs) >= 0;
+}
+
+int ibtrs_srv_send(struct ibtrs_session *sess, const struct kvec *vec,
+		   size_t nr)
+{
+	struct ibtrs_iu *iu = NULL;
+	struct ibtrs_con *con;
+	struct ibtrs_msg_user *msg;
+	size_t len;
+	bool closed_st = false;
+	int err;
+
+	if (WARN_ONCE(list_empty(&sess->con_list),
+		      "Sending message failed, no connection available\n"))
+		return -ECOMM;
+	con = ibtrs_srv_get_user_con(sess);
+
+	if (unlikely(!con)) {
+		WRN(sess,
+		    "Sending message failed, no user connection exists\n");
+		return -ECOMM;
+	}
+
+	len = kvec_length(vec, nr);
+
+	if (unlikely(len + IBTRS_HDR_LEN > MAX_REQ_SIZE)) {
+		WRN_RL(sess, "Sending message failed, passed data too big,"
+		       " %zu > %lu\n", len, MAX_REQ_SIZE - IBTRS_HDR_LEN);
+		return -EMSGSIZE;
+	}
+
+	wait_event(sess->mu_buf_wait_q,
+		   (closed_st = (con->state != CSM_STATE_CONNECTED)) ||
+		   ibtrs_srv_get_usr_msg_buf(sess));
+
+	if (unlikely(closed_st)) {
+		ERR_RL(sess, "Sending message failed, not connected (state"
+		       " %s)\n", csm_state_str(con->state));
+		return -ECOMM;
+	}
+
+	wait_event(sess->mu_iu_wait_q,
+		   (closed_st = (con->state != CSM_STATE_CONNECTED)) ||
+		   (iu = get_tx_iu(sess)) != NULL);
+
+	if (unlikely(closed_st)) {
+		ERR_RL(sess, "Sending message failed, not connected (state"
+		       " %s)\n", csm_state_str(con->state));
+		err = -ECOMM;
+		goto err_iu;
+	}
+
+	msg		= iu->buf;
+	msg->hdr.type	= IBTRS_MSG_USER;
+	msg->hdr.tsize	= len + IBTRS_HDR_LEN;
+	copy_from_kvec(msg->payl, vec, len);
+
+	ibtrs_deb_msg_hdr("Sending: ", &msg->hdr);
+	err = ibtrs_post_send(con->ib_con.qp,
+			      con->sess->dev->ib_sess.pd->__internal_mr, iu,
+			      msg->hdr.tsize);
+	if (unlikely(err)) {
+		ERR_RL(sess, "Sending message failed, posting message to QP"
+		       " failed, errno: %d\n", err);
+		goto err_post_send;
+	}
+	ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+
+	atomic64_inc(&sess->stats.user_ib_msgs.sent_msg_cnt);
+	atomic64_add(len, &sess->stats.user_ib_msgs.sent_size);
+
+	return 0;
+
+err_post_send:
+	put_tx_iu(sess, iu);
+	wake_up(&con->sess->mu_iu_wait_q);
+err_iu:
+	atomic_inc(&sess->peer_usr_msg_bufs);
+	wake_up(&con->sess->mu_buf_wait_q);
+	return err;
+}
+EXPORT_SYMBOL(ibtrs_srv_send);
+
+inline void ibtrs_srv_set_sess_priv(struct ibtrs_session *sess, void *priv)
+{
+	sess->priv = priv;
+}
+EXPORT_SYMBOL(ibtrs_srv_set_sess_priv);
+
+static int ibtrs_post_recv(struct ibtrs_con *con, struct ibtrs_iu *iu)
+{
+	struct ib_recv_wr wr, *bad_wr;
+	struct ib_sge list;
+	int err;
+
+	list.addr   = iu->dma_addr;
+	list.length = iu->size;
+	list.lkey   = con->sess->dev->ib_sess.pd->local_dma_lkey;
+
+	if (unlikely(list.length == 0)) {
+		ERR_RL(con->sess, "Posting recv buffer failed, invalid sg list"
+		       " length 0\n");
+		return -EINVAL;
+	}
+
+	wr.next     = NULL;
+	wr.wr_id    = (uintptr_t)iu;
+	wr.sg_list  = &list;
+	wr.num_sge  = 1;
+
+	err = ib_post_recv(con->ib_con.qp, &wr, &bad_wr);
+	if (unlikely(err))
+		ERR_RL(con->sess, "Posting recv buffer failed, errno: %d\n",
+		       err);
+
+	return err;
+}
+
+static struct ibtrs_rcv_buf_pool *alloc_rcv_buf_pool(void)
+{
+	struct ibtrs_rcv_buf_pool *pool;
+	struct page *cont_pages = NULL;
+	struct ibtrs_mem_chunk *mem_chunk;
+	int alloced_bufs = 0;
+	int rcv_buf_order = get_order(rcv_buf_size);
+	int max_order, alloc_order;
+	unsigned int alloced_size;
+
+	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+	if (!pool) {
+		ERR_NP("Failed to allocate memory for buffer pool struct\n");
+		return NULL;
+	}
+
+	pool->rcv_bufs = kcalloc(sess_queue_depth, sizeof(*pool->rcv_bufs),
+				 GFP_KERNEL);
+	if (!pool->rcv_bufs) {
+		ERR_NP("Failed to allocate array for receive buffers\n");
+		kfree(pool);
+		return NULL;
+	}
+	INIT_LIST_HEAD(&pool->chunk_list);
+
+	while (alloced_bufs < sess_queue_depth) {
+		mem_chunk = kzalloc(sizeof(*mem_chunk), GFP_KERNEL);
+		if (!mem_chunk) {
+			ERR_NP("Failed to allocate memory for memory chunk"
+			       " struct\n");
+			goto alloc_fail;
+		}
+
+		max_order = min(MAX_ORDER - 1,
+				get_order((sess_queue_depth - alloced_bufs) *
+					  rcv_buf_size));
+		for (alloc_order = max_order; alloc_order > rcv_buf_order;
+		     alloc_order--) {
+			cont_pages = alloc_pages(__GFP_NORETRY | __GFP_NOWARN |
+						 __GFP_ZERO, alloc_order);
+			if (cont_pages) {
+				DEB("Allocated order %d pages\n", alloc_order);
+				break;
+			}
+			DEB("Failed to allocate order %d pages\n", alloc_order);
+		}
+
+		if (cont_pages) {
+			void *recv_buf_start;
+
+			mem_chunk->order = alloc_order;
+			mem_chunk->addr = page_address(cont_pages);
+			list_add_tail(&mem_chunk->list, &pool->chunk_list);
+			alloced_size = (1 << alloc_order) * PAGE_SIZE;
+
+			DEB("Memory chunk size: %d, address: %p\n",
+			    alloced_size, mem_chunk->addr);
+
+			recv_buf_start = mem_chunk->addr;
+			while (alloced_size > rcv_buf_size &&
+			       alloced_bufs < sess_queue_depth) {
+				pool->rcv_bufs[alloced_bufs].buf =
+					recv_buf_start;
+				alloced_bufs++;
+				recv_buf_start += rcv_buf_size;
+				alloced_size -= rcv_buf_size;
+			}
+		} else {
+			/* if allocation of pages to fit multiple rcv_buf's
+			 * failed we fall back to alloc'ing exact number of
+			 * pages
+			 */
+			gfp_t gfp_mask = (GFP_KERNEL | __GFP_REPEAT |
+					  __GFP_ZERO);
+			void *addr = alloc_pages_exact(rcv_buf_size, gfp_mask);
+
+			if (!addr) {
+				ERR_NP("Failed to allocate memory for "
+				       " receive buffer (size %dB)\n",
+				       rcv_buf_size);
+				goto alloc_fail;
+			}
+
+			DEB("Alloced pages exact at %p for rcv_bufs[%d]\n",
+			    addr, alloced_bufs);
+
+			mem_chunk->addr = addr;
+			mem_chunk->order = IBTRS_MEM_CHUNK_NOORDER;
+			list_add_tail(&mem_chunk->list, &pool->chunk_list);
+
+			pool->rcv_bufs[alloced_bufs].buf = addr;
+			alloced_bufs++;
+		}
+	}
+
+	return pool;
+
+alloc_fail:
+	if (!list_empty(&pool->chunk_list)) {
+		struct ibtrs_mem_chunk *tmp;
+
+		list_for_each_entry_safe(mem_chunk, tmp, &pool->chunk_list,
+					 list) {
+			if (mem_chunk->order != IBTRS_MEM_CHUNK_NOORDER)
+				free_pages((unsigned long)mem_chunk->addr,
+					   mem_chunk->order);
+			else
+				free_pages_exact(mem_chunk->addr, rcv_buf_size);
+			list_del(&mem_chunk->list);
+			kfree(mem_chunk);
+		}
+	}
+	kfree(pool->rcv_bufs);
+	kfree(pool);
+	return NULL;
+}
+
+static struct ibtrs_rcv_buf_pool *__get_pool_from_list(void)
+{
+	struct ibtrs_rcv_buf_pool *pool = NULL;
+
+	if (!list_empty(&free_buf_pool_list)) {
+		DEB("Getting buf pool from pre-allocated list\n");
+		pool = list_first_entry(&free_buf_pool_list,
+					struct ibtrs_rcv_buf_pool, list);
+		list_del(&pool->list);
+		nr_free_buf_pool--;
+	}
+
+	return pool;
+}
+
+static void __put_pool_on_list(struct ibtrs_rcv_buf_pool *pool)
+{
+	list_add(&pool->list, &free_buf_pool_list);
+	nr_free_buf_pool++;
+	DEB("Put buf pool back to the free list (nr_free_buf_pool: %d)\n",
+	    nr_free_buf_pool);
+}
+
+static struct ibtrs_rcv_buf_pool *get_alloc_rcv_buf_pool(void)
+{
+	struct ibtrs_rcv_buf_pool *pool = NULL;
+
+	mutex_lock(&buf_pool_mutex);
+	if (nr_active_sessions >= pool_size_hi_wm) {
+		WARN_ON(nr_free_buf_pool || !list_empty(&free_buf_pool_list));
+		DEB("current nr_active_sessions (%d), pool_size_hi_wm (%d),"
+		    ", allocating.\n", nr_active_sessions, pool_size_hi_wm);
+		pool = alloc_rcv_buf_pool();
+	} else if (nr_total_buf_pool < pool_size_hi_wm) {
+		/* try to allocate new pool while used+free is less then
+		 * watermark
+		 */
+		DEB("nr_total_buf_pool (%d) smaller than pool_size_hi_wm (%d)"
+		    ", trying to allocate.\n", nr_total_buf_pool,
+		    pool_size_hi_wm);
+		pool = alloc_rcv_buf_pool();
+		if (pool)
+			nr_total_buf_pool++;
+		else
+			pool = __get_pool_from_list();
+	} else if (nr_total_buf_pool == pool_size_hi_wm) {
+		/* pool size has already reached watermark, check if there are
+		 * free pools on the list
+		 */
+		if (nr_free_buf_pool) {
+			pool = __get_pool_from_list();
+			WARN_ON(!pool);
+			DEB("Got pool from free list (nr_free_buf_pool: %d)\n",
+			    nr_free_buf_pool);
+		} else {
+			/* all pools are already being used */
+			DEB("No free pool on the list\n");
+			WARN_ON((nr_active_sessions != nr_total_buf_pool) ||
+				nr_free_buf_pool);
+			pool = alloc_rcv_buf_pool();
+		}
+	} else {
+		/* all possibilities should be covered */
+		WARN_ON(1);
+	}
+
+	if (pool)
+		nr_active_sessions++;
+
+	mutex_unlock(&buf_pool_mutex);
+
+	return pool;
+}
+
+static void free_recv_buf_pool(struct ibtrs_rcv_buf_pool *pool)
+{
+	struct ibtrs_mem_chunk *mem_chunk, *tmp;
+
+	DEB("Freeing memory chunks for %d receive buffers\n", sess_queue_depth);
+
+	list_for_each_entry_safe(mem_chunk, tmp, &pool->chunk_list, list) {
+		if (mem_chunk->order != IBTRS_MEM_CHUNK_NOORDER)
+			free_pages((unsigned long)mem_chunk->addr,
+				   mem_chunk->order);
+		else
+			free_pages_exact(mem_chunk->addr, rcv_buf_size);
+		list_del(&mem_chunk->list);
+		kfree(mem_chunk);
+	}
+
+	kfree(pool->rcv_bufs);
+	kfree(pool);
+}
+
+static void put_rcv_buf_pool(struct ibtrs_rcv_buf_pool *pool)
+{
+	mutex_lock(&buf_pool_mutex);
+	nr_active_sessions--;
+	if (nr_active_sessions >= pool_size_hi_wm) {
+		mutex_unlock(&buf_pool_mutex);
+		DEB("Freeing buf pool"
+		    " (nr_active_sessions: %d, pool_size_hi_wm: %d)\n",
+		    nr_active_sessions, pool_size_hi_wm);
+		free_recv_buf_pool(pool);
+	} else {
+		__put_pool_on_list(pool);
+		mutex_unlock(&buf_pool_mutex);
+	}
+}
+
+static void unreg_cont_bufs(struct ibtrs_session *sess)
+{
+	struct ibtrs_rcv_buf *buf;
+	int i;
+
+	DEB("Unregistering %d RDMA buffers\n", sess_queue_depth);
+	for (i = 0; i < sess_queue_depth; i++) {
+		buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+		ib_dma_unmap_single(sess->dev->device, buf->rdma_addr,
+				    rcv_buf_size, DMA_BIDIRECTIONAL);
+	}
+}
+
+static void release_cont_bufs(struct ibtrs_session *sess)
+{
+	unreg_cont_bufs(sess);
+	put_rcv_buf_pool(sess->rcv_buf_pool);
+	sess->rcv_buf_pool = NULL;
+}
+
+static int setup_cont_bufs(struct ibtrs_session *sess)
+{
+	struct ibtrs_rcv_buf *buf;
+	int i, err;
+
+	sess->rcv_buf_pool = get_alloc_rcv_buf_pool();
+	if (!sess->rcv_buf_pool) {
+		ERR(sess, "Failed to allocate receive buffers for session\n");
+		return -ENOMEM;
+	}
+
+	DEB("Mapping %d buffers for RDMA\n", sess->queue_depth);
+	for (i = 0; i < sess->queue_depth; i++) {
+		buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+		buf->rdma_addr = ib_dma_map_single(sess->dev->device, buf->buf,
+						   rcv_buf_size,
+						   DMA_BIDIRECTIONAL);
+		if (unlikely(ib_dma_mapping_error(sess->dev->device,
+						  buf->rdma_addr))) {
+			ERR_NP("Registering RDMA buf failed,"
+			       " DMA mapping failed\n");
+			err = -EIO;
+			goto err_map;
+		}
+	}
+
+	sess->off_len = 31 - ilog2(sess->queue_depth - 1);
+	sess->off_mask = (1 << sess->off_len) - 1;
+
+	INFO(sess, "Allocated %d %dKB RDMA receive buffers, %dKB in total\n",
+	     sess->queue_depth, rcv_buf_size >> 10,
+	     sess->queue_depth * rcv_buf_size >> 10);
+
+	return 0;
+
+err_map:
+	for (i = 0; i < sess->queue_depth; i++) {
+		buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+		if (buf->rdma_addr &&
+		    !ib_dma_mapping_error(sess->dev->device, buf->rdma_addr))
+			ib_dma_unmap_single(sess->dev->device, buf->rdma_addr,
+					    rcv_buf_size, DMA_BIDIRECTIONAL);
+	}
+	return err;
+}
+
+static void fill_ibtrs_msg_sess_open_resp(struct ibtrs_msg_sess_open_resp *msg,
+					  struct ibtrs_con *con)
+{
+	int i;
+
+	msg->hdr.type   = IBTRS_MSG_SESS_OPEN_RESP;
+	msg->hdr.tsize  = IBTRS_MSG_SESS_OPEN_RESP_LEN(con->sess->queue_depth);
+
+	msg->ver = con->sess->ver;
+	strlcpy(msg->hostname, hostname, sizeof(msg->hostname));
+	msg->cnt = con->sess->queue_depth;
+	msg->rkey = con->sess->dev->ib_sess.pd->unsafe_global_rkey;
+	msg->max_inflight_msg = con->sess->queue_depth;
+	msg->max_io_size = max_io_size;
+	msg->max_req_size = MAX_REQ_SIZE;
+	for (i = 0; i < con->sess->queue_depth; i++)
+		msg->addr[i] = con->sess->rcv_buf_pool->rcv_bufs[i].rdma_addr;
+}
+
+static void free_sess_rx_bufs(struct ibtrs_session *sess)
+{
+	int i;
+
+	if (sess->dummy_rx_iu) {
+		ibtrs_iu_free(sess->dummy_rx_iu, DMA_FROM_DEVICE,
+			      sess->dev->device);
+		sess->dummy_rx_iu = NULL;
+	}
+
+	if (sess->usr_rx_ring) {
+		for (i = 0; i < USR_CON_BUF_SIZE; ++i)
+			if (sess->usr_rx_ring[i])
+				ibtrs_iu_free(sess->usr_rx_ring[i],
+					      DMA_FROM_DEVICE,
+					      sess->dev->device);
+		kfree(sess->usr_rx_ring);
+		sess->usr_rx_ring = NULL;
+	}
+}
+
+static int alloc_sess_tx_bufs(struct ibtrs_session *sess)
+{
+	struct ibtrs_iu *iu;
+	struct ibtrs_ops_id *id;
+	struct ib_device *ib_dev = sess->dev->device;
+	int i;
+
+	sess->rdma_info_iu =
+		ibtrs_iu_alloc(0, IBTRS_MSG_SESS_OPEN_RESP_LEN(
+			       sess->queue_depth), GFP_KERNEL, ib_dev,
+			       DMA_TO_DEVICE, true);
+	if (unlikely(!sess->rdma_info_iu)) {
+		ERR_RL(sess, "Can't allocate transfer buffer for "
+			     "sess open resp\n");
+		return -ENOMEM;
+	}
+
+	sess->ops_ids = kcalloc(sess->queue_depth, sizeof(*sess->ops_ids),
+				GFP_KERNEL);
+	if (unlikely(!sess->ops_ids)) {
+		ERR_RL(sess, "Can't alloc ops_ids for the session\n");
+		goto err;
+	}
+
+	for (i = 0; i < sess->queue_depth; ++i) {
+		id = ibtrs_zalloc(sizeof(*id));
+		if (unlikely(!id)) {
+			ERR_RL(sess, "Can't alloc ops id for session\n");
+			goto err;
+		}
+		sess->ops_ids[i] = id;
+	}
+
+	for (i = 0; i < USR_MSG_CNT; ++i) {
+		iu = ibtrs_iu_alloc(i, MAX_REQ_SIZE, GFP_KERNEL,
+				    ib_dev, DMA_TO_DEVICE, true);
+		if (!iu) {
+			ERR_RL(sess, "Can't alloc tx bufs for user msgs\n");
+			goto err;
+		}
+		list_add(&iu->list, &sess->tx_bufs);
+	}
+
+	return 0;
+
+err:
+	free_sess_tx_bufs(sess);
+	return -ENOMEM;
+}
+
+static int alloc_sess_rx_bufs(struct ibtrs_session *sess)
+{
+	int i;
+
+	sess->dummy_rx_iu =
+		ibtrs_iu_alloc(0, IBTRS_HDR_LEN, GFP_KERNEL, sess->dev->device,
+			       DMA_FROM_DEVICE, true);
+	if (!sess->dummy_rx_iu) {
+		ERR(sess, "Failed to allocate dummy IU to receive "
+			  "immediate messages on io connections\n");
+		goto err;
+	}
+
+	sess->usr_rx_ring = kcalloc(USR_CON_BUF_SIZE,
+				    sizeof(*sess->usr_rx_ring), GFP_KERNEL);
+	if (!sess->usr_rx_ring) {
+		ERR(sess, "Alloc usr_rx_ring for session failed\n");
+		goto err;
+	}
+
+	for (i = 0; i < USR_CON_BUF_SIZE; ++i) {
+		sess->usr_rx_ring[i] =
+			ibtrs_iu_alloc(i, MAX_REQ_SIZE, GFP_KERNEL,
+				       sess->dev->device, DMA_FROM_DEVICE,
+				       true);
+		if (!sess->usr_rx_ring[i]) {
+			ERR(sess, "Failed to allocate iu for usr_rx_ring\n");
+			goto err;
+		}
+	}
+
+	return 0;
+
+err:
+	free_sess_rx_bufs(sess);
+	return -ENOMEM;
+}
+
+static int alloc_sess_bufs(struct ibtrs_session *sess)
+{
+	int err;
+
+	err = alloc_sess_rx_bufs(sess);
+	if (err)
+		return err;
+	else
+		return alloc_sess_tx_bufs(sess);
+}
+
+static int post_io_con_recv(struct ibtrs_con *con)
+{
+	int i, ret;
+
+	for (i = 0; i < con->sess->queue_depth; i++) {
+		ret = ibtrs_post_recv(con, con->sess->dummy_rx_iu);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	return 0;
+}
+
+static int post_user_con_recv(struct ibtrs_con *con)
+{
+	int i, ret;
+
+	for (i = 0; i < USR_CON_BUF_SIZE; i++) {
+		struct ibtrs_iu *iu = con->sess->usr_rx_ring[i];
+
+		ret = ibtrs_post_recv(con, iu);
+		if (unlikely(ret))
+			return ret;
+	}
+
+	return 0;
+}
+
+static int post_recv(struct ibtrs_con *con)
+{
+	if (con->user)
+		return post_user_con_recv(con);
+	else
+		return post_io_con_recv(con);
+
+	return 0;
+}
+
+static void free_sess_bufs(struct ibtrs_session *sess)
+{
+	free_sess_rx_bufs(sess);
+	free_sess_tx_bufs(sess);
+}
+
+static int init_transfer_bufs(struct ibtrs_con *con)
+{
+	int err;
+	struct ibtrs_session *sess = con->sess;
+
+	if (con->user) {
+		err = alloc_sess_bufs(sess);
+		if (err) {
+			ERR(sess, "Alloc sess bufs failed: %d\n", err);
+			return err;
+		}
+	}
+
+	return post_recv(con);
+}
+
+static void process_rdma_write_req(struct ibtrs_con *con,
+				   struct ibtrs_msg_req_rdma_write *req,
+				   u32 buf_id, u32 off)
+{
+	int ret;
+	struct ibtrs_ops_id *id;
+	struct ibtrs_session *sess = con->sess;
+
+	if (unlikely(sess->state != SSM_STATE_CONNECTED ||
+		     con->state != CSM_STATE_CONNECTED)) {
+		ERR_RL(sess, "Processing RDMA-Write-Req request failed, "
+		       " session is disconnected, sess state %s,"
+		       " con state %s\n", ssm_state_str(sess->state),
+		       csm_state_str(con->state));
+		return;
+	}
+	ibtrs_srv_update_rdma_stats(&sess->stats, off, true);
+	id = sess->ops_ids[buf_id];
+	kfree(id->tx_wr);
+	kfree(id->tx_sg);
+	id->con		= con;
+	id->dir		= READ;
+	id->msg_id	= buf_id;
+	id->req		= req;
+	id->tx_wr	= kcalloc(req->sg_cnt, sizeof(*id->tx_wr), GFP_KERNEL);
+	id->tx_sg	= kcalloc(req->sg_cnt, sizeof(*id->tx_sg), GFP_KERNEL);
+	if (!id->tx_wr || !id->tx_sg) {
+		ERR_RL(sess, "Processing RDMA-Write-Req failed, work request "
+		       "or scatter gather allocation failed for msg_id %d\n",
+		       buf_id);
+		ret = -ENOMEM;
+		goto send_err_msg;
+	}
+
+	id->data_dma_addr = sess->rcv_buf_pool->rcv_bufs[buf_id].rdma_addr;
+	ret = srv_ops->rdma_ev(con->sess, sess->priv, id,
+			       IBTRS_SRV_RDMA_EV_WRITE_REQ,
+			       sess->rcv_buf_pool->rcv_bufs[buf_id].buf, off);
+
+	if (unlikely(ret)) {
+		ERR_RL(sess, "Processing RDMA-Write-Req failed, user "
+		       "module cb reported for msg_id %d, errno: %d\n",
+		       buf_id, ret);
+		goto send_err_msg;
+	}
+
+	return;
+
+send_err_msg:
+	ret = send_io_resp_imm(con, buf_id, ret);
+	if (ret < 0) {
+		ERR_RL(sess, "Sending err msg for failed RDMA-Write-Req"
+		       " failed, msg_id %d, errno: %d\n", buf_id, ret);
+		csm_schedule_event(con, CSM_EV_CON_ERROR);
+	}
+	ibtrs_srv_stats_dec_inflight(sess);
+}
+
+static void process_rdma_write(struct ibtrs_con *con,
+			       struct ibtrs_msg_rdma_write *req,
+			       u32 buf_id, u32 off)
+{
+	int ret;
+	struct ibtrs_ops_id *id;
+	struct ibtrs_session *sess = con->sess;
+
+	if (unlikely(sess->state != SSM_STATE_CONNECTED ||
+		     con->state != CSM_STATE_CONNECTED)) {
+		ERR_RL(sess, "Processing RDMA-Write request failed, "
+		       " session is disconnected, sess state %s,"
+		       " con state %s\n", ssm_state_str(sess->state),
+		       csm_state_str(con->state));
+		return;
+	}
+	ibtrs_srv_update_rdma_stats(&sess->stats, off, false);
+	id = con->sess->ops_ids[buf_id];
+	id->con    = con;
+	id->dir    = WRITE;
+	id->msg_id = buf_id;
+
+	ret = srv_ops->rdma_ev(sess, sess->priv, id, IBTRS_SRV_RDMA_EV_RECV,
+			       sess->rcv_buf_pool->rcv_bufs[buf_id].buf, off);
+	if (unlikely(ret)) {
+		ERR_RL(sess, "Processing RDMA-Write failed, user module"
+		       " callback reports errno: %d\n", ret);
+		goto send_err_msg;
+	}
+
+	return;
+
+send_err_msg:
+	ret = send_io_resp_imm(con, buf_id, ret);
+	if (ret < 0) {
+		ERR_RL(sess, "Processing RDMA-Write failed, sending I/O"
+		       " response failed, msg_id %d, errno: %d\n",
+		       buf_id, ret);
+		csm_schedule_event(con, CSM_EV_CON_ERROR);
+	}
+	ibtrs_srv_stats_dec_inflight(sess);
+}
+
+static int ibtrs_send_usr_msg_ack(struct ibtrs_con *con)
+{
+	struct ibtrs_session *sess;
+	int err;
+
+	sess = con->sess;
+
+	if (unlikely(con->state != CSM_STATE_CONNECTED)) {
+		ERR_RL(sess, "Sending user msg ack failed, disconnected"
+			" Connection state is %s\n", csm_state_str(con->state));
+		return -ECOMM;
+	}
+	DEB("Sending user message ack\n");
+	err = ibtrs_write_empty_imm(con->ib_con.qp, UINT_MAX - 1,
+				    IB_SEND_SIGNALED);
+	if (unlikely(err)) {
+		ERR_RL(sess, "Sending user Ack msg failed, errno: %d\n", err);
+		return err;
+	}
+
+	ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+	return 0;
+}
+
+static void process_msg_user(struct ibtrs_con *con,
+			     struct ibtrs_msg_user *msg)
+{
+	int len;
+	struct ibtrs_session *sess = con->sess;
+
+	len = msg->hdr.tsize - IBTRS_HDR_LEN;
+	if (unlikely(sess->state < SSM_STATE_CONNECTED || !sess->priv)) {
+		ERR_RL(sess, "Sending user msg failed, session isn't ready."
+			" Session state is %s\n", ssm_state_str(sess->state));
+		return;
+	}
+
+	srv_ops->recv(sess, sess->priv, msg->payl, len);
+
+	atomic64_inc(&sess->stats.user_ib_msgs.recv_msg_cnt);
+	atomic64_add(len, &sess->stats.user_ib_msgs.recv_size);
+}
+
+static void process_msg_user_ack(struct ibtrs_con *con)
+{
+	struct ibtrs_session *sess = con->sess;
+
+	atomic_inc(&sess->peer_usr_msg_bufs);
+	wake_up(&con->sess->mu_buf_wait_q);
+}
+
+static void ibtrs_handle_write(struct ibtrs_con *con, struct ibtrs_iu *iu,
+			       struct ibtrs_msg_hdr *hdr, u32 id, u32 off)
+{
+	struct ibtrs_session *sess = con->sess;
+	int ret;
+
+	if (unlikely(ibtrs_validate_message(sess->queue_depth, hdr))) {
+		ERR(sess,
+		    "Processing I/O failed, message validation failed\n");
+		ret = ibtrs_post_recv(con, iu);
+		if (unlikely(ret != 0))
+			ERR(sess,
+			    "Failed to post receive buffer to HCA, errno: %d\n",
+			    ret);
+		goto err;
+	}
+
+	DEB("recv completion, type 0x%02x, tag %u, id %u, off %u\n",
+	    hdr->type, iu->tag, id, off);
+	print_hex_dump_debug("", DUMP_PREFIX_OFFSET, 8, 1,
+			     hdr, IBTRS_HDR_LEN + 32, true);
+	ret = ibtrs_post_recv(con, iu);
+	if (unlikely(ret != 0)) {
+		ERR(sess, "Posting receive buffer to HCA failed, errno: %d\n",
+		    ret);
+		goto err;
+	}
+
+	switch (hdr->type) {
+	case IBTRS_MSG_RDMA_WRITE:
+		process_rdma_write(con, (struct ibtrs_msg_rdma_write *)hdr,
+				   id, off);
+		break;
+	case IBTRS_MSG_REQ_RDMA_WRITE:
+		process_rdma_write_req(con,
+				       (struct ibtrs_msg_req_rdma_write *)hdr,
+				       id, off);
+		break;
+	default:
+		ERR(sess, "Processing I/O request failed, "
+		    "unknown message type received: 0x%02x\n", hdr->type);
+		goto err;
+	}
+
+	return;
+
+err:
+	csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void msg_worker(struct work_struct *work)
+{
+	struct msg_work *w;
+	struct ibtrs_con *con;
+	struct ibtrs_msg_user *msg;
+
+	w = container_of(work, struct msg_work, work);
+	con = w->con;
+	msg = w->msg;
+	kvfree(w);
+	process_msg_user(con, msg);
+	kvfree(msg);
+}
+
+static int ibtrs_schedule_msg(struct ibtrs_con *con, struct ibtrs_msg_user *msg)
+{
+	struct msg_work *w;
+
+	w = ibtrs_malloc(sizeof(*w));
+	if (!w)
+		return -ENOMEM;
+
+	w->con = con;
+	w->msg = ibtrs_malloc(msg->hdr.tsize);
+	if (!w->msg) {
+		kvfree(w);
+		return -ENOMEM;
+	}
+	memcpy(w->msg, msg, msg->hdr.tsize);
+	INIT_WORK(&w->work, msg_worker);
+	queue_work(con->sess->msg_wq, &w->work);
+	return 0;
+}
+
+static void ibtrs_handle_recv(struct ibtrs_con *con,  struct ibtrs_iu *iu)
+{
+	struct ibtrs_msg_hdr *hdr;
+	struct ibtrs_msg_sess_info *req;
+	struct ibtrs_session *sess = con->sess;
+	int ret;
+	u8 type;
+
+	hdr = (struct ibtrs_msg_hdr *)iu->buf;
+	if (unlikely(ibtrs_validate_message(sess->queue_depth, hdr)))
+		goto err1;
+
+	type = hdr->type;
+
+	DEB("recv completion, type 0x%02x, tag %u\n",
+	    type, iu->tag);
+	print_hex_dump_debug("", DUMP_PREFIX_OFFSET, 8, 1,
+			     iu->buf, IBTRS_HDR_LEN, true);
+
+	switch (type) {
+	case IBTRS_MSG_USER:
+		ret = ibtrs_schedule_msg(con, iu->buf);
+		if (unlikely(ret)) {
+			ERR_RL(sess, "Scheduling worker of user message "
+			       "to user module failed, errno: %d\n", ret);
+			goto err1;
+		}
+		ret = ibtrs_post_recv(con, iu);
+		if (unlikely(ret)) {
+			ERR_RL(sess, "Posting receive buffer of user message "
+			       "to HCA failed, errno: %d\n", ret);
+			goto err2;
+		}
+		ret = ibtrs_send_usr_msg_ack(con);
+		if (unlikely(ret)) {
+			ERR_RL(sess, "Sending ACK for user message failed, "
+			       "errno: %d\n", ret);
+			goto err2;
+		}
+		return;
+	case IBTRS_MSG_SESS_INFO:
+		ret = ibtrs_post_recv(con, iu);
+		if (unlikely(ret)) {
+			ERR_RL(sess, "Posting receive buffer of sess info "
+			       "to HCA failed, errno: %d\n", ret);
+			goto err2;
+		}
+		req = (struct ibtrs_msg_sess_info *)hdr;
+		strlcpy(sess->hostname, req->hostname, sizeof(sess->hostname));
+		return;
+	default:
+		ERR(sess, "Processing received message failed, "
+		    "unknown type: 0x%02x\n", type);
+		goto err1;
+	}
+
+err1:
+	ibtrs_post_recv(con, iu);
+err2:
+	ERR(sess, "Failed to process IBTRS message\n");
+	csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void add_con_to_list(struct ibtrs_session *sess, struct ibtrs_con *con)
+{
+	mutex_lock(&sess->lock);
+	list_add_tail(&con->list, &sess->con_list);
+	mutex_unlock(&sess->lock);
+}
+
+static void remove_con_from_list(struct ibtrs_con *con)
+{
+	if (WARN_ON(!con->sess))
+		return;
+	mutex_lock(&con->sess->lock);
+	list_del(&con->list);
+	mutex_unlock(&con->sess->lock);
+}
+
+static void close_con(struct ibtrs_con *con)
+{
+	struct ibtrs_session *sess = con->sess;
+
+	DEB("Closing connection %p\n", con);
+
+	if (con->user)
+		cancel_delayed_work(&sess->send_heartbeat_dwork);
+
+	cancel_work_sync(&con->cq_work);
+	destroy_workqueue(con->rdma_resp_wq);
+
+	ib_con_destroy(&con->ib_con);
+	if (!con->user && !con->device_being_removed)
+		rdma_destroy_id(con->cm_id);
+
+	destroy_workqueue(con->cq_wq);
+
+	if (con->user) {
+		/* notify possible user msg ACK thread waiting for a tx iu or
+		 * user msg buffer so they can check the connection state, give
+		 * up waiting and put back any tx_iu reserved
+		 */
+		wake_up(&sess->mu_buf_wait_q);
+		wake_up(&sess->mu_iu_wait_q);
+		destroy_workqueue(sess->msg_wq);
+	}
+
+	con->sess->active_cnt--;
+}
+
+static void destroy_con(struct ibtrs_con *con)
+{
+	remove_con_from_list(con);
+	kvfree(con);
+}
+
+static void destroy_sess(struct kref *kref)
+{
+	struct ibtrs_session *sess = container_of(kref, struct ibtrs_session,
+						  kref);
+	struct ibtrs_con *con, *con_next;
+
+	if (sess->cm_id)
+		rdma_destroy_id(sess->cm_id);
+
+	destroy_workqueue(sess->sm_wq);
+
+	list_for_each_entry_safe(con, con_next, &sess->con_list, list)
+		destroy_con(con);
+
+	mutex_lock(&sess_mutex);
+	list_del(&sess->list);
+	mutex_unlock(&sess_mutex);
+	wake_up(&sess_list_waitq);
+
+	INFO(sess, "Session is closed\n");
+	kvfree(sess);
+}
+
+int ibtrs_srv_sess_get(struct ibtrs_session *sess)
+{
+	return kref_get_unless_zero(&sess->kref);
+}
+
+void ibtrs_srv_sess_put(struct ibtrs_session *sess)
+{
+	kref_put(&sess->kref, destroy_sess);
+}
+
+static void sess_put_worker(struct work_struct *work)
+{
+	struct sess_put_work *w = container_of(work, struct sess_put_work,
+					       work);
+
+	ibtrs_srv_sess_put(w->sess);
+	kvfree(w);
+}
+
+static void schedule_sess_put(struct ibtrs_session *sess)
+{
+	struct sess_put_work *w;
+
+	while (true) {
+		w = ibtrs_malloc(sizeof(*w));
+		if (w)
+			break;
+		cond_resched();
+	}
+
+	/* Since we can be closing this session from a session workqueue,
+	 * we need to schedule another work on the global workqueue to put the
+	 * session, which can destroy the session workqueue and free the
+	 * session.
+	 */
+	w->sess = sess;
+	INIT_WORK(&w->work, sess_put_worker);
+	queue_work(destroy_wq, &w->work);
+}
+
+static void ibtrs_srv_sysfs_put_worker(struct work_struct *work)
+{
+	struct ibtrs_srv_sysfs_put_work *w;
+
+	w = container_of(work, struct ibtrs_srv_sysfs_put_work, work);
+	kobject_put(&w->sess->kobj_stats);
+	kobject_put(&w->sess->kobj);
+
+	kvfree(w);
+}
+
+static void ibtrs_srv_schedule_sysfs_put(struct ibtrs_session *sess)
+{
+	struct ibtrs_srv_sysfs_put_work *w = ibtrs_malloc(sizeof(*w));
+
+	if (WARN_ON(!w))
+		return;
+
+	w->sess	= sess;
+
+	INIT_WORK(&w->work, ibtrs_srv_sysfs_put_worker);
+	queue_work(destroy_wq, &w->work);
+}
+
+static void ibtrs_free_dev(struct kref *ref)
+{
+	struct ibtrs_device *ndev =
+		container_of(ref, struct ibtrs_device, ref);
+
+	mutex_lock(&device_list_mutex);
+	list_del(&ndev->entry);
+	mutex_unlock(&device_list_mutex);
+	ib_session_destroy(&ndev->ib_sess);
+	if (ndev->ib_sess_destroy_completion)
+		complete_all(ndev->ib_sess_destroy_completion);
+	kfree(ndev);
+}
+
+static struct ibtrs_device *
+ibtrs_find_get_device(struct rdma_cm_id *cm_id)
+{
+	struct ibtrs_device *ndev;
+	int err;
+
+	mutex_lock(&device_list_mutex);
+	list_for_each_entry(ndev, &device_list, entry) {
+		if (ndev->device->node_guid == cm_id->device->node_guid &&
+		    kref_get_unless_zero(&ndev->ref))
+			goto out_unlock;
+	}
+
+	ndev = kzalloc(sizeof(*ndev), GFP_KERNEL);
+	if (!ndev)
+		goto out_err;
+
+	ndev->device = cm_id->device;
+	kref_init(&ndev->ref);
+
+	err = ib_session_init(cm_id->device, &ndev->ib_sess);
+	if (err)
+		goto out_free;
+
+	list_add(&ndev->entry, &device_list);
+	DEB("added %s.\n", ndev->device->name);
+out_unlock:
+	mutex_unlock(&device_list_mutex);
+	return ndev;
+
+out_free:
+	kfree(ndev);
+out_err:
+	mutex_unlock(&device_list_mutex);
+	return NULL;
+}
+
+static void ibtrs_srv_destroy_ib_session(struct ibtrs_session *sess)
+{
+	release_cont_bufs(sess);
+	free_sess_bufs(sess);
+	kref_put(&sess->dev->ref, ibtrs_free_dev);
+}
+
+static void process_err_wc(struct ibtrs_con *con, struct ib_wc *wc)
+{
+	struct ibtrs_iu *iu;
+
+	if (wc->wr_id == (uintptr_t)&con->ib_con.beacon) {
+		DEB("beacon received for con %p\n", con);
+		csm_schedule_event(con, CSM_EV_BEACON_COMPLETED);
+		return;
+	}
+
+	/* only wc->wr_id is ensured to be correct in erroneous WCs,
+	 * we can't rely on wc->opcode, use iu->direction to determine if it's
+	 * an tx or rx IU
+	 */
+	iu = (struct ibtrs_iu *)wc->wr_id;
+	if (iu && iu->direction == DMA_TO_DEVICE &&
+	    iu != con->sess->rdma_info_iu)
+		put_tx_iu(con->sess, iu);
+
+	if (wc->status != IB_WC_WR_FLUSH_ERR ||
+	    (con->state != CSM_STATE_CLOSING &&
+	     con->state != CSM_STATE_FLUSHING)) {
+		/* suppress flush errors when the connection has
+		 * just called rdma_disconnect() and is in
+		 * DISCONNECTING state waiting for the second
+		 * CM_DISCONNECTED event
+		 */
+		ERR_RL(con->sess, "%s (wr_id: 0x%llx,"
+		       " type: %s, vendor_err: 0x%x, len: %u)\n",
+		       ib_wc_status_msg(wc->status), wc->wr_id,
+		       ib_wc_opcode_str(wc->opcode),
+		       wc->vendor_err, wc->byte_len);
+	}
+	csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static int process_wcs(struct ibtrs_con *con, struct ib_wc *wcs, size_t len)
+{
+	int i, ret;
+	struct ibtrs_iu *iu;
+	struct ibtrs_session *sess = con->sess;
+
+	for (i = 0; i < len; i++) {
+		struct ib_wc wc = wcs[i];
+
+		if (unlikely(wc.status != IB_WC_SUCCESS)) {
+			process_err_wc(con, &wc);
+			continue;
+		}
+
+		/* DEB("cq complete with wr_id 0x%llx, len %u "
+		 *  "status %d (%s) type %d (%s)\n", wc.wr_id,
+		 *  wc.byte_len, wc.status, ib_wc_status_msg(wc.status),
+		 *  wc.opcode, ib_wc_opcode_str(wc.opcode));
+		 */
+
+		switch (wc.opcode) {
+		case IB_WC_SEND:
+			iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+			if (iu == con->sess->rdma_info_iu)
+				break;
+			put_tx_iu(sess, iu);
+			if (con->user)
+				wake_up(&sess->mu_iu_wait_q);
+			break;
+
+		case IB_WC_RECV_RDMA_WITH_IMM: {
+			u32 imm, id, off;
+			struct ibtrs_msg_hdr *hdr;
+
+			ibtrs_set_last_heartbeat(&sess->heartbeat);
+
+			iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+			imm = be32_to_cpu(wc.ex.imm_data);
+			if (imm == UINT_MAX) {
+				ret = ibtrs_post_recv(con, iu);
+				if (unlikely(ret != 0)) {
+					ERR(sess, "post receive buffer failed,"
+					    " errno: %d\n", ret);
+					return ret;
+				}
+				break;
+			} else if (imm == UINT_MAX - 1) {
+				ret = ibtrs_post_recv(con, iu);
+				if (unlikely(ret))
+					ERR_RL(sess, "Posting receive buffer of"
+					       " user Ack msg to HCA failed,"
+					       " errno: %d\n", ret);
+				process_msg_user_ack(con);
+				break;
+			}
+			id = imm >> sess->off_len;
+			off = imm & sess->off_mask;
+
+			if (id > sess->queue_depth || off > rcv_buf_size) {
+				ERR(sess, "Processing I/O failed, contiguous "
+				    "buf addr is out of reserved area\n");
+				ret = ibtrs_post_recv(con, iu);
+				if (unlikely(ret != 0))
+					ERR(sess, "Processing I/O failed, "
+					    "post receive buffer failed, "
+					    "errno: %d\n", ret);
+				return -EIO;
+			}
+
+			hdr = (struct ibtrs_msg_hdr *)
+				(sess->rcv_buf_pool->rcv_bufs[id].buf + off);
+
+			ibtrs_handle_write(con, iu, hdr, id, off);
+			break;
+		}
+
+		case IB_WC_RDMA_WRITE:
+			break;
+
+		case IB_WC_RECV: {
+			struct ibtrs_msg_hdr *hdr;
+
+			ibtrs_set_last_heartbeat(&sess->heartbeat);
+			iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+			hdr = (struct ibtrs_msg_hdr *)iu->buf;
+			ibtrs_deb_msg_hdr("Received: ", hdr);
+			ibtrs_handle_recv(con, iu);
+			break;
+		}
+
+		default:
+			ERR(sess, "Processing work completion failed,"
+			    " WC has unknown opcode: %s\n",
+			    ib_wc_opcode_str(wc.opcode));
+			return -EINVAL;
+		}
+	}
+	return 0;
+}
+
+static void ibtrs_srv_update_wc_stats(struct ibtrs_con *con, int cnt)
+{
+	int old_max = atomic_read(&con->sess->stats.wc_comp.max_wc_cnt);
+	int act_max;
+
+	while (cnt > old_max) {
+		act_max = atomic_cmpxchg(&con->sess->stats.wc_comp.max_wc_cnt,
+					 old_max, cnt);
+		if (likely(act_max == old_max))
+			break;
+		old_max = act_max;
+	}
+
+	atomic64_inc(&con->sess->stats.wc_comp.calls);
+	atomic64_add(cnt, &con->sess->stats.wc_comp.total_wc_cnt);
+}
+
+static int get_process_wcs(struct ibtrs_con *con, int *total_cnt)
+{
+	int cnt, err;
+
+	do {
+		cnt = ib_poll_cq(con->ib_con.cq, ARRAY_SIZE(con->wcs),
+				 con->wcs);
+		if (unlikely(cnt < 0)) {
+			ERR(con->sess, "Polling completion queue failed, "
+			    "errno: %d\n", cnt);
+			return cnt;
+		}
+
+		if (likely(cnt > 0)) {
+			err = process_wcs(con, con->wcs, cnt);
+			*total_cnt += cnt;
+			if (unlikely(err))
+				return err;
+		}
+	} while (cnt > 0);
+
+	return 0;
+}
+
+static void wrapper_handle_cq_comp(struct work_struct *work)
+{
+	int err;
+	struct ibtrs_con *con = container_of(work, struct ibtrs_con, cq_work);
+	struct ibtrs_session *sess = con->sess;
+	int total_cnt = 0;
+
+	if (unlikely(con->state == CSM_STATE_CLOSED)) {
+		ERR(sess, "Retrieving work completions from completion"
+		    " queue failed, connection is disconnected\n");
+		goto error;
+	}
+
+	err = get_process_wcs(con, &total_cnt);
+	if (unlikely(err))
+		goto error;
+
+	while ((err = ib_req_notify_cq(con->ib_con.cq, IB_CQ_NEXT_COMP |
+				       IB_CQ_REPORT_MISSED_EVENTS)) > 0) {
+		DEB("Missed %d CQ notifications, processing missed WCs...\n",
+		    err);
+		err = get_process_wcs(con, &total_cnt);
+		if (unlikely(err))
+			goto error;
+	}
+
+	if (unlikely(err))
+		goto error;
+
+	ibtrs_srv_update_wc_stats(con, total_cnt);
+	return;
+
+error:
+	csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void cq_event_handler(struct ib_cq *cq, void *ctx)
+{
+	struct ibtrs_con *con = ctx;
+
+	/* queue_work() can return False here.
+	 * The work can be already queued, When CQ notifications were already
+	 * activiated and are activated again after the beacon was posted.
+	 */
+	if (con->state != CSM_STATE_CLOSED)
+		queue_work(con->cq_wq, &con->cq_work);
+}
+
+static int accept(struct ibtrs_con *con)
+{
+	struct rdma_conn_param conn_param;
+	int ret;
+	struct ibtrs_session *sess = con->sess;
+
+	memset(&conn_param, 0, sizeof(conn_param));
+	conn_param.retry_count = retry_count;
+
+	if (con->user)
+		conn_param.rnr_retry_count = 7;
+
+	ret = rdma_accept(con->cm_id, &conn_param);
+	if (ret) {
+		ERR(sess, "Accepting RDMA connection request failed,"
+		    " errno: %d\n", ret);
+		return ret;
+	}
+
+	return 0;
+}
+
+static struct ibtrs_session *
+__create_sess(struct rdma_cm_id *cm_id, const struct ibtrs_msg_sess_open *req)
+{
+	struct ibtrs_session *sess;
+	int err;
+
+	sess = ibtrs_zalloc(sizeof(*sess));
+	if (!sess) {
+		err = -ENOMEM;
+		goto out;
+	}
+
+	err = ibtrs_addr_to_str(&cm_id->route.addr.dst_addr, sess->addr,
+				sizeof(sess->addr));
+	if (err < 0)
+		goto err1;
+
+	sess->est_cnt = 0;
+	sess->state_in_sysfs = false;
+	sess->cur_cq_vector = -1;
+	INIT_LIST_HEAD(&sess->con_list);
+	mutex_init(&sess->lock);
+
+	INIT_LIST_HEAD(&sess->tx_bufs);
+	spin_lock_init(&sess->tx_bufs_lock);
+
+	err = ib_get_max_wr_queue_size(cm_id->device);
+	if (err < 0)
+		goto err1;
+
+	sess->wq_size = err - 1;
+
+	sess->queue_depth		= sess_queue_depth;
+	sess->con_cnt			= req->con_cnt;
+	sess->ver			= min_t(u8, req->ver, IBTRS_VERSION);
+	sess->primary_port_num		= cm_id->port_num;
+
+	init_waitqueue_head(&sess->mu_iu_wait_q);
+	init_waitqueue_head(&sess->mu_buf_wait_q);
+	ibtrs_set_heartbeat_timeout(&sess->heartbeat,
+				    default_heartbeat_timeout_ms <
+				    MIN_HEARTBEAT_TIMEOUT_MS ?
+				    MIN_HEARTBEAT_TIMEOUT_MS :
+				    default_heartbeat_timeout_ms);
+	atomic64_set(&sess->heartbeat.send_ts_ms, 0);
+	atomic64_set(&sess->heartbeat.recv_ts_ms, 0);
+	sess->heartbeat.addr = sess->addr;
+	sess->heartbeat.hostname = sess->hostname;
+
+	atomic_set(&sess->peer_usr_msg_bufs, USR_MSG_CNT);
+	sess->dev = ibtrs_find_get_device(cm_id);
+	if (!sess->dev) {
+		err = -ENOMEM;
+		WRN(sess, "Failed to alloc ibtrs_device\n");
+		goto err1;
+	}
+	err = setup_cont_bufs(sess);
+	if (err)
+		goto err2;
+
+	memcpy(sess->uuid, req->uuid, IBTRS_UUID_SIZE);
+	err = ssm_init(sess);
+	if (err) {
+		WRN(sess, "Failed to initialize the session state machine\n");
+		goto err3;
+	}
+
+	kref_init(&sess->kref);
+	init_waitqueue_head(&sess->bufs_wait);
+
+	list_add(&sess->list, &sess_list);
+	INFO(sess, "IBTRS Session created (queue depth: %d)\n",
+	     sess->queue_depth);
+
+	return sess;
+
+err3:
+	release_cont_bufs(sess);
+err2:
+	kref_put(&sess->dev->ref, ibtrs_free_dev);
+err1:
+	kvfree(sess);
+out:
+	return ERR_PTR(err);
+}
+
+inline const char *ibtrs_srv_get_sess_hostname(struct ibtrs_session *sess)
+{
+	return sess->hostname;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_hostname);
+
+inline const char *ibtrs_srv_get_sess_addr(struct ibtrs_session *sess)
+{
+	return sess->addr;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_addr);
+
+inline int ibtrs_srv_get_sess_qdepth(struct ibtrs_session *sess)
+{
+	return sess->queue_depth;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_qdepth);
+
+static struct ibtrs_session *__find_active_sess(const char *uuid)
+{
+	struct ibtrs_session *n;
+
+	list_for_each_entry(n, &sess_list, list) {
+		if (!memcmp(n->uuid, uuid, sizeof(n->uuid)) &&
+		    n->state != SSM_STATE_CLOSING &&
+		    n->state != SSM_STATE_CLOSED)
+			return n;
+	}
+
+	return NULL;
+}
+
+static int rdma_con_reject(struct rdma_cm_id *cm_id, s16 errno)
+{
+	struct ibtrs_msg_error msg;
+	int ret;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.hdr.type	= IBTRS_MSG_ERROR;
+	msg.hdr.tsize	= sizeof(msg);
+	msg.errno	= errno;
+
+	ret = rdma_reject(cm_id, &msg, sizeof(msg));
+	if (ret)
+		ERR_NP("Rejecting RDMA connection request failed, errno: %d\n",
+		       ret);
+
+	return ret;
+}
+
+static int find_next_bit_ring(int cur)
+{
+	int v = cpumask_next(cur, &cq_affinity_mask);
+
+	if (v >= nr_cpu_ids)
+		v = cpumask_first(&cq_affinity_mask);
+	return v;
+}
+
+static int ibtrs_srv_get_next_cq_vector(struct ibtrs_session *sess)
+{
+	sess->cur_cq_vector = find_next_bit_ring(sess->cur_cq_vector);
+
+	return sess->cur_cq_vector;
+}
+
+static void ssm_create_con_worker(struct work_struct *work)
+{
+	struct ssm_create_con_work *ssm_w =
+			container_of(work, struct ssm_create_con_work, work);
+	struct ibtrs_session *sess = ssm_w->sess;
+	struct rdma_cm_id *cm_id = ssm_w->cm_id;
+	bool user = ssm_w->user;
+	struct ibtrs_con *con;
+	int ret;
+	u16 cq_size, wr_queue_size;
+
+	kvfree(ssm_w);
+
+	if (sess->state == SSM_STATE_CLOSING ||
+	    sess->state == SSM_STATE_CLOSED) {
+		WRN(sess, "Creating connection failed, "
+		    "session is being closed\n");
+		ret = -ECOMM;
+		goto err_reject;
+	}
+
+	con = ibtrs_zalloc(sizeof(*con));
+	if (!con) {
+		ERR(sess, "Creating connection failed, "
+		    "can't allocate memory for connection\n");
+		ret = -ENOMEM;
+		goto err_reject;
+	}
+
+	con->cm_id			= cm_id;
+	con->sess			= sess;
+	con->user			= user;
+	con->device_being_removed	= false;
+
+	atomic_set(&con->wr_cnt, 0);
+	if (con->user) {
+		cq_size		= USR_CON_BUF_SIZE + 1;
+		wr_queue_size	= USR_CON_BUF_SIZE + 1;
+	} else {
+		cq_size		= con->sess->queue_depth;
+		wr_queue_size	= sess->wq_size;
+	}
+
+	con->cq_vector = ibtrs_srv_get_next_cq_vector(sess);
+
+	con->ib_con.addr = sess->addr;
+	con->ib_con.hostname = sess->hostname;
+	ret = ib_con_init(&con->ib_con, con->cm_id,
+			  1, cq_event_handler, con, con->cq_vector, cq_size,
+			  wr_queue_size, &con->sess->dev->ib_sess);
+	if (ret)
+		goto err_init;
+
+	INIT_WORK(&con->cq_work, wrapper_handle_cq_comp);
+	if (con->user)
+		con->cq_wq = alloc_ordered_workqueue("%s",
+						     WQ_HIGHPRI,
+						     "ibtrs_srv_wq");
+	else
+		con->cq_wq = alloc_workqueue("%s",
+					     WQ_CPU_INTENSIVE | WQ_HIGHPRI, 0,
+					     "ibtrs_srv_wq");
+	if (!con->cq_wq) {
+		ERR(sess, "Creating connection failed, can't allocate "
+		    "work queue for completion queue, errno: %d\n", ret);
+		goto err_wq1;
+	}
+
+	con->rdma_resp_wq = alloc_workqueue("%s", 0, WQ_HIGHPRI,
+					    "ibtrs_rdma_resp");
+
+	if (!con->rdma_resp_wq) {
+		ERR(sess, "Creating connection failed, can't allocate"
+		    " work queue for send response, errno: %d\n", ret);
+		goto err_wq2;
+	}
+
+	ret = init_transfer_bufs(con);
+	if (ret) {
+		ERR(sess, "Creating connection failed, can't init"
+		    " transfer buffers, errno: %d\n", ret);
+		goto err_buf;
+	}
+
+	csm_init(con);
+	add_con_to_list(sess, con);
+
+	cm_id->context = con;
+	if (con->user) {
+		con->sess->msg_wq = alloc_ordered_workqueue("sess_msg_wq", 0);
+		if (!con->sess->msg_wq) {
+			ERR(con->sess, "Failed to create user message"
+			    " workqueue\n");
+			ret = -ENOMEM;
+			goto err_accept;
+		}
+	}
+
+	DEB("accept request\n");
+	ret = accept(con);
+	if (ret)
+		goto err_msg;
+
+	if (con->user)
+		con->sess->cm_id = cm_id;
+
+	con->sess->active_cnt++;
+
+	return;
+err_msg:
+	if (con->user)
+		destroy_workqueue(con->sess->msg_wq);
+err_accept:
+	cm_id->context = NULL;
+	remove_con_from_list(con);
+err_buf:
+	destroy_workqueue(con->rdma_resp_wq);
+err_wq2:
+	destroy_workqueue(con->cq_wq);
+err_wq1:
+	ib_con_destroy(&con->ib_con);
+err_init:
+	kvfree(con);
+err_reject:
+	rdma_destroy_id(cm_id);
+
+	ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+}
+
+static int ssm_schedule_create_con(struct ibtrs_session *sess,
+				   struct rdma_cm_id *cm_id,
+				   bool user)
+{
+	struct ssm_create_con_work *w;
+
+	w = ibtrs_malloc(sizeof(*w));
+	if (!w)
+		return -ENOMEM;
+
+	w->sess		= sess;
+	w->cm_id	= cm_id;
+	w->user		= user;
+	INIT_WORK(&w->work, ssm_create_con_worker);
+	queue_work(sess->sm_wq, &w->work);
+
+	return 0;
+}
+
+static int rdma_con_establish(struct rdma_cm_id *cm_id, const void *data,
+			      size_t size)
+{
+	struct ibtrs_session *sess;
+	int ret;
+	const char *uuid = NULL;
+	const struct ibtrs_msg_hdr *hdr = data;
+	bool user = false;
+
+	if (unlikely(!srv_ops_are_valid(srv_ops))) {
+		ERR_NP("Establishing connection failed, "
+		       "no user module registered!\n");
+		ret = -ECOMM;
+		goto err_reject;
+	}
+
+	if (unlikely((size < sizeof(struct ibtrs_msg_con_open)) ||
+		     (size < sizeof(struct ibtrs_msg_sess_open)) ||
+		     ibtrs_validate_message(0, hdr))) {
+		ERR_NP("Establishing connection failed, "
+		       "connection request payload size unexpected "
+		       "%zu != %lu or %lu\n", size,
+		       sizeof(struct ibtrs_msg_con_open),
+		       sizeof(struct ibtrs_msg_sess_open));
+		ret = -EINVAL;
+		goto err_reject;
+	}
+
+	if (hdr->type == IBTRS_MSG_SESS_OPEN)
+		uuid = ((struct ibtrs_msg_sess_open *)data)->uuid;
+	else if (hdr->type == IBTRS_MSG_CON_OPEN)
+		uuid = ((struct ibtrs_msg_con_open *)data)->uuid;
+
+	mutex_lock(&sess_mutex);
+	sess = __find_active_sess(uuid);
+	if (sess) {
+		if (unlikely(hdr->type == IBTRS_MSG_SESS_OPEN)) {
+			INFO(sess, "Connection request rejected, "
+			     "session already exists\n");
+			mutex_unlock(&sess_mutex);
+			ret = -EEXIST;
+			goto err_reject;
+		}
+		if (!ibtrs_srv_sess_get(sess)) {
+			INFO(sess, "Connection request rejected,"
+			     " session is being closed\n");
+			mutex_unlock(&sess_mutex);
+			ret = -EINVAL;
+			goto err_reject;
+		}
+	} else {
+		if (unlikely(hdr->type == IBTRS_MSG_CON_OPEN)) {
+			mutex_unlock(&sess_mutex);
+			INFO_NP("Connection request rejected,"
+				" received con_open msg but no active session"
+				" exists.\n");
+			ret = -EINVAL;
+			goto err_reject;
+		}
+
+		sess = __create_sess(cm_id, (struct ibtrs_msg_sess_open *)data);
+		if (IS_ERR(sess)) {
+			mutex_unlock(&sess_mutex);
+			ret = PTR_ERR(sess);
+			ERR_NP("Establishing connection failed, "
+			       "creating local session resource failed, errno:"
+			       " %d\n", ret);
+			goto err_reject;
+		}
+		ibtrs_srv_sess_get(sess);
+		user = true;
+	}
+
+	mutex_unlock(&sess_mutex);
+
+	ret = ssm_schedule_create_con(sess, cm_id, user);
+	if (ret) {
+		ERR(sess, "Unable to schedule creation of connection,"
+		    " session will be closed.\n");
+		goto err_close;
+	}
+
+	ibtrs_srv_sess_put(sess);
+	return 0;
+
+err_close:
+	ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+	ibtrs_srv_sess_put(sess);
+err_reject:
+	rdma_con_reject(cm_id, ret);
+	return ret;
+}
+
+static int ibtrs_srv_rdma_cm_ev_handler(struct rdma_cm_id *cm_id,
+					struct rdma_cm_event *event)
+{
+	struct ibtrs_con *con = cm_id->context;
+	int ret = 0;
+
+	DEB("cma_event type %d cma_id %p(%s) on con: %p\n", event->event,
+	    cm_id, rdma_event_msg(event->event), con);
+	if (!con && event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
+		INFO_NP("Ignore cma_event type %d cma_id %p(%s)\n",
+			event->event, cm_id, rdma_event_msg(event->event));
+		return 0;
+	}
+
+	switch (event->event) {
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		ret = rdma_con_establish(cm_id, event->param.conn.private_data,
+					 event->param.conn.private_data_len);
+		break;
+	case RDMA_CM_EVENT_ESTABLISHED:
+		csm_schedule_event(con, CSM_EV_CON_ESTABLISHED);
+		break;
+	case RDMA_CM_EVENT_DISCONNECTED:
+	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
+		csm_schedule_event(con, CSM_EV_CON_DISCONNECTED);
+		break;
+
+	case RDMA_CM_EVENT_DEVICE_REMOVAL: {
+		struct completion dc;
+
+		ERR_RL(con->sess,
+		       "IB Device was removed, disconnecting session.\n");
+
+		con->device_being_removed = true;
+		init_completion(&dc);
+		con->sess->dev->ib_sess_destroy_completion = &dc;
+
+		csm_schedule_event(con, CSM_EV_DEVICE_REMOVAL);
+		wait_for_completion(&dc);
+
+		/* If it's user connection, the cm_id will be destroyed by
+		 * destroy_sess(), so return 0 to signal that we will destroy
+		 * it later. Otherwise, return 1 so CMA will destroy it.
+		 */
+		if (con->user)
+			return 0;
+		else
+			return 1;
+	}
+	case RDMA_CM_EVENT_CONNECT_ERROR:
+	case RDMA_CM_EVENT_ROUTE_ERROR:
+	case RDMA_CM_EVENT_UNREACHABLE:
+	case RDMA_CM_EVENT_ADDR_CHANGE:
+		ERR_RL(con->sess, "CM error (CM event: %s, errno: %d)\n",
+		       rdma_event_msg(event->event), event->status);
+
+		csm_schedule_event(con, CSM_EV_CON_ERROR);
+		break;
+	case RDMA_CM_EVENT_REJECTED:
+		/* reject status is defined in enum, not errno */
+		ERR_RL(con->sess,
+		       "Connection rejected (CM event: %s, err: %s)\n",
+		       rdma_event_msg(event->event),
+		       rdma_reject_msg(cm_id, event->status));
+		csm_schedule_event(con, CSM_EV_CON_ERROR);
+		break;
+	default:
+		WRN(con->sess, "Ignoring unexpected CM event %s, errno %d\n",
+		    rdma_event_msg(event->event), event->status);
+		break;
+	}
+	return ret;
+}
+
+static int ibtrs_srv_cm_init(struct rdma_cm_id **cm_id, struct sockaddr *addr,
+			     enum rdma_port_space ps)
+{
+	int ret;
+
+	*cm_id = rdma_create_id(&init_net, ibtrs_srv_rdma_cm_ev_handler, NULL,
+				ps, IB_QPT_RC);
+	if (IS_ERR(*cm_id)) {
+		ret = PTR_ERR(*cm_id);
+		ERR_NP("Creating id for RDMA connection failed, errno: %d\n",
+		       ret);
+		goto err_out;
+	}
+	DEB("created cm_id %p\n", *cm_id);
+	ret = rdma_bind_addr(*cm_id, addr);
+	if (ret) {
+		ERR_NP("Binding RDMA address failed, errno: %d\n", ret);
+		goto err_cm;
+	}
+	DEB("rdma_bind_addr successful\n");
+	/* we currently accept 64 rdma_connects */
+	ret = rdma_listen(*cm_id, 64);
+	if (ret) {
+		ERR_NP("Listening on RDMA connection failed, errno: %d\n", ret);
+		goto err_cm;
+	}
+
+	switch (addr->sa_family) {
+	case AF_INET:
+		DEB("listening on port %u\n",
+		    ntohs(((struct sockaddr_in *)addr)->sin_port));
+		break;
+	case AF_INET6:
+		DEB("listening on port %u\n",
+		    ntohs(((struct sockaddr_in6 *)addr)->sin6_port));
+		break;
+	case AF_IB:
+		DEB("listening on service id 0x%016llx\n",
+		    be64_to_cpu(rdma_get_service_id(*cm_id, addr)));
+		break;
+	default:
+		DEB("listening on address family %u\n", addr->sa_family);
+	}
+
+	return 0;
+
+err_cm:
+	rdma_destroy_id(*cm_id);
+err_out:
+	return ret;
+}
+
+static int ibtrs_srv_rdma_init(void)
+{
+	int ret = 0;
+	struct sockaddr_in6 sin = {
+		.sin6_family	= AF_INET6,
+		.sin6_addr	= IN6ADDR_ANY_INIT,
+		.sin6_port	= htons(IBTRS_SERVER_PORT),
+	};
+	struct sockaddr_ib sib = {
+		.sib_family			= AF_IB,
+		.sib_addr.sib_subnet_prefix	= 0ULL,
+		.sib_addr.sib_interface_id	= 0ULL,
+		.sib_sid	= cpu_to_be64(RDMA_IB_IP_PS_IB |
+					      IBTRS_SERVER_PORT),
+		.sib_sid_mask	= cpu_to_be64(0xffffffffffffffffULL),
+		.sib_pkey	= cpu_to_be16(0xffff),
+	};
+
+	/*
+	 * We accept both IPoIB and IB connections, so we need to keep
+	 * two cm id's, one for each socket type and port space.
+	 * If the cm initialization of one of the id's fails, we abort
+	 * everything.
+	 */
+
+	ret = ibtrs_srv_cm_init(&cm_id_ip, (struct sockaddr *)&sin,
+				RDMA_PS_TCP);
+	if (ret)
+		return ret;
+
+	ret = ibtrs_srv_cm_init(&cm_id_ib, (struct sockaddr *)&sib, RDMA_PS_IB);
+	if (ret)
+		goto err_cm_ib;
+
+	return ret;
+
+err_cm_ib:
+	rdma_destroy_id(cm_id_ip);
+	return ret;
+}
+
+static void ibtrs_srv_destroy_buf_pool(void)
+{
+	struct ibtrs_rcv_buf_pool *pool, *pool_next;
+
+	mutex_lock(&buf_pool_mutex);
+	list_for_each_entry_safe(pool, pool_next, &free_buf_pool_list, list) {
+		list_del(&pool->list);
+		nr_free_buf_pool--;
+		free_recv_buf_pool(pool);
+	}
+	mutex_unlock(&buf_pool_mutex);
+}
+
+static void ibtrs_srv_alloc_ini_buf_pool(void)
+{
+	struct ibtrs_rcv_buf_pool *pool;
+	int i;
+
+	if (init_pool_size == 0)
+		return;
+
+	INFO_NP("Trying to allocate RDMA buffers pool for %d client(s)\n",
+		init_pool_size);
+	for (i = 0; i < init_pool_size; i++) {
+		pool = alloc_rcv_buf_pool();
+		if (!pool) {
+			ERR_NP("Failed to allocate initial RDMA buffer pool"
+			       " #%d\n", i + 1);
+			break;
+		}
+		mutex_lock(&buf_pool_mutex);
+		list_add(&pool->list, &free_buf_pool_list);
+		nr_free_buf_pool++;
+		nr_total_buf_pool++;
+		mutex_unlock(&buf_pool_mutex);
+		DEB("Allocated buffer pool #%d\n", i);
+	}
+
+	INFO_NP("Allocated RDMA buffers pool for %d client(s)\n", i);
+}
+
+int ibtrs_srv_register(const struct ibtrs_srv_ops *ops)
+{
+	int err;
+
+	if (srv_ops) {
+		ERR_NP("Registration failed, module %s already registered,"
+		       " only 1 user module supported\n",
+		srv_ops->owner->name);
+		return -ENOTSUPP;
+	}
+
+	if (unlikely(!srv_ops_are_valid(ops))) {
+		ERR_NP("Registration failed, user module supploed invalid ops"
+		       " parameter\n");
+		return -EFAULT;
+	}
+
+	ibtrs_srv_alloc_ini_buf_pool();
+
+	err = ibtrs_srv_rdma_init();
+	if (err) {
+		ERR_NP("Can't init RDMA resource, errno: %d\n", err);
+		return err;
+	}
+	srv_ops = ops;
+
+	return 0;
+}
+EXPORT_SYMBOL(ibtrs_srv_register);
+
+inline void ibtrs_srv_queue_close(struct ibtrs_session *sess)
+{
+	ssm_schedule_event(sess, SSM_EV_SYSFS_DISCONNECT);
+}
+
+static void close_sessions(void)
+{
+	struct ibtrs_session *sess;
+
+	mutex_lock(&sess_mutex);
+	list_for_each_entry(sess, &sess_list, list) {
+		if (!ibtrs_srv_sess_get(sess))
+			continue;
+		ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+		ibtrs_srv_sess_put(sess);
+	}
+	mutex_unlock(&sess_mutex);
+
+	wait_event(sess_list_waitq, list_empty(&sess_list));
+}
+
+void ibtrs_srv_unregister(const struct ibtrs_srv_ops *ops)
+{
+	if (!srv_ops) {
+		WRN_NP("Nothing to unregister - srv_ops = NULL\n");
+		return;
+	}
+
+	/* TODO: in order to support registration of multiple modules,
+	 * introduce a list with srv_ops and search for the correct
+	 * one.
+	 */
+
+	if (srv_ops != ops) {
+		ERR_NP("Ops is not the ops we have registered\n");
+		return;
+	}
+
+	rdma_destroy_id(cm_id_ip);
+	cm_id_ip = NULL;
+	rdma_destroy_id(cm_id_ib);
+	cm_id_ib = NULL;
+	close_sessions();
+	flush_workqueue(destroy_wq);
+	ibtrs_srv_destroy_buf_pool();
+	srv_ops = NULL;
+}
+EXPORT_SYMBOL(ibtrs_srv_unregister);
+
+static int check_module_params(void)
+{
+	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
+		ERR_NP("Invalid sess_queue_depth parameter value\n");
+		return -EINVAL;
+	}
+
+	/* check if IB immediate data size is enough to hold the mem_id and the
+	 * offset inside the memory chunk
+	 */
+	if (ilog2(sess_queue_depth - 1) + ilog2(rcv_buf_size - 1) >
+	    IB_IMM_SIZE_BITS) {
+		ERR_NP("RDMA immediate size (%db) not enough to encode "
+		       "%d buffers of size %dB. Reduce 'sess_queue_depth' "
+		       "or 'max_io_size' parameters.\n", IB_IMM_SIZE_BITS,
+		       sess_queue_depth, rcv_buf_size);
+		return -EINVAL;
+	}
+
+	if (init_pool_size < 0) {
+		ERR_NP("Invalid 'init_pool_size' parameter value."
+		       " Value must be positive.\n");
+		return -EINVAL;
+	}
+
+	if (pool_size_hi_wm < init_pool_size) {
+		ERR_NP("Invalid 'pool_size_hi_wm' parameter value. Value must"
+		       " be iqual or higher than 'init_pool_size'.\n");
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+static void csm_init(struct ibtrs_con *con)
+{
+	DEB("initializing csm to %s\n", csm_state_str(CSM_STATE_REQUESTED));
+	csm_set_state(con, CSM_STATE_REQUESTED);
+}
+
+static int send_msg_sess_open_resp(struct ibtrs_con *con)
+{
+	struct ibtrs_msg_sess_open_resp *msg;
+	int err;
+	struct ibtrs_session *sess = con->sess;
+
+	msg = sess->rdma_info_iu->buf;
+
+	fill_ibtrs_msg_sess_open_resp(msg, con);
+
+	err = ibtrs_post_send(con->ib_con.qp, con->sess->dev->ib_sess.mr,
+			      sess->rdma_info_iu, msg->hdr.tsize);
+	if (unlikely(err))
+		ERR(sess, "Sending sess open resp failed, "
+			  "posting msg to QP failed, errno: %d\n", err);
+
+	return err;
+}
+
+static void queue_heartbeat_dwork(struct ibtrs_session *sess)
+{
+	ibtrs_set_last_heartbeat(&sess->heartbeat);
+	WARN_ON(!queue_delayed_work(sess->sm_wq,
+				    &sess->send_heartbeat_dwork,
+				    HEARTBEAT_INTV_JIFFIES));
+	WARN_ON(!queue_delayed_work(sess->sm_wq,
+				    &sess->check_heartbeat_dwork,
+				    HEARTBEAT_INTV_JIFFIES));
+}
+
+static void csm_requested(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct ibtrs_session *sess = con->sess;
+	enum csm_state state = con->state;
+
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+	switch (ev) {
+	case CSM_EV_CON_ESTABLISHED: {
+		csm_set_state(con, CSM_STATE_CONNECTED);
+		if (con->user) {
+			/* send back rdma info */
+			if (send_msg_sess_open_resp(con))
+				goto destroy;
+			queue_heartbeat_dwork(con->sess);
+		}
+		ssm_schedule_event(sess, SSM_EV_CON_CONNECTED);
+		break;
+	}
+	case CSM_EV_DEVICE_REMOVAL:
+	case CSM_EV_CON_ERROR:
+	case CSM_EV_SESS_CLOSING:
+	case CSM_EV_CON_DISCONNECTED:
+destroy:
+		csm_set_state(con, CSM_STATE_CLOSED);
+		close_con(con);
+		ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+		break;
+	default:
+		ERR(sess, "Connection received unexpected event %s "
+		    "in %s state.\n", csm_ev_str(ev), csm_state_str(state));
+	}
+}
+
+static void csm_connected(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct ibtrs_session *sess = con->sess;
+	enum csm_state state = con->state;
+
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+	switch (ev) {
+	case CSM_EV_CON_ERROR:
+	case CSM_EV_SESS_CLOSING: {
+		int err;
+
+		csm_set_state(con, CSM_STATE_CLOSING);
+		err = rdma_disconnect(con->cm_id);
+		if (err)
+			ERR(sess, "Connection received event %s "
+			    "in %s state, new state is %s, but failed to "
+			    "disconnect connection.\n", csm_ev_str(ev),
+			    csm_state_str(state), csm_state_str(con->state));
+		break;
+		}
+	case CSM_EV_DEVICE_REMOVAL:
+		/* Send a SSM_EV_SESS_CLOSE event to the session to speed up the
+		 * closing of the other connections. If we just wait for the
+		 * client to close all connections this can take a while.
+		 */
+		ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+		/* fall-through */
+	case CSM_EV_CON_DISCONNECTED: {
+		int err, cnt = 0;
+
+		csm_set_state(con, CSM_STATE_FLUSHING);
+		err = rdma_disconnect(con->cm_id);
+		if (err)
+			ERR(sess, "Connection received event %s "
+			    "in %s state, new state is %s, but failed to "
+			    "disconnect connection.\n", csm_ev_str(ev),
+			    csm_state_str(state), csm_state_str(con->state));
+
+		wait_event(sess->bufs_wait,
+			   !atomic_read(&sess->stats.rdma_stats.inflight));
+		DEB("posting beacon on con %p\n", con);
+		err = post_beacon(&con->ib_con);
+		if (err) {
+			ERR(sess, "Connection received event %s "
+			    "in %s state, new state is %s but failed to post"
+			    " beacon, closing connection.\n", csm_ev_str(ev),
+			    csm_state_str(state), csm_state_str(con->state));
+			goto destroy;
+		}
+
+		err = ibtrs_request_cq_notifications(&con->ib_con);
+		if (unlikely(err < 0)) {
+			WRN(con->sess, "Requesting CQ Notification for"
+			    " ib_con failed. Connection will be destroyed\n");
+			goto destroy;
+		} else if (err > 0) {
+			err = get_process_wcs(con, &cnt);
+			if (unlikely(err))
+				goto destroy;
+			break;
+		}
+		break;
+
+destroy:
+		csm_set_state(con, CSM_STATE_CLOSED);
+		close_con(con);
+		ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+
+		break;
+		}
+	default:
+		ERR(sess, "Connection received unexpected event %s "
+		    "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+	}
+}
+
+static void csm_closing(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct ibtrs_session *sess = con->sess;
+	enum csm_state state = con->state;
+
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+	switch (ev) {
+	case CSM_EV_DEVICE_REMOVAL:
+	case CSM_EV_CON_DISCONNECTED: {
+		int err, cnt = 0;
+
+		csm_set_state(con, CSM_STATE_FLUSHING);
+
+		wait_event(sess->bufs_wait,
+			   !atomic_read(&sess->stats.rdma_stats.inflight));
+
+		DEB("posting beacon on con %p\n", con);
+		if (post_beacon(&con->ib_con)) {
+			ERR(sess, "Connection received event %s "
+			    "in %s state, new state is %s but failed to post"
+			    " beacon, closing connection.\n", csm_ev_str(ev),
+			    csm_state_str(state), csm_state_str(con->state));
+			goto destroy;
+		}
+
+		err = ibtrs_request_cq_notifications(&con->ib_con);
+		if (unlikely(err < 0)) {
+			WRN(con->sess, "Requesting CQ Notification for"
+			    " ib_con failed. Connection will be destroyed\n");
+			goto destroy;
+		} else if (err > 0) {
+			err = get_process_wcs(con, &cnt);
+			if (unlikely(err))
+				goto destroy;
+			break;
+		}
+		break;
+
+destroy:
+		csm_set_state(con, CSM_STATE_CLOSED);
+		close_con(con);
+		ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+		break;
+	}
+	case CSM_EV_CON_ERROR:
+		/* ignore connection errors, just wait for CM_DISCONNECTED */
+	case CSM_EV_SESS_CLOSING:
+		break;
+	default:
+		ERR(sess, "Connection received unexpected event %s "
+		    "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+	}
+}
+
+static void csm_flushing(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct ibtrs_session *sess = con->sess;
+	enum csm_state state = con->state;
+
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+
+	switch (ev) {
+	case CSM_EV_BEACON_COMPLETED:
+		csm_set_state(con, CSM_STATE_CLOSED);
+		close_con(con);
+		ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+		break;
+	case CSM_EV_SESS_CLOSING:
+	case CSM_EV_DEVICE_REMOVAL:
+		/* Ignore CSM_EV_DEVICE_REMOVAL and CSM_EV_SESS_CLOSING in
+		 * this state. The beacon was already posted, so the
+		 * CSM_EV_BEACON_COMPLETED event should arrive anytime soon.
+		 */
+		break;
+	case CSM_EV_CON_ERROR:
+		break;
+	case CSM_EV_CON_DISCONNECTED:
+		/* Ignore CSM_EV_CON_DISCONNECTED. At this point we could have
+		 * already received a CSM_EV_CON_DISCONNECTED for the same
+		 * connection, but an additional RDMA_CM_EVENT_DISCONNECTED or
+		 * RDMA_CM_EVENT_TIMEWAIT_EXIT could be generated.
+		 */
+		break;
+	default:
+		ERR(sess, "Connection received unexpected event %s "
+		    "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+	}
+}
+
+static void csm_closed(struct ibtrs_con *con, enum csm_ev ev)
+{
+	/* in this state, we ignore every event scheduled for this connection
+	 * and just wait for the session workqueue to be flushed and the
+	 * connection freed
+	 */
+	DEB("con %p, event %s\n", con, csm_ev_str(ev));
+}
+
+typedef void (ibtrs_srv_csm_ev_handler_fn)(struct ibtrs_con *, enum csm_ev);
+
+static ibtrs_srv_csm_ev_handler_fn *ibtrs_srv_csm_ev_handlers[] = {
+	[CSM_STATE_REQUESTED]		= csm_requested,
+	[CSM_STATE_CONNECTED]		= csm_connected,
+	[CSM_STATE_CLOSING]		= csm_closing,
+	[CSM_STATE_FLUSHING]		= csm_flushing,
+	[CSM_STATE_CLOSED]		= csm_closed,
+};
+
+static inline void ibtrs_srv_csm_ev_handle(struct ibtrs_con *con,
+					   enum csm_ev ev)
+{
+	return (*ibtrs_srv_csm_ev_handlers[con->state])(con, ev);
+}
+
+static void csm_worker(struct work_struct *work)
+{
+	struct csm_work *csm_w = container_of(work, struct csm_work, work);
+
+	ibtrs_srv_csm_ev_handle(csm_w->con, csm_w->ev);
+	kvfree(csm_w);
+}
+
+static void csm_schedule_event(struct ibtrs_con *con, enum csm_ev ev)
+{
+	struct csm_work *w;
+
+	if (!ibtrs_srv_sess_get(con->sess))
+		return;
+
+	while (true) {
+		if (con->state == CSM_STATE_CLOSED)
+			goto out;
+		w = ibtrs_malloc(sizeof(*w));
+		if (w)
+			break;
+		cond_resched();
+	}
+
+	w->con = con;
+	w->ev = ev;
+	INIT_WORK(&w->work, csm_worker);
+	queue_work(con->sess->sm_wq, &w->work);
+
+out:
+	ibtrs_srv_sess_put(con->sess);
+}
+
+static void sess_schedule_csm_event(struct ibtrs_session *sess, enum csm_ev ev)
+{
+	struct ibtrs_con *con;
+
+	list_for_each_entry(con, &sess->con_list, list)
+		csm_schedule_event(con, ev);
+}
+
+static void remove_sess_from_sysfs(struct ibtrs_session *sess)
+{
+	if (!sess->state_in_sysfs)
+		return;
+
+	kobject_del(&sess->kobj_stats);
+	kobject_del(&sess->kobj);
+	sess->state_in_sysfs = false;
+
+	ibtrs_srv_schedule_sysfs_put(sess);
+}
+
+static __always_inline int
+__ibtrs_srv_request_cq_notifications(struct ibtrs_con *con)
+{
+	return ibtrs_request_cq_notifications(&con->ib_con);
+}
+
+static int ibtrs_srv_request_cq_notifications(struct ibtrs_session *sess)
+{
+	struct ibtrs_con *con;
+	int err, cnt = 0;
+
+	list_for_each_entry(con, &sess->con_list, list)  {
+		if (con->state == CSM_STATE_CONNECTED) {
+			err = __ibtrs_srv_request_cq_notifications(con);
+			if (unlikely(err < 0)) {
+				return err;
+			} else if (err > 0) {
+				err = get_process_wcs(con, &cnt);
+				if (unlikely(err))
+					return err;
+			}
+		}
+	}
+
+	return 0;
+}
+
+static void ssm_idle(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	enum ssm_state state = sess->state;
+
+	DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+	    sess->est_cnt);
+	switch (ev) {
+	case SSM_EV_CON_DISCONNECTED:
+		sess->est_cnt--;
+		/* fall through */
+	case SSM_EV_CON_EST_ERR:
+		if (!sess->active_cnt) {
+			ibtrs_srv_destroy_ib_session(sess);
+			ssm_set_state(sess, SSM_STATE_CLOSED);
+			cancel_delayed_work(&sess->check_heartbeat_dwork);
+			schedule_sess_put(sess);
+		} else {
+			ssm_set_state(sess, SSM_STATE_CLOSING);
+		}
+		break;
+	case SSM_EV_CON_CONNECTED: {
+		int err;
+
+		sess->est_cnt++;
+		if (sess->est_cnt != sess->con_cnt)
+			break;
+
+		err = ibtrs_srv_create_sess_files(sess);
+		if (err) {
+			if (err == -EEXIST)
+				ERR(sess,
+				    "Session sysfs files already exist,"
+				    " possibly a user-space process is"
+				    " holding them\n");
+			else
+				ERR(sess,
+				    "Create session sysfs files failed,"
+				    " errno: %d\n", err);
+			goto destroy;
+		}
+
+		sess->state_in_sysfs = true;
+
+		err = ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_CONNECTED);
+		if (err) {
+			ERR(sess, "Notifying user session event"
+			    " failed, errno: %d\n. Session is closed", err);
+			goto destroy;
+		}
+
+		ssm_set_state(sess, SSM_STATE_CONNECTED);
+		err = ibtrs_srv_request_cq_notifications(sess);
+		if (err) {
+			ERR(sess, "Requesting CQ completion notifications"
+			    " failed, errno: %d. Session will be closed.\n",
+			    err);
+			goto destroy;
+		}
+
+		break;
+destroy:
+		remove_sess_from_sysfs(sess);
+		ssm_set_state(sess, SSM_STATE_CLOSING);
+		sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+		break;
+	}
+	case SSM_EV_SESS_CLOSE:
+		ssm_set_state(sess, SSM_STATE_CLOSING);
+		sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+		break;
+	default:
+		ERR(sess, "Session received unexpected event %s "
+		    "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+	}
+}
+
+static void ssm_connected(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	enum ssm_state state = sess->state;
+
+	DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+	    sess->est_cnt);
+	switch (ev) {
+	case SSM_EV_CON_DISCONNECTED:
+		remove_sess_from_sysfs(sess);
+		sess->est_cnt--;
+
+		ssm_set_state(sess, SSM_STATE_CLOSING);
+		ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTING);
+		break;
+	case SSM_EV_SESS_CLOSE:
+	case SSM_EV_SYSFS_DISCONNECT:
+		remove_sess_from_sysfs(sess);
+		ssm_set_state(sess, SSM_STATE_CLOSING);
+		ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTING);
+
+		sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+		break;
+	default:
+		ERR(sess, "Session received unexpected event %s "
+		    "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+	}
+}
+
+static void ssm_closing(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	enum ssm_state state = sess->state;
+
+	DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+	    sess->est_cnt);
+	switch (ev) {
+	case SSM_EV_CON_CONNECTED:
+		sess->est_cnt++;
+		break;
+	case SSM_EV_CON_DISCONNECTED:
+		sess->est_cnt--;
+		/* fall through */
+	case SSM_EV_CON_EST_ERR:
+		if (sess->active_cnt == 0) {
+			ibtrs_srv_destroy_ib_session(sess);
+			ssm_set_state(sess, SSM_STATE_CLOSED);
+			ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTED);
+			cancel_delayed_work(&sess->check_heartbeat_dwork);
+			schedule_sess_put(sess);
+		}
+		break;
+	case SSM_EV_SESS_CLOSE:
+		sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+		break;
+	case SSM_EV_SYSFS_DISCONNECT:
+		/* just ignore it, the connection should have a
+		 * CSM_EV_SESS_CLOSING event on the queue to be
+		 * processed later
+		 */
+		break;
+	default:
+		ERR(sess, "Session received unexpected event %s "
+		    "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+	}
+}
+
+static void ssm_closed(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	/* in this state, we ignore every event and wait for the session
+	 * to be destroyed
+	 */
+	DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+	    sess->est_cnt);
+}
+
+typedef void (ssm_ev_handler_fn)(struct ibtrs_session *, enum ssm_ev);
+
+static ssm_ev_handler_fn *ibtrs_srv_ev_handlers[] = {
+	[SSM_STATE_IDLE]		= ssm_idle,
+	[SSM_STATE_CONNECTED]		= ssm_connected,
+	[SSM_STATE_CLOSING]		= ssm_closing,
+	[SSM_STATE_CLOSED]		= ssm_closed,
+};
+
+static void check_heartbeat_work(struct work_struct *work)
+{
+	struct ibtrs_session *sess;
+
+	sess = container_of(to_delayed_work(work), struct ibtrs_session,
+			    check_heartbeat_dwork);
+
+	if (ibtrs_heartbeat_timeout_is_expired(&sess->heartbeat)) {
+		ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+		return;
+	}
+
+	ibtrs_heartbeat_warn(&sess->heartbeat);
+
+	if (WARN_ON(!queue_delayed_work(sess->sm_wq,
+					&sess->check_heartbeat_dwork,
+					HEARTBEAT_INTV_JIFFIES)))
+		WRN_RL(sess, "Schedule check heartbeat work failed, "
+		       "check_heartbeat worker already queued?\n");
+}
+
+static void send_heartbeat_work(struct work_struct *work)
+{
+	struct ibtrs_session *sess;
+	int err;
+
+	sess = container_of(to_delayed_work(work), struct ibtrs_session,
+			    send_heartbeat_dwork);
+
+	if (ibtrs_heartbeat_send_ts_diff_ms(&sess->heartbeat) >=
+	    HEARTBEAT_INTV_MS) {
+		err = send_heartbeat(sess);
+		if (unlikely(err)) {
+			WRN_RL(sess,
+			       "Sending heartbeat failed, errno: %d,"
+			       " no further heartbeat will be sent\n", err);
+			return;
+		}
+	}
+
+	if (WARN_ON(!queue_delayed_work(sess->sm_wq,
+					&sess->send_heartbeat_dwork,
+					HEARTBEAT_INTV_JIFFIES)))
+		WRN_RL(sess, "schedule send heartbeat work failed, "
+		       "send_heartbeat worker already queued?\n");
+}
+
+static inline void ssm_ev_handle(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	return (*ibtrs_srv_ev_handlers[sess->state])(sess, ev);
+}
+
+static void ssm_worker(struct work_struct *work)
+{
+	struct ssm_work *ssm_w = container_of(work, struct ssm_work, work);
+
+	ssm_ev_handle(ssm_w->sess, ssm_w->ev);
+	kvfree(ssm_w);
+}
+
+static int ssm_schedule_event(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+	struct ssm_work *w;
+	int ret = 0;
+
+	if (!ibtrs_srv_sess_get(sess))
+		return -EPERM;
+
+	while (true) {
+		if (sess->state == SSM_STATE_CLOSED) {
+			ret = -EPERM;
+			goto out;
+		}
+		w = ibtrs_malloc(sizeof(*w));
+		if (w)
+			break;
+		cond_resched();
+	}
+
+	w->sess = sess;
+	w->ev = ev;
+	INIT_WORK(&w->work, ssm_worker);
+	queue_work(sess->sm_wq, &w->work);
+
+out:
+	ibtrs_srv_sess_put(sess);
+	return ret;
+}
+
+static int ssm_init(struct ibtrs_session *sess)
+{
+	sess->sm_wq = create_singlethread_workqueue("ibtrs_ssm_wq");
+	if (!sess->sm_wq)
+		return -ENOMEM;
+
+	INIT_DELAYED_WORK(&sess->check_heartbeat_dwork, check_heartbeat_work);
+	INIT_DELAYED_WORK(&sess->send_heartbeat_dwork, send_heartbeat_work);
+
+	ssm_set_state(sess, SSM_STATE_IDLE);
+
+	return 0;
+}
+
+static int ibtrs_srv_create_debugfs_files(void)
+{
+	int ret = 0;
+	struct dentry *file;
+
+	ibtrs_srv_debugfs_dir = debugfs_create_dir("ibtrs_server", NULL);
+	if (IS_ERR_OR_NULL(ibtrs_srv_debugfs_dir)) {
+		ibtrs_srv_debugfs_dir = NULL;
+		ret = PTR_ERR(ibtrs_srv_debugfs_dir);
+		if (ret == -ENODEV)
+			WRN_NP("Debugfs not enabled in kernel\n");
+		else
+			WRN_NP("Failed to create top-level debugfs directory,"
+			       " errno: %d\n", ret);
+		goto out;
+	}
+
+	mempool_debugfs_dir = debugfs_create_dir("mempool",
+						 ibtrs_srv_debugfs_dir);
+	if (IS_ERR_OR_NULL(mempool_debugfs_dir)) {
+		ret = PTR_ERR(mempool_debugfs_dir);
+		WRN_NP("Failed to create mempool debugfs directory,"
+		       " errno: %d\n", ret);
+		goto out_remove;
+	}
+
+	file = debugfs_create_u32("nr_free_buf_pool", 0444,
+				  mempool_debugfs_dir, &nr_free_buf_pool);
+	if (IS_ERR_OR_NULL(file)) {
+		WRN_NP("Failed to create mempool \"nr_free_buf_pool\""
+		       " debugfs file\n");
+		ret = -EINVAL;
+		goto out_remove;
+	}
+
+	file = debugfs_create_u32("nr_total_buf_pool", 0444,
+				  mempool_debugfs_dir, &nr_total_buf_pool);
+	if (IS_ERR_OR_NULL(file)) {
+		WRN_NP("Failed to create mempool \"nr_total_buf_pool\""
+		       " debugfs file\n");
+		ret = -EINVAL;
+		goto out_remove;
+	}
+
+	file = debugfs_create_u32("nr_active_sessions", 0444,
+				  mempool_debugfs_dir, &nr_active_sessions);
+	if (IS_ERR_OR_NULL(file)) {
+		WRN_NP("Failed to create mempool \"nr_active_sessions\""
+		       " debugfs file\n");
+		ret = -EINVAL;
+		goto out_remove;
+	}
+
+	goto out;
+
+out_remove:
+	debugfs_remove_recursive(ibtrs_srv_debugfs_dir);
+	ibtrs_srv_debugfs_dir = NULL;
+	mempool_debugfs_dir = NULL;
+out:
+	return ret;
+}
+
+static void ibtrs_srv_destroy_debugfs_files(void)
+{
+	debugfs_remove_recursive(ibtrs_srv_debugfs_dir);
+}
+
+static int __init ibtrs_server_init(void)
+{
+	int err;
+
+	if (!strlen(cq_affinity_list))
+		init_cq_affinity();
+
+	scnprintf(hostname, sizeof(hostname), "%s", utsname()->nodename);
+	INFO_NP("Loading module ibtrs_server, version: %s ("
+		" retry_count: %d, "
+		" default_heartbeat_timeout_ms: %d,"
+		" cq_affinity_list: %s, max_io_size: %d,"
+		" sess_queue_depth: %d, init_pool_size: %d,"
+		" pool_size_hi_wm: %d, hostname: %s)\n",
+		__stringify(IBTRS_VER),
+		retry_count, default_heartbeat_timeout_ms,
+		cq_affinity_list, max_io_size, sess_queue_depth,
+		init_pool_size, pool_size_hi_wm, hostname);
+
+	err = check_module_params();
+	if (err) {
+		ERR_NP("Failed to load module, invalid module parameters,"
+		       " errno: %d\n", err);
+		return err;
+	}
+
+	destroy_wq = alloc_workqueue("ibtrs_server_destroy_wq", 0, 0);
+	if (!destroy_wq) {
+		ERR_NP("Failed to load module,"
+		       " alloc ibtrs_server_destroy_wq failed\n");
+		return -ENOMEM;
+	}
+
+	err = ibtrs_srv_create_sysfs_files();
+	if (err) {
+		ERR_NP("Failed to load module, can't create sysfs files,"
+		       " errno: %d\n", err);
+		goto out_destroy_wq;
+	}
+
+	err = ibtrs_srv_create_debugfs_files();
+	if (err)
+		WRN_NP("Unable to create debugfs files, errno: %d."
+		       " Continuing without debugfs\n", err);
+
+	return 0;
+
+out_destroy_wq:
+	destroy_workqueue(destroy_wq);
+	return err;
+}
+
+static void __exit ibtrs_server_exit(void)
+{
+	INFO_NP("Unloading module\n");
+	ibtrs_srv_destroy_debugfs_files();
+	ibtrs_srv_destroy_sysfs_files();
+	destroy_workqueue(destroy_wq);
+
+	INFO_NP("Module unloaded\n");
+}
+
+module_init(ibtrs_server_init);
+module_exit(ibtrs_server_exit);
-- 
2.7.4





[Index of Archives]     [Linux RAID]     [Linux SCSI]     [Linux ATA RAID]     [IDE]     [Linux Wireless]     [Linux Kernel]     [ATH6KL]     [Linux Bluetooth]     [Linux Netdev]     [Kernel Newbies]     [Security]     [Git]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Device Mapper]

  Powered by Linux