
[v9,10/25] RDMA/rtrs: server: main functionality

Message ID 20200221104721.350-11-jinpuwang@gmail.com (mailing list archive)
State New, archived
Series RTRS (former IBTRS) RDMA Transport Library and RNBD (former IBNBD) RDMA Network Block Device

Commit Message

Jinpu Wang Feb. 21, 2020, 10:47 a.m. UTC
From: Jack Wang <jinpu.wang@cloud.ionos.com>

This is the main functionality of the rtrs-server module, which accepts
a set of RDMA connections (a so-called rtrs session), creates/destroys
sysfs entries associated with the rtrs session and notifies the upper
layer (the user of the RTRS API) about RDMA requests or link events.

Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
---
 drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164 ++++++++++++++++++++++++
 1 file changed, 2164 insertions(+)
 create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c

Comments

Bart Van Assche March 1, 2020, 1:42 a.m. UTC | #1
On 2020-02-21 02:47, Jack Wang wrote:
> +int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
> +{
> +	struct rtrs_srv_sess *sess;
> +	int err = -ENOTCONN;
> +
> +	mutex_lock(&srv->paths_mutex);
> +	list_for_each_entry(sess, &srv->paths_list, s.entry) {
> +		if (sess->state != RTRS_SRV_CONNECTED)
> +			continue;
> +		memcpy(sessname, sess->s.sessname,
> +		       min_t(size_t, sizeof(sess->s.sessname), len));
> +		err = 0;
> +		break;
> +	}
> +	mutex_unlock(&srv->paths_mutex);
> +
> +	return err;
> +}
> +EXPORT_SYMBOL(rtrs_srv_get_sess_name);

Please make sure that the returned string is '\0'-terminated, e.g. by
using strlcpy().
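
I.e. something along these lines (untested sketch of the suggested change):

	strlcpy(sessname, sess->s.sessname,
		min_t(size_t, sizeof(sess->s.sessname), len));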

> +static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
> +			       struct rdma_cm_id *cm_id)
> +{
> +	struct rtrs_srv *srv = sess->srv;
> +	struct rtrs_msg_conn_rsp msg;
> +	struct rdma_conn_param param;
> +	int err;
> +
> +	param = (struct rdma_conn_param) {
> +	.rnr_retry_count = 7,
> +	.private_data = &msg,
> +	.private_data_len = sizeof(msg),
> +	};
> +
> +	msg = (struct rtrs_msg_conn_rsp) {
> +	.magic = cpu_to_le16(RTRS_MAGIC),
> +	.version = cpu_to_le16(RTRS_PROTO_VER),
> +	.queue_depth = cpu_to_le16(srv->queue_depth),
> +	.max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
> +	.max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
> +	};
> +
> +	if (always_invalidate)
> +		msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
> +
> +	err = rdma_accept(cm_id, &param);
> +	if (err)
> +		pr_err("rdma_accept(), err: %d\n", err);
> +
> +	return err;
> +}

Please indent the members in the structure assignments.
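
I.e. something like:

	param = (struct rdma_conn_param) {
		.rnr_retry_count = 7,
		.private_data = &msg,
		.private_data_len = sizeof(msg),
	};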

> +static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
> +{
> +	struct rtrs_msg_conn_rsp msg;
> +	int err;
> +
> +	msg = (struct rtrs_msg_conn_rsp) {
> +	.magic = cpu_to_le16(RTRS_MAGIC),
> +	.version = cpu_to_le16(RTRS_PROTO_VER),
> +	.errno = cpu_to_le16(errno),
> +	};
> +
> +	err = rdma_reject(cm_id, &msg, sizeof(msg));
> +	if (err)
> +		pr_err("rdma_reject(), err: %d\n", err);
> +
> +	/* Bounce errno back */
> +	return errno;
> +}

Same comment for this function.

Thanks,

Bart.
Jinpu Wang March 2, 2020, 2:39 p.m. UTC | #2
On Sun, Mar 1, 2020 at 2:42 AM Bart Van Assche <bvanassche@acm.org> wrote:
>
> On 2020-02-21 02:47, Jack Wang wrote:
> > +int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
> > +{
> > +     struct rtrs_srv_sess *sess;
> > +     int err = -ENOTCONN;
> > +
> > +     mutex_lock(&srv->paths_mutex);
> > +     list_for_each_entry(sess, &srv->paths_list, s.entry) {
> > +             if (sess->state != RTRS_SRV_CONNECTED)
> > +                     continue;
> > +             memcpy(sessname, sess->s.sessname,
> > +                    min_t(size_t, sizeof(sess->s.sessname), len));
> > +             err = 0;
> > +             break;
> > +     }
> > +     mutex_unlock(&srv->paths_mutex);
> > +
> > +     return err;
> > +}
> > +EXPORT_SYMBOL(rtrs_srv_get_sess_name);
>
> Please make sure that the returned string is '\0'-terminated, e.g. by
> using strlcpy().
Ok.
>
> > +static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
> > +                            struct rdma_cm_id *cm_id)
> > +{
> > +     struct rtrs_srv *srv = sess->srv;
> > +     struct rtrs_msg_conn_rsp msg;
> > +     struct rdma_conn_param param;
> > +     int err;
> > +
> > +     param = (struct rdma_conn_param) {
> > +     .rnr_retry_count = 7,
> > +     .private_data = &msg,
> > +     .private_data_len = sizeof(msg),
> > +     };
> > +
> > +     msg = (struct rtrs_msg_conn_rsp) {
> > +     .magic = cpu_to_le16(RTRS_MAGIC),
> > +     .version = cpu_to_le16(RTRS_PROTO_VER),
> > +     .queue_depth = cpu_to_le16(srv->queue_depth),
> > +     .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
> > +     .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
> > +     };
> > +
> > +     if (always_invalidate)
> > +             msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
> > +
> > +     err = rdma_accept(cm_id, &param);
> > +     if (err)
> > +             pr_err("rdma_accept(), err: %d\n", err);
> > +
> > +     return err;
> > +}
>
> Please indent the members in the structure assignments.
ok.
>
> > +static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
> > +{
> > +     struct rtrs_msg_conn_rsp msg;
> > +     int err;
> > +
> > +     msg = (struct rtrs_msg_conn_rsp) {
> > +     .magic = cpu_to_le16(RTRS_MAGIC),
> > +     .version = cpu_to_le16(RTRS_PROTO_VER),
> > +     .errno = cpu_to_le16(errno),
> > +     };
> > +
> > +     err = rdma_reject(cm_id, &msg, sizeof(msg));
> > +     if (err)
> > +             pr_err("rdma_reject(), err: %d\n", err);
> > +
> > +     /* Bounce errno back */
> > +     return errno;
> > +}
>
> Same comment for this function.
Ok, thanks Bart
>
> Thanks,
>
> Bart.
Leon Romanovsky March 3, 2020, 11:37 a.m. UTC | #3
On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> From: Jack Wang <jinpu.wang@cloud.ionos.com>
>
> This is main functionality of rtrs-server module, which accepts
> set of RDMA connections (so called rtrs session), creates/destroys
> sysfs entries associated with rtrs session and notifies upper layer
> (user of RTRS API) about RDMA requests or link events.
>
> Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> ---
>  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164 ++++++++++++++++++++++++
>  1 file changed, 2164 insertions(+)
>  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
>
> diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> new file mode 100644
> index 000000000000..e60ee6dd675d
> --- /dev/null
> +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> @@ -0,0 +1,2164 @@
> +// SPDX-License-Identifier: GPL-2.0-or-later
> +/*
> + * RDMA Transport Layer
> + *
> + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
> + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
> + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> + */
> +
> +#undef pr_fmt
> +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
> +
> +#include <linux/module.h>
> +#include <linux/mempool.h>
> +
> +#include "rtrs-srv.h"
> +#include "rtrs-log.h"
> +
> +MODULE_DESCRIPTION("RDMA Transport Server");
> +MODULE_LICENSE("GPL");
> +
> +/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
> +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> +#define DEFAULT_SESS_QUEUE_DEPTH 512
> +#define MAX_HDR_SIZE PAGE_SIZE
> +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct rtrs_msg_rdma_read)) \
> +		      / sizeof(struct rtrs_sg_desc))
> +
> +/* We guarantee to serve 10 paths at least */
> +#define CHUNK_POOL_SZ 10
> +
> +static struct rtrs_rdma_dev_pd dev_pd;
> +static mempool_t *chunk_pool;
> +struct class *rtrs_dev_class;
> +
> +static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
> +static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
> +
> +static bool always_invalidate = true;
> +module_param(always_invalidate, bool, 0444);
> +MODULE_PARM_DESC(always_invalidate,
> +		 "Invalidate memory registration for contiguous memory regions before accessing.");
> +
> +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> +MODULE_PARM_DESC(max_chunk_size,
> +		 "Max size for each IO request, when change the unit is in byte (default: "
> +		 __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> +
> +module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
> +MODULE_PARM_DESC(sess_queue_depth,
> +		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
> +		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> +		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

We don't like module parameters in the RDMA.

> +
> +static char cq_affinity_list[256];
> +static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
> +
> +static void init_cq_affinity(void)
> +{
> +	sprintf(cq_affinity_list, "0-%d", nr_cpu_ids - 1);
> +}
> +
> +static int cq_affinity_list_set(const char *val, const struct kernel_param *kp)
> +{
> +	int ret = 0, len = strlen(val);
> +	cpumask_var_t new_value;
> +
> +	init_cq_affinity();
> +
> +	if (len >= sizeof(cq_affinity_list))
> +		return -EINVAL;
> +	if (!alloc_cpumask_var(&new_value, GFP_KERNEL))
> +		return -ENOMEM;
> +
> +	ret = cpulist_parse(val, new_value);
> +	if (ret) {
> +		pr_err("Can't set cq_affinity_list \"%s\": %d\n", val,
> +		       ret);
> +		goto free_cpumask;
> +	}
> +
> +	strlcpy(cq_affinity_list, val, sizeof(cq_affinity_list));
> +	*strchrnul(cq_affinity_list, '\n') = '\0';
> +	cpumask_copy(&cq_affinity_mask, new_value);
> +
> +	pr_info("cq_affinity_list changed to %*pbl\n",
> +		cpumask_pr_args(&cq_affinity_mask));
> +free_cpumask:
> +	free_cpumask_var(new_value);
> +	return ret;
> +}
> +
> +static struct kparam_string cq_affinity_list_kparam_str = {
> +	.maxlen	= sizeof(cq_affinity_list),
> +	.string	= cq_affinity_list
> +};
> +
> +static const struct kernel_param_ops cq_affinity_list_ops = {
> +	.set	= cq_affinity_list_set,
> +	.get	= param_get_string,
> +};
> +
> +module_param_cb(cq_affinity_list, &cq_affinity_list_ops,
> +		&cq_affinity_list_kparam_str, 0644);
> +MODULE_PARM_DESC(cq_affinity_list,
> +		 "Sets the list of cpus to use as cq vectors. (default: use all possible CPUs)");

I don't think that you should mess with device affinity assignment.
Why don't you use ib_get_vector_affinity()?

Thanks
Jinpu Wang March 3, 2020, 4:41 p.m. UTC | #4
On Tue, Mar 3, 2020 at 12:37 PM Leon Romanovsky <leon@kernel.org> wrote:
>
> On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> > From: Jack Wang <jinpu.wang@cloud.ionos.com>
> >
> > This is main functionality of rtrs-server module, which accepts
> > set of RDMA connections (so called rtrs session), creates/destroys
> > sysfs entries associated with rtrs session and notifies upper layer
> > (user of RTRS API) about RDMA requests or link events.
> >
> > Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> > Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> > ---
> >  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164 ++++++++++++++++++++++++
> >  1 file changed, 2164 insertions(+)
> >  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
> >
> > diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > new file mode 100644
> > index 000000000000..e60ee6dd675d
> > --- /dev/null
> > +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > @@ -0,0 +1,2164 @@
> > +// SPDX-License-Identifier: GPL-2.0-or-later
> > +/*
> > + * RDMA Transport Layer
> > + *
> > + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
> > + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
> > + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> > + */
> > +
> > +#undef pr_fmt
> > +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
> > +
> > +#include <linux/module.h>
> > +#include <linux/mempool.h>
> > +
> > +#include "rtrs-srv.h"
> > +#include "rtrs-log.h"
> > +
> > +MODULE_DESCRIPTION("RDMA Transport Server");
> > +MODULE_LICENSE("GPL");
> > +
> > +/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
> > +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> > +#define DEFAULT_SESS_QUEUE_DEPTH 512
> > +#define MAX_HDR_SIZE PAGE_SIZE
> > +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct rtrs_msg_rdma_read)) \
> > +                   / sizeof(struct rtrs_sg_desc))
> > +
> > +/* We guarantee to serve 10 paths at least */
> > +#define CHUNK_POOL_SZ 10
> > +
> > +static struct rtrs_rdma_dev_pd dev_pd;
> > +static mempool_t *chunk_pool;
> > +struct class *rtrs_dev_class;
> > +
> > +static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
> > +static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
> > +
> > +static bool always_invalidate = true;
> > +module_param(always_invalidate, bool, 0444);
> > +MODULE_PARM_DESC(always_invalidate,
> > +              "Invalidate memory registration for contiguous memory regions before accessing.");
> > +
> > +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> > +MODULE_PARM_DESC(max_chunk_size,
> > +              "Max size for each IO request, when change the unit is in byte (default: "
> > +              __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> > +
> > +module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
> > +MODULE_PARM_DESC(sess_queue_depth,
> > +              "Number of buffers for pending I/O requests to allocate per session. Maximum: "
> > +              __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> > +              __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
>
> We don't like module parameters in the RDMA.
Hi Leon,

These parameters affect resource usage/performance; I think it would be
good to have them as module parameters, so admins can choose based on
their needs.
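For example (assuming the module ends up being called rtrs_server; the
exact name here is just for illustration), an admin could load it with:

	modprobe rtrs_server sess_queue_depth=256 max_chunk_size=65536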
>
> > +
> > +static char cq_affinity_list[256];
> > +static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
> > +
> > +static void init_cq_affinity(void)
> > +{
> > +     sprintf(cq_affinity_list, "0-%d", nr_cpu_ids - 1);
> > +}
> > +
> > +static int cq_affinity_list_set(const char *val, const struct kernel_param *kp)
> > +{
> > +     int ret = 0, len = strlen(val);
> > +     cpumask_var_t new_value;
> > +
> > +     init_cq_affinity();
> > +
> > +     if (len >= sizeof(cq_affinity_list))
> > +             return -EINVAL;
> > +     if (!alloc_cpumask_var(&new_value, GFP_KERNEL))
> > +             return -ENOMEM;
> > +
> > +     ret = cpulist_parse(val, new_value);
> > +     if (ret) {
> > +             pr_err("Can't set cq_affinity_list \"%s\": %d\n", val,
> > +                    ret);
> > +             goto free_cpumask;
> > +     }
> > +
> > +     strlcpy(cq_affinity_list, val, sizeof(cq_affinity_list));
> > +     *strchrnul(cq_affinity_list, '\n') = '\0';
> > +     cpumask_copy(&cq_affinity_mask, new_value);
> > +
> > +     pr_info("cq_affinity_list changed to %*pbl\n",
> > +             cpumask_pr_args(&cq_affinity_mask));
> > +free_cpumask:
> > +     free_cpumask_var(new_value);
> > +     return ret;
> > +}
> > +
> > +static struct kparam_string cq_affinity_list_kparam_str = {
> > +     .maxlen = sizeof(cq_affinity_list),
> > +     .string = cq_affinity_list
> > +};
> > +
> > +static const struct kernel_param_ops cq_affinity_list_ops = {
> > +     .set    = cq_affinity_list_set,
> > +     .get    = param_get_string,
> > +};
> > +
> > +module_param_cb(cq_affinity_list, &cq_affinity_list_ops,
> > +             &cq_affinity_list_kparam_str, 0644);
> > +MODULE_PARM_DESC(cq_affinity_list,
> > +              "Sets the list of cpus to use as cq vectors. (default: use all possible CPUs)");
>
> I don't think that you should mess with device affinity assignment.
> Why don't you use ib_get_vector_affinity()?

cq_affinity_list has only ever used the default (all CPU cores) in daily
usage, so I will remove it.
Maybe the name is a bit misleading: cq_affinity_list is there to allow the
sysadmin to control how rtrs-srv chooses which cq_vector to use when
creating a CQ.

ib_get_vector_affinity seems to return the cpumask for a given cq_vector.
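If I understand it correctly, one could derive the vector from the device
affinity instead, roughly like this (untested sketch, the helper name is
made up):

	static int pick_cq_vector(struct ib_device *dev, int cpu)
	{
		int v;

		for (v = 0; v < dev->num_comp_vectors; v++) {
			const struct cpumask *mask;

			mask = ib_get_vector_affinity(dev, v);
			if (mask && cpumask_test_cpu(cpu, mask))
				return v;
		}
		/* no vector pinned to this cpu, fall back to vector 0 */
		return 0;
	}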

Thanks



>
> Thanks
Leon Romanovsky March 3, 2020, 4:59 p.m. UTC | #5
On Tue, Mar 03, 2020 at 05:41:27PM +0100, Jinpu Wang wrote:
> On Tue, Mar 3, 2020 at 12:37 PM Leon Romanovsky <leon@kernel.org> wrote:
> >
> > On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> > > From: Jack Wang <jinpu.wang@cloud.ionos.com>
> > >
> > > This is main functionality of rtrs-server module, which accepts
> > > set of RDMA connections (so called rtrs session), creates/destroys
> > > sysfs entries associated with rtrs session and notifies upper layer
> > > (user of RTRS API) about RDMA requests or link events.
> > >
> > > Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> > > Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > ---
> > >  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164 ++++++++++++++++++++++++
> > >  1 file changed, 2164 insertions(+)
> > >  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > >
> > > diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > new file mode 100644
> > > index 000000000000..e60ee6dd675d
> > > --- /dev/null
> > > +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > @@ -0,0 +1,2164 @@
> > > +// SPDX-License-Identifier: GPL-2.0-or-later
> > > +/*
> > > + * RDMA Transport Layer
> > > + *
> > > + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
> > > + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
> > > + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> > > + */
> > > +
> > > +#undef pr_fmt
> > > +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
> > > +
> > > +#include <linux/module.h>
> > > +#include <linux/mempool.h>
> > > +
> > > +#include "rtrs-srv.h"
> > > +#include "rtrs-log.h"
> > > +
> > > +MODULE_DESCRIPTION("RDMA Transport Server");
> > > +MODULE_LICENSE("GPL");
> > > +
> > > +/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
> > > +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> > > +#define DEFAULT_SESS_QUEUE_DEPTH 512
> > > +#define MAX_HDR_SIZE PAGE_SIZE
> > > +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct rtrs_msg_rdma_read)) \
> > > +                   / sizeof(struct rtrs_sg_desc))
> > > +
> > > +/* We guarantee to serve 10 paths at least */
> > > +#define CHUNK_POOL_SZ 10
> > > +
> > > +static struct rtrs_rdma_dev_pd dev_pd;
> > > +static mempool_t *chunk_pool;
> > > +struct class *rtrs_dev_class;
> > > +
> > > +static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
> > > +static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
> > > +
> > > +static bool always_invalidate = true;
> > > +module_param(always_invalidate, bool, 0444);
> > > +MODULE_PARM_DESC(always_invalidate,
> > > +              "Invalidate memory registration for contiguous memory regions before accessing.");
> > > +
> > > +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> > > +MODULE_PARM_DESC(max_chunk_size,
> > > +              "Max size for each IO request, when change the unit is in byte (default: "
> > > +              __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> > > +
> > > +module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
> > > +MODULE_PARM_DESC(sess_queue_depth,
> > > +              "Number of buffers for pending I/O requests to allocate per session. Maximum: "
> > > +              __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> > > +              __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
> >
> > We don't like module parameters in the RDMA.
> Hi Leon,
>
> These paramters are affecting resouce usage/performance, I think would
> be good to have them as module parameters,
> so admin could choose based their needs.

It is premature optimization before second user comes, also it is
based on the assumption that everyone uses modules, which is not true.

Thanks
Jinpu Wang March 4, 2020, 11:03 a.m. UTC | #6
On Tue, Mar 3, 2020 at 5:59 PM Leon Romanovsky <leon@kernel.org> wrote:
>
> On Tue, Mar 03, 2020 at 05:41:27PM +0100, Jinpu Wang wrote:
> > On Tue, Mar 3, 2020 at 12:37 PM Leon Romanovsky <leon@kernel.org> wrote:
> > >
> > > On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> > > > From: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > >
> > > > This is main functionality of rtrs-server module, which accepts
> > > > set of RDMA connections (so called rtrs session), creates/destroys
> > > > sysfs entries associated with rtrs session and notifies upper layer
> > > > (user of RTRS API) about RDMA requests or link events.
> > > >
> > > > Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> > > > Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > ---
> > > >  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164 ++++++++++++++++++++++++
> > > >  1 file changed, 2164 insertions(+)
> > > >  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > >
> > > > diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > new file mode 100644
> > > > index 000000000000..e60ee6dd675d
> > > > --- /dev/null
> > > > +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > @@ -0,0 +1,2164 @@
> > > > +// SPDX-License-Identifier: GPL-2.0-or-later
> > > > +/*
> > > > + * RDMA Transport Layer
> > > > + *
> > > > + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
> > > > + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
> > > > + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> > > > + */
> > > > +
> > > > +#undef pr_fmt
> > > > +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
> > > > +
> > > > +#include <linux/module.h>
> > > > +#include <linux/mempool.h>
> > > > +
> > > > +#include "rtrs-srv.h"
> > > > +#include "rtrs-log.h"
> > > > +
> > > > +MODULE_DESCRIPTION("RDMA Transport Server");
> > > > +MODULE_LICENSE("GPL");
> > > > +
> > > > +/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
> > > > +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> > > > +#define DEFAULT_SESS_QUEUE_DEPTH 512
> > > > +#define MAX_HDR_SIZE PAGE_SIZE
> > > > +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct rtrs_msg_rdma_read)) \
> > > > +                   / sizeof(struct rtrs_sg_desc))
> > > > +
> > > > +/* We guarantee to serve 10 paths at least */
> > > > +#define CHUNK_POOL_SZ 10
> > > > +
> > > > +static struct rtrs_rdma_dev_pd dev_pd;
> > > > +static mempool_t *chunk_pool;
> > > > +struct class *rtrs_dev_class;
> > > > +
> > > > +static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
> > > > +static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
> > > > +
> > > > +static bool always_invalidate = true;
> > > > +module_param(always_invalidate, bool, 0444);
> > > > +MODULE_PARM_DESC(always_invalidate,
> > > > +              "Invalidate memory registration for contiguous memory regions before accessing.");
> > > > +
> > > > +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> > > > +MODULE_PARM_DESC(max_chunk_size,
> > > > +              "Max size for each IO request, when change the unit is in byte (default: "
> > > > +              __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> > > > +
> > > > +module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
> > > > +MODULE_PARM_DESC(sess_queue_depth,
> > > > +              "Number of buffers for pending I/O requests to allocate per session. Maximum: "
> > > > +              __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> > > > +              __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
> > >
> > > We don't like module parameters in the RDMA.
> > Hi Leon,
> >
> > These paramters are affecting resouce usage/performance, I think would
> > be good to have them as module parameters,
> > so admin could choose based their needs.
>
> It is premature optimization before second user comes, also it is
> based on the assumption that everyone uses modules, which is not true.
The idea to have module parameters is to cover more use cases, IMHO.

Even if you build the module into the kernel, you can still change the
module parameters by passing "module_name.parameter" on the kernel
command line, e.g.:
kvm.nx_huge_pages=true
>
> Thanks
Thanks
Leon Romanovsky March 5, 2020, 8 a.m. UTC | #7
On Wed, Mar 04, 2020 at 12:03:32PM +0100, Jinpu Wang wrote:
> On Tue, Mar 3, 2020 at 5:59 PM Leon Romanovsky <leon@kernel.org> wrote:
> >
> > On Tue, Mar 03, 2020 at 05:41:27PM +0100, Jinpu Wang wrote:
> > > On Tue, Mar 3, 2020 at 12:37 PM Leon Romanovsky <leon@kernel.org> wrote:
> > > >
> > > > On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> > > > > From: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > >
> > > > > This is main functionality of rtrs-server module, which accepts
> > > > > set of RDMA connections (so called rtrs session), creates/destroys
> > > > > sysfs entries associated with rtrs session and notifies upper layer
> > > > > (user of RTRS API) about RDMA requests or link events.
> > > > >
> > > > > Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> > > > > Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > ---
> > > > >  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164 ++++++++++++++++++++++++
> > > > >  1 file changed, 2164 insertions(+)
> > > > >  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > >
> > > > > diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > new file mode 100644
> > > > > index 000000000000..e60ee6dd675d
> > > > > --- /dev/null
> > > > > +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > @@ -0,0 +1,2164 @@
> > > > > +// SPDX-License-Identifier: GPL-2.0-or-later
> > > > > +/*
> > > > > + * RDMA Transport Layer
> > > > > + *
> > > > > + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
> > > > > + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
> > > > > + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> > > > > + */
> > > > > +
> > > > > +#undef pr_fmt
> > > > > +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
> > > > > +
> > > > > +#include <linux/module.h>
> > > > > +#include <linux/mempool.h>
> > > > > +
> > > > > +#include "rtrs-srv.h"
> > > > > +#include "rtrs-log.h"
> > > > > +
> > > > > +MODULE_DESCRIPTION("RDMA Transport Server");
> > > > > +MODULE_LICENSE("GPL");
> > > > > +
> > > > > +/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
> > > > > +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> > > > > +#define DEFAULT_SESS_QUEUE_DEPTH 512
> > > > > +#define MAX_HDR_SIZE PAGE_SIZE
> > > > > +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct rtrs_msg_rdma_read)) \
> > > > > +                   / sizeof(struct rtrs_sg_desc))
> > > > > +
> > > > > +/* We guarantee to serve 10 paths at least */
> > > > > +#define CHUNK_POOL_SZ 10
> > > > > +
> > > > > +static struct rtrs_rdma_dev_pd dev_pd;
> > > > > +static mempool_t *chunk_pool;
> > > > > +struct class *rtrs_dev_class;
> > > > > +
> > > > > +static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
> > > > > +static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
> > > > > +
> > > > > +static bool always_invalidate = true;
> > > > > +module_param(always_invalidate, bool, 0444);
> > > > > +MODULE_PARM_DESC(always_invalidate,
> > > > > +              "Invalidate memory registration for contiguous memory regions before accessing.");
> > > > > +
> > > > > +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> > > > > +MODULE_PARM_DESC(max_chunk_size,
> > > > > +              "Max size for each IO request, when change the unit is in byte (default: "
> > > > > +              __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> > > > > +
> > > > > +module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
> > > > > +MODULE_PARM_DESC(sess_queue_depth,
> > > > > +              "Number of buffers for pending I/O requests to allocate per session. Maximum: "
> > > > > +              __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> > > > > +              __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
> > > >
> > > > We don't like module parameters in the RDMA.
> > > Hi Leon,
> > >
> > > These paramters are affecting resouce usage/performance, I think would
> > > be good to have them as module parameters,
> > > so admin could choose based their needs.
> >
> > It is premature optimization before second user comes, also it is
> > based on the assumption that everyone uses modules, which is not true.
> The idea to have module parameters is to cover more use cases, IMHO.
>
> Even you builtin the module to the kernel, you can still change the
> module parameters
> by passing the "moduls_name.paramters" in kernel command line, eg:
> kvm.nx_huge_pages=true

I know about that, but it doesn't make them helpful.

Thanks

> >
> > Thanks
> Thanks
Leon Romanovsky March 5, 2020, 12:16 p.m. UTC | #8
On Thu, Mar 05, 2020 at 01:01:08PM +0100, Danil Kipnis wrote:
> On Thu, 5 Mar 2020, 09:00 Leon Romanovsky, <leon@kernel.org> wrote:
>
> > On Wed, Mar 04, 2020 at 12:03:32PM +0100, Jinpu Wang wrote:
> > > On Tue, Mar 3, 2020 at 5:59 PM Leon Romanovsky <leon@kernel.org> wrote:
> > > >
> > > > On Tue, Mar 03, 2020 at 05:41:27PM +0100, Jinpu Wang wrote:
> > > > > On Tue, Mar 3, 2020 at 12:37 PM Leon Romanovsky <leon@kernel.org>
> > wrote:
> > > > > >
> > > > > > On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> > > > > > > From: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > > >
> > > > > > > This is main functionality of rtrs-server module, which accepts
> > > > > > > set of RDMA connections (so called rtrs session),
> > creates/destroys
> > > > > > > sysfs entries associated with rtrs session and notifies upper
> > layer
> > > > > > > (user of RTRS API) about RDMA requests or link events.
> > > > > > >
> > > > > > > Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> > > > > > > Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > > > ---
> > > > > > >  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164
> > ++++++++++++++++++++++++
> > > > > > >  1 file changed, 2164 insertions(+)
> > > > > > >  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > >
> > > > > > > diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > new file mode 100644
> > > > > > > index 000000000000..e60ee6dd675d
> > > > > > > --- /dev/null
> > > > > > > +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > @@ -0,0 +1,2164 @@
> > > > > > > +// SPDX-License-Identifier: GPL-2.0-or-later
> > > > > > > +/*
> > > > > > > + * RDMA Transport Layer
> > > > > > > + *
> > > > > > > + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights
> > reserved.
> > > > > > > + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights
> > reserved.
> > > > > > > + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> > > > > > > + */
> > > > > > > +
> > > > > > > +#undef pr_fmt
> > > > > > > +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__)
> > ": " fmt
> > > > > > > +
> > > > > > > +#include <linux/module.h>
> > > > > > > +#include <linux/mempool.h>
> > > > > > > +
> > > > > > > +#include "rtrs-srv.h"
> > > > > > > +#include "rtrs-log.h"
> > > > > > > +
> > > > > > > +MODULE_DESCRIPTION("RDMA Transport Server");
> > > > > > > +MODULE_LICENSE("GPL");
> > > > > > > +
> > > > > > > +/* Must be power of 2, see mask from mr->page_size in
> > ib_sg_to_pages() */
> > > > > > > +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> > > > > > > +#define DEFAULT_SESS_QUEUE_DEPTH 512
> > > > > > > +#define MAX_HDR_SIZE PAGE_SIZE
> > > > > > > +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct
> > rtrs_msg_rdma_read)) \
> > > > > > > +                   / sizeof(struct rtrs_sg_desc))
> > > > > > > +
> > > > > > > +/* We guarantee to serve 10 paths at least */
> > > > > > > +#define CHUNK_POOL_SZ 10
> > > > > > > +
> > > > > > > +static struct rtrs_rdma_dev_pd dev_pd;
> > > > > > > +static mempool_t *chunk_pool;
> > > > > > > +struct class *rtrs_dev_class;
> > > > > > > +
> > > > > > > +static int __read_mostly max_chunk_size =
> > DEFAULT_MAX_CHUNK_SIZE;
> > > > > > > +static int __read_mostly sess_queue_depth =
> > DEFAULT_SESS_QUEUE_DEPTH;
> > > > > > > +
> > > > > > > +static bool always_invalidate = true;
> > > > > > > +module_param(always_invalidate, bool, 0444);
> > > > > > > +MODULE_PARM_DESC(always_invalidate,
> > > > > > > +              "Invalidate memory registration for contiguous
> > memory regions before accessing.");
> > > > > > > +
> > > > > > > +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> > > > > > > +MODULE_PARM_DESC(max_chunk_size,
> > > > > > > +              "Max size for each IO request, when change the
> > unit is in byte (default: "
> > > > > > > +              __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> > > > > > > +
> > > > > > > +module_param_named(sess_queue_depth, sess_queue_depth, int,
> > 0444);
> > > > > > > +MODULE_PARM_DESC(sess_queue_depth,
> > > > > > > +              "Number of buffers for pending I/O requests to
> > allocate per session. Maximum: "
> > > > > > > +              __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> > > > > > > +              __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
> > > > > >
> > > > > > We don't like module parameters in the RDMA.
> > > > > Hi Leon,
> > > > >
> > > > > These paramters are affecting resouce usage/performance, I think
> > would
> > > > > be good to have them as module parameters,
> > > > > so admin could choose based their needs.
> > > >
> > > > It is premature optimization before second user comes, also it is
> > > > based on the assumption that everyone uses modules, which is not true.
> > > The idea to have module parameters is to cover more use cases, IMHO.
> > >
> > > Even you builtin the module to the kernel, you can still change the
> > > module parameters
> > > by passing the "moduls_name.paramters" in kernel command line, eg:
> > > kvm.nx_huge_pages=true
> >
> > I know about that, but it doesn't make them helpful.
> >
> > Thanks
> >
> Hi Leon,
>
> Queue_depth and max_chunksize parameters control the tradeoff between
> throuput performance and memory consumption. We do use them to set
> different values for storages equipped with SSDs (fast) and on storages
> equipped with HDDs (slow). The last parameter always_invaldate enforces the
> invalidation of an rdma buffer before its hand over to the block layer. We
> set it to no in our datacenters, since they are closed and malicious
> clients are not a threat in our scenario. In general case it defaults to
> yes, as requested by Jason. Our admins need to have control over those
> control knobs somehow... We could make sysfs entries out of them or
> something, but would it really make sense?

blk_queue_nonrot() inside your code?

>
> Thank you,
> Danil
>
> > >
> > > > Thanks
> > > Thanks
> >
Jinpu Wang March 5, 2020, 12:28 p.m. UTC | #9
On Thu, Mar 5, 2020 at 1:16 PM Leon Romanovsky <leon@kernel.org> wrote:
>
> On Thu, Mar 05, 2020 at 01:01:08PM +0100, Danil Kipnis wrote:
> > On Thu, 5 Mar 2020, 09:00 Leon Romanovsky, <leon@kernel.org> wrote:
> >
> > > On Wed, Mar 04, 2020 at 12:03:32PM +0100, Jinpu Wang wrote:
> > > > On Tue, Mar 3, 2020 at 5:59 PM Leon Romanovsky <leon@kernel.org> wrote:
> > > > >
> > > > > On Tue, Mar 03, 2020 at 05:41:27PM +0100, Jinpu Wang wrote:
> > > > > > On Tue, Mar 3, 2020 at 12:37 PM Leon Romanovsky <leon@kernel.org>
> > > wrote:
> > > > > > >
> > > > > > > On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> > > > > > > > From: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > > > >
> > > > > > > > This is main functionality of rtrs-server module, which accepts
> > > > > > > > set of RDMA connections (so called rtrs session),
> > > creates/destroys
> > > > > > > > sysfs entries associated with rtrs session and notifies upper
> > > layer
> > > > > > > > (user of RTRS API) about RDMA requests or link events.
> > > > > > > >
> > > > > > > > Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> > > > > > > > Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > > > > ---
> > > > > > > >  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164
> > > ++++++++++++++++++++++++
> > > > > > > >  1 file changed, 2164 insertions(+)
> > > > > > > >  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > >
> > > > > > > > diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > > new file mode 100644
> > > > > > > > index 000000000000..e60ee6dd675d
> > > > > > > > --- /dev/null
> > > > > > > > +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > > @@ -0,0 +1,2164 @@
> > > > > > > > +// SPDX-License-Identifier: GPL-2.0-or-later
> > > > > > > > +/*
> > > > > > > > + * RDMA Transport Layer
> > > > > > > > + *
> > > > > > > > + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights
> > > reserved.
> > > > > > > > + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights
> > > reserved.
> > > > > > > > + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> > > > > > > > + */
> > > > > > > > +
> > > > > > > > +#undef pr_fmt
> > > > > > > > +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__)
> > > ": " fmt
> > > > > > > > +
> > > > > > > > +#include <linux/module.h>
> > > > > > > > +#include <linux/mempool.h>
> > > > > > > > +
> > > > > > > > +#include "rtrs-srv.h"
> > > > > > > > +#include "rtrs-log.h"
> > > > > > > > +
> > > > > > > > +MODULE_DESCRIPTION("RDMA Transport Server");
> > > > > > > > +MODULE_LICENSE("GPL");
> > > > > > > > +
> > > > > > > > +/* Must be power of 2, see mask from mr->page_size in
> > > ib_sg_to_pages() */
> > > > > > > > +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> > > > > > > > +#define DEFAULT_SESS_QUEUE_DEPTH 512
> > > > > > > > +#define MAX_HDR_SIZE PAGE_SIZE
> > > > > > > > +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct
> > > rtrs_msg_rdma_read)) \
> > > > > > > > +                   / sizeof(struct rtrs_sg_desc))
> > > > > > > > +
> > > > > > > > +/* We guarantee to serve 10 paths at least */
> > > > > > > > +#define CHUNK_POOL_SZ 10
> > > > > > > > +
> > > > > > > > +static struct rtrs_rdma_dev_pd dev_pd;
> > > > > > > > +static mempool_t *chunk_pool;
> > > > > > > > +struct class *rtrs_dev_class;
> > > > > > > > +
> > > > > > > > +static int __read_mostly max_chunk_size =
> > > DEFAULT_MAX_CHUNK_SIZE;
> > > > > > > > +static int __read_mostly sess_queue_depth =
> > > DEFAULT_SESS_QUEUE_DEPTH;
> > > > > > > > +
> > > > > > > > +static bool always_invalidate = true;
> > > > > > > > +module_param(always_invalidate, bool, 0444);
> > > > > > > > +MODULE_PARM_DESC(always_invalidate,
> > > > > > > > +              "Invalidate memory registration for contiguous
> > > memory regions before accessing.");
> > > > > > > > +
> > > > > > > > +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> > > > > > > > +MODULE_PARM_DESC(max_chunk_size,
> > > > > > > > +              "Max size for each IO request, when change the
> > > unit is in byte (default: "
> > > > > > > > +              __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> > > > > > > > +
> > > > > > > > +module_param_named(sess_queue_depth, sess_queue_depth, int,
> > > 0444);
> > > > > > > > +MODULE_PARM_DESC(sess_queue_depth,
> > > > > > > > +              "Number of buffers for pending I/O requests to
> > > allocate per session. Maximum: "
> > > > > > > > +              __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> > > > > > > > +              __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
> > > > > > >
> > > > > > > We don't like module parameters in the RDMA.
> > > > > > Hi Leon,
> > > > > >
> > > > > > These paramters are affecting resouce usage/performance, I think
> > > would
> > > > > > be good to have them as module parameters,
> > > > > > so admin could choose based their needs.
> > > > >
> > > > > It is premature optimization before second user comes, also it is
> > > > > based on the assumption that everyone uses modules, which is not true.
> > > > The idea to have module parameters is to cover more use cases, IMHO.
> > > >
> > > > Even you builtin the module to the kernel, you can still change the
> > > > module parameters
> > > > by passing the "moduls_name.paramters" in kernel command line, eg:
> > > > kvm.nx_huge_pages=true
> > >
> > > I know about that, but it doesn't make them helpful.
> > >
> > > Thanks
> > >
> > Hi Leon,
> >
> > Queue_depth and max_chunksize parameters control the tradeoff between
> > throuput performance and memory consumption. We do use them to set
> > different values for storages equipped with SSDs (fast) and on storages
> > equipped with HDDs (slow). The last parameter always_invaldate enforces the
> > invalidation of an rdma buffer before its hand over to the block layer. We
> > set it to no in our datacenters, since they are closed and malicious
> > clients are not a threat in our scenario. In general case it defaults to
> > yes, as requested by Jason. Our admins need to have control over those
> > control knobs somehow... We could make sysfs entries out of them or
> > something, but would it really make sense?
>
> blk_queue_nonrot() inside your code?
It's exported function, and also used by other drivers like
md/dm/target core, right?

Thanks
Leon Romanovsky March 5, 2020, 12:35 p.m. UTC | #10
On Thu, Mar 05, 2020 at 01:28:39PM +0100, Jinpu Wang wrote:
> On Thu, Mar 5, 2020 at 1:16 PM Leon Romanovsky <leon@kernel.org> wrote:
> >
> > On Thu, Mar 05, 2020 at 01:01:08PM +0100, Danil Kipnis wrote:
> > > On Thu, 5 Mar 2020, 09:00 Leon Romanovsky, <leon@kernel.org> wrote:
> > >
> > > > On Wed, Mar 04, 2020 at 12:03:32PM +0100, Jinpu Wang wrote:
> > > > > On Tue, Mar 3, 2020 at 5:59 PM Leon Romanovsky <leon@kernel.org> wrote:
> > > > > >
> > > > > > On Tue, Mar 03, 2020 at 05:41:27PM +0100, Jinpu Wang wrote:
> > > > > > > On Tue, Mar 3, 2020 at 12:37 PM Leon Romanovsky <leon@kernel.org>
> > > > wrote:
> > > > > > > >
> > > > > > > > On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> > > > > > > > > From: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > > > > >
> > > > > > > > > This is main functionality of rtrs-server module, which accepts
> > > > > > > > > set of RDMA connections (so called rtrs session),
> > > > creates/destroys
> > > > > > > > > sysfs entries associated with rtrs session and notifies upper
> > > > layer
> > > > > > > > > (user of RTRS API) about RDMA requests or link events.
> > > > > > > > >
> > > > > > > > > Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> > > > > > > > > Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > > > > > ---
> > > > > > > > >  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164
> > > > ++++++++++++++++++++++++
> > > > > > > > >  1 file changed, 2164 insertions(+)
> > > > > > > > >  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > > >
> > > > > > > > > diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > > > new file mode 100644
> > > > > > > > > index 000000000000..e60ee6dd675d
> > > > > > > > > --- /dev/null
> > > > > > > > > +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > > > @@ -0,0 +1,2164 @@
> > > > > > > > > +// SPDX-License-Identifier: GPL-2.0-or-later
> > > > > > > > > +/*
> > > > > > > > > + * RDMA Transport Layer
> > > > > > > > > + *
> > > > > > > > > + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights
> > > > reserved.
> > > > > > > > > + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights
> > > > reserved.
> > > > > > > > > + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> > > > > > > > > + */
> > > > > > > > > +
> > > > > > > > > +#undef pr_fmt
> > > > > > > > > +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__)
> > > > ": " fmt
> > > > > > > > > +
> > > > > > > > > +#include <linux/module.h>
> > > > > > > > > +#include <linux/mempool.h>
> > > > > > > > > +
> > > > > > > > > +#include "rtrs-srv.h"
> > > > > > > > > +#include "rtrs-log.h"
> > > > > > > > > +
> > > > > > > > > +MODULE_DESCRIPTION("RDMA Transport Server");
> > > > > > > > > +MODULE_LICENSE("GPL");
> > > > > > > > > +
> > > > > > > > > +/* Must be power of 2, see mask from mr->page_size in
> > > > ib_sg_to_pages() */
> > > > > > > > > +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> > > > > > > > > +#define DEFAULT_SESS_QUEUE_DEPTH 512
> > > > > > > > > +#define MAX_HDR_SIZE PAGE_SIZE
> > > > > > > > > +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct
> > > > rtrs_msg_rdma_read)) \
> > > > > > > > > +                   / sizeof(struct rtrs_sg_desc))
> > > > > > > > > +
> > > > > > > > > +/* We guarantee to serve 10 paths at least */
> > > > > > > > > +#define CHUNK_POOL_SZ 10
> > > > > > > > > +
> > > > > > > > > +static struct rtrs_rdma_dev_pd dev_pd;
> > > > > > > > > +static mempool_t *chunk_pool;
> > > > > > > > > +struct class *rtrs_dev_class;
> > > > > > > > > +
> > > > > > > > > +static int __read_mostly max_chunk_size =
> > > > DEFAULT_MAX_CHUNK_SIZE;
> > > > > > > > > +static int __read_mostly sess_queue_depth =
> > > > DEFAULT_SESS_QUEUE_DEPTH;
> > > > > > > > > +
> > > > > > > > > +static bool always_invalidate = true;
> > > > > > > > > +module_param(always_invalidate, bool, 0444);
> > > > > > > > > +MODULE_PARM_DESC(always_invalidate,
> > > > > > > > > +              "Invalidate memory registration for contiguous
> > > > memory regions before accessing.");
> > > > > > > > > +
> > > > > > > > > +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> > > > > > > > > +MODULE_PARM_DESC(max_chunk_size,
> > > > > > > > > +              "Max size for each IO request, when change the
> > > > unit is in byte (default: "
> > > > > > > > > +              __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> > > > > > > > > +
> > > > > > > > > +module_param_named(sess_queue_depth, sess_queue_depth, int,
> > > > 0444);
> > > > > > > > > +MODULE_PARM_DESC(sess_queue_depth,
> > > > > > > > > +              "Number of buffers for pending I/O requests to
> > > > allocate per session. Maximum: "
> > > > > > > > > +              __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> > > > > > > > > +              __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
> > > > > > > >
> > > > > > > > We don't like module parameters in the RDMA.
> > > > > > > Hi Leon,
> > > > > > >
> > > > > > > These paramters are affecting resouce usage/performance, I think
> > > > would
> > > > > > > be good to have them as module parameters,
> > > > > > > so admin could choose based their needs.
> > > > > >
> > > > > > It is premature optimization before second user comes, also it is
> > > > > > based on the assumption that everyone uses modules, which is not true.
> > > > > The idea to have module parameters is to cover more use cases, IMHO.
> > > > >
> > > > > Even you builtin the module to the kernel, you can still change the
> > > > > module parameters
> > > > > by passing the "moduls_name.paramters" in kernel command line, eg:
> > > > > kvm.nx_huge_pages=true
> > > >
> > > > I know about that, but it doesn't make them helpful.
> > > >
> > > > Thanks
> > > >
> > > Hi Leon,
> > >
> > > Queue_depth and max_chunksize parameters control the tradeoff between
> > > throuput performance and memory consumption. We do use them to set
> > > different values for storages equipped with SSDs (fast) and on storages
> > > equipped with HDDs (slow). The last parameter always_invaldate enforces the
> > > invalidation of an rdma buffer before its hand over to the block layer. We
> > > set it to no in our datacenters, since they are closed and malicious
> > > clients are not a threat in our scenario. In general case it defaults to
> > > yes, as requested by Jason. Our admins need to have control over those
> > > control knobs somehow... We could make sysfs entries out of them or
> > > something, but would it really make sense?
> >
> > blk_queue_nonrot() inside your code?
> It's exported function, and also used by other drivers like
> md/dm/target core, right?

I have no clue.

Thanks

>
> Thanks
Jinpu Wang March 5, 2020, 1:02 p.m. UTC | #11
On Thu, Mar 5, 2020 at 1:35 PM Leon Romanovsky <leon@kernel.org> wrote:
>
> On Thu, Mar 05, 2020 at 01:28:39PM +0100, Jinpu Wang wrote:
> > On Thu, Mar 5, 2020 at 1:16 PM Leon Romanovsky <leon@kernel.org> wrote:
> > >
> > > On Thu, Mar 05, 2020 at 01:01:08PM +0100, Danil Kipnis wrote:
> > > > On Thu, 5 Mar 2020, 09:00 Leon Romanovsky, <leon@kernel.org> wrote:
> > > >
> > > > > On Wed, Mar 04, 2020 at 12:03:32PM +0100, Jinpu Wang wrote:
> > > > > > On Tue, Mar 3, 2020 at 5:59 PM Leon Romanovsky <leon@kernel.org> wrote:
> > > > > > >
> > > > > > > On Tue, Mar 03, 2020 at 05:41:27PM +0100, Jinpu Wang wrote:
> > > > > > > > On Tue, Mar 3, 2020 at 12:37 PM Leon Romanovsky <leon@kernel.org>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > On Fri, Feb 21, 2020 at 11:47:06AM +0100, Jack Wang wrote:
> > > > > > > > > > From: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > > > > > >
> > > > > > > > > > This is main functionality of rtrs-server module, which accepts
> > > > > > > > > > set of RDMA connections (so called rtrs session),
> > > > > creates/destroys
> > > > > > > > > > sysfs entries associated with rtrs session and notifies upper
> > > > > layer
> > > > > > > > > > (user of RTRS API) about RDMA requests or link events.
> > > > > > > > > >
> > > > > > > > > > Signed-off-by: Danil Kipnis <danil.kipnis@cloud.ionos.com>
> > > > > > > > > > Signed-off-by: Jack Wang <jinpu.wang@cloud.ionos.com>
> > > > > > > > > > ---
> > > > > > > > > >  drivers/infiniband/ulp/rtrs/rtrs-srv.c | 2164
> > > > > ++++++++++++++++++++++++
> > > > > > > > > >  1 file changed, 2164 insertions(+)
> > > > > > > > > >  create mode 100644 drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > > > >
> > > > > > > > > > diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > > > > new file mode 100644
> > > > > > > > > > index 000000000000..e60ee6dd675d
> > > > > > > > > > --- /dev/null
> > > > > > > > > > +++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
> > > > > > > > > > @@ -0,0 +1,2164 @@
> > > > > > > > > > +// SPDX-License-Identifier: GPL-2.0-or-later
> > > > > > > > > > +/*
> > > > > > > > > > + * RDMA Transport Layer
> > > > > > > > > > + *
> > > > > > > > > > + * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights
> > > > > reserved.
> > > > > > > > > > + * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights
> > > > > reserved.
> > > > > > > > > > + * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
> > > > > > > > > > + */
> > > > > > > > > > +
> > > > > > > > > > +#undef pr_fmt
> > > > > > > > > > +#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__)
> > > > > ": " fmt
> > > > > > > > > > +
> > > > > > > > > > +#include <linux/module.h>
> > > > > > > > > > +#include <linux/mempool.h>
> > > > > > > > > > +
> > > > > > > > > > +#include "rtrs-srv.h"
> > > > > > > > > > +#include "rtrs-log.h"
> > > > > > > > > > +
> > > > > > > > > > +MODULE_DESCRIPTION("RDMA Transport Server");
> > > > > > > > > > +MODULE_LICENSE("GPL");
> > > > > > > > > > +
> > > > > > > > > > +/* Must be power of 2, see mask from mr->page_size in
> > > > > ib_sg_to_pages() */
> > > > > > > > > > +#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
> > > > > > > > > > +#define DEFAULT_SESS_QUEUE_DEPTH 512
> > > > > > > > > > +#define MAX_HDR_SIZE PAGE_SIZE
> > > > > > > > > > +#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct
> > > > > rtrs_msg_rdma_read)) \
> > > > > > > > > > +                   / sizeof(struct rtrs_sg_desc))
> > > > > > > > > > +
> > > > > > > > > > +/* We guarantee to serve 10 paths at least */
> > > > > > > > > > +#define CHUNK_POOL_SZ 10
> > > > > > > > > > +
> > > > > > > > > > +static struct rtrs_rdma_dev_pd dev_pd;
> > > > > > > > > > +static mempool_t *chunk_pool;
> > > > > > > > > > +struct class *rtrs_dev_class;
> > > > > > > > > > +
> > > > > > > > > > +static int __read_mostly max_chunk_size =
> > > > > DEFAULT_MAX_CHUNK_SIZE;
> > > > > > > > > > +static int __read_mostly sess_queue_depth =
> > > > > DEFAULT_SESS_QUEUE_DEPTH;
> > > > > > > > > > +
> > > > > > > > > > +static bool always_invalidate = true;
> > > > > > > > > > +module_param(always_invalidate, bool, 0444);
> > > > > > > > > > +MODULE_PARM_DESC(always_invalidate,
> > > > > > > > > > +              "Invalidate memory registration for contiguous
> > > > > memory regions before accessing.");
> > > > > > > > > > +
> > > > > > > > > > +module_param_named(max_chunk_size, max_chunk_size, int, 0444);
> > > > > > > > > > +MODULE_PARM_DESC(max_chunk_size,
> > > > > > > > > > +              "Max size for each IO request, when change the
> > > > > unit is in byte (default: "
> > > > > > > > > > +              __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");
> > > > > > > > > > +
> > > > > > > > > > +module_param_named(sess_queue_depth, sess_queue_depth, int,
> > > > > 0444);
> > > > > > > > > > +MODULE_PARM_DESC(sess_queue_depth,
> > > > > > > > > > +              "Number of buffers for pending I/O requests to
> > > > > allocate per session. Maximum: "
> > > > > > > > > > +              __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
> > > > > > > > > > +              __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
> > > > > > > > >
> > > > > > > > > We don't like module parameters in the RDMA.
> > > > > > > > Hi Leon,
> > > > > > > >
> > > > > > > > These paramters are affecting resouce usage/performance, I think
> > > > > would
> > > > > > > > be good to have them as module parameters,
> > > > > > > > so admin could choose based their needs.
> > > > > > >
> > > > > > > It is premature optimization before second user comes, also it is
> > > > > > > based on the assumption that everyone uses modules, which is not true.
> > > > > > The idea to have module parameters is to cover more use cases, IMHO.
> > > > > >
> > > > > > Even you builtin the module to the kernel, you can still change the
> > > > > > module parameters
> > > > > > by passing the "moduls_name.paramters" in kernel command line, eg:
> > > > > > kvm.nx_huge_pages=true
> > > > >
> > > > > I know about that, but it doesn't make them helpful.
> > > > >
> > > > > Thanks
> > > > >
> > > > Hi Leon,
> > > >
> > > > Queue_depth and max_chunksize parameters control the tradeoff between
> > > > throuput performance and memory consumption. We do use them to set
> > > > different values for storages equipped with SSDs (fast) and on storages
> > > > equipped with HDDs (slow). The last parameter always_invaldate enforces the
> > > > invalidation of an rdma buffer before its hand over to the block layer. We
> > > > set it to no in our datacenters, since they are closed and malicious
> > > > clients are not a threat in our scenario. In general case it defaults to
> > > > yes, as requested by Jason. Our admins need to have control over those
> > > > control knobs somehow... We could make sysfs entries out of them or
> > > > something, but would it really make sense?
> > >
> > > blk_queue_nonrot() inside your code?
> > It's exported function, and also used by other drivers like
> > md/dm/target core, right?
>
> I have no clue.
After discussing with Danil, I realized you maybe meant using
blk_queue_nonrot() to report it to the client.
We already do this in rnbd-srv: we pass the nonrot attribute to
rnbd-clt, so rnbd-clt knows about it.
But that is not related to the module parameters here.
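For reference, the reporting on the rnbd-srv side looks roughly like this
(sketch from memory, the rsp/bdev names are approximate, not the exact
code):

	struct request_queue *q = bdev_get_queue(bdev);

	/* tell the client whether the backing device is rotational */
	rsp->rotational = !blk_queue_nonrot(q);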

Thanks!

Patch

diff --git a/drivers/infiniband/ulp/rtrs/rtrs-srv.c b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
new file mode 100644
index 000000000000..e60ee6dd675d
--- /dev/null
+++ b/drivers/infiniband/ulp/rtrs/rtrs-srv.c
@@ -0,0 +1,2164 @@ 
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * RDMA Transport Layer
+ *
+ * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
+ * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
+ * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
+ */
+
+#undef pr_fmt
+#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
+
+#include <linux/module.h>
+#include <linux/mempool.h>
+
+#include "rtrs-srv.h"
+#include "rtrs-log.h"
+
+MODULE_DESCRIPTION("RDMA Transport Server");
+MODULE_LICENSE("GPL");
+
+/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
+#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
+#define DEFAULT_SESS_QUEUE_DEPTH 512
+#define MAX_HDR_SIZE PAGE_SIZE
+#define MAX_SG_COUNT ((MAX_HDR_SIZE - sizeof(struct rtrs_msg_rdma_read)) \
+		      / sizeof(struct rtrs_sg_desc))
+
+/* We guarantee to serve 10 paths at least */
+#define CHUNK_POOL_SZ 10
+
+static struct rtrs_rdma_dev_pd dev_pd;
+static mempool_t *chunk_pool;
+struct class *rtrs_dev_class;
+
+static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
+static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
+
+static bool always_invalidate = true;
+module_param(always_invalidate, bool, 0444);
+MODULE_PARM_DESC(always_invalidate,
+		 "Invalidate memory registration for contiguous memory regions before accessing.");
+
+module_param_named(max_chunk_size, max_chunk_size, int, 0444);
+MODULE_PARM_DESC(max_chunk_size,
+		 "Max size in bytes for each IO request (default: "
+		 __stringify(DEFAULT_MAX_CHUNK_SIZE) ")");
+
+module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
+MODULE_PARM_DESC(sess_queue_depth,
+		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
+		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
+		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
+
+static char cq_affinity_list[256];
+static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
+
+static void init_cq_affinity(void)
+{
+	sprintf(cq_affinity_list, "0-%d", nr_cpu_ids - 1);
+}
+
+static int cq_affinity_list_set(const char *val, const struct kernel_param *kp)
+{
+	int ret = 0, len = strlen(val);
+	cpumask_var_t new_value;
+
+	init_cq_affinity();
+
+	if (len >= sizeof(cq_affinity_list))
+		return -EINVAL;
+	if (!alloc_cpumask_var(&new_value, GFP_KERNEL))
+		return -ENOMEM;
+
+	ret = cpulist_parse(val, new_value);
+	if (ret) {
+		pr_err("Can't set cq_affinity_list \"%s\": %d\n", val,
+		       ret);
+		goto free_cpumask;
+	}
+
+	strlcpy(cq_affinity_list, val, sizeof(cq_affinity_list));
+	*strchrnul(cq_affinity_list, '\n') = '\0';
+	cpumask_copy(&cq_affinity_mask, new_value);
+
+	pr_info("cq_affinity_list changed to %*pbl\n",
+		cpumask_pr_args(&cq_affinity_mask));
+free_cpumask:
+	free_cpumask_var(new_value);
+	return ret;
+}
+
+static struct kparam_string cq_affinity_list_kparam_str = {
+	.maxlen	= sizeof(cq_affinity_list),
+	.string	= cq_affinity_list
+};
+
+static const struct kernel_param_ops cq_affinity_list_ops = {
+	.set	= cq_affinity_list_set,
+	.get	= param_get_string,
+};
+
+module_param_cb(cq_affinity_list, &cq_affinity_list_ops,
+		&cq_affinity_list_kparam_str, 0644);
+MODULE_PARM_DESC(cq_affinity_list,
+		 "Sets the list of cpus to use as cq vectors. (default: use all possible CPUs)");
+
+static struct workqueue_struct *rtrs_wq;
+
+static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
+{
+	return container_of(c, struct rtrs_srv_con, c);
+}
+
+static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
+{
+	return container_of(s, struct rtrs_srv_sess, s);
+}
+
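+/*
+ * Possible state transitions: CONNECTING -> CONNECTED,
+ * CONNECTING/CONNECTED -> CLOSING, CLOSING -> CLOSED.
+ * Returns true if the state was actually changed.
+ */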
+static bool __rtrs_srv_change_state(struct rtrs_srv_sess *sess,
+				     enum rtrs_srv_state new_state)
+{
+	enum rtrs_srv_state old_state;
+	bool changed = false;
+
+	lockdep_assert_held(&sess->state_lock);
+	old_state = sess->state;
+	switch (new_state) {
+	case RTRS_SRV_CONNECTED:
+		switch (old_state) {
+		case RTRS_SRV_CONNECTING:
+			changed = true;
+			/* FALLTHRU */
+		default:
+			break;
+		}
+		break;
+	case RTRS_SRV_CLOSING:
+		switch (old_state) {
+		case RTRS_SRV_CONNECTING:
+		case RTRS_SRV_CONNECTED:
+			changed = true;
+			/* FALLTHRU */
+		default:
+			break;
+		}
+		break;
+	case RTRS_SRV_CLOSED:
+		switch (old_state) {
+		case RTRS_SRV_CLOSING:
+			changed = true;
+			/* FALLTHRU */
+		default:
+			break;
+		}
+		break;
+	default:
+		break;
+	}
+	if (changed)
+		sess->state = new_state;
+
+	return changed;
+}
+
+static bool rtrs_srv_change_state_get_old(struct rtrs_srv_sess *sess,
+					   enum rtrs_srv_state new_state,
+					   enum rtrs_srv_state *old_state)
+{
+	bool changed;
+
+	spin_lock_irq(&sess->state_lock);
+	*old_state = sess->state;
+	changed = __rtrs_srv_change_state(sess, new_state);
+	spin_unlock_irq(&sess->state_lock);
+
+	return changed;
+}
+
+static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
+				   enum rtrs_srv_state new_state)
+{
+	enum rtrs_srv_state old_state;
+
+	return rtrs_srv_change_state_get_old(sess, new_state, &old_state);
+}
+
+static void free_id(struct rtrs_srv_op *id)
+{
+	if (!id)
+		return;
+	kfree(id->tx_wr);
+	kfree(id->tx_sg);
+	kfree(id);
+}
+
+static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
+{
+	struct rtrs_srv *srv = sess->srv;
+	int i;
+
+	WARN_ON(atomic_read(&sess->ids_inflight));
+	if (sess->ops_ids) {
+		for (i = 0; i < srv->queue_depth; i++)
+			free_id(sess->ops_ids[i]);
+		kfree(sess->ops_ids);
+		sess->ops_ids = NULL;
+	}
+}
+
+static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
+{
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_srv_op *id;
+	int i;
+
+	sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
+				GFP_KERNEL);
+	if (!sess->ops_ids)
+		goto err;
+
+	for (i = 0; i < srv->queue_depth; ++i) {
+		id = kzalloc(sizeof(*id), GFP_KERNEL);
+		if (!id)
+			goto err;
+
+		sess->ops_ids[i] = id;
+		id->tx_wr = kcalloc(MAX_SG_COUNT, sizeof(*id->tx_wr),
+				    GFP_KERNEL);
+		if (!id->tx_wr)
+			goto err;
+
+		id->tx_sg = kcalloc(MAX_SG_COUNT, sizeof(*id->tx_sg),
+				    GFP_KERNEL);
+		if (!id->tx_sg)
+			goto err;
+	}
+	init_waitqueue_head(&sess->ids_waitq);
+	atomic_set(&sess->ids_inflight, 0);
+
+	return 0;
+
+err:
+	rtrs_srv_free_ops_ids(sess);
+	return -ENOMEM;
+}
+
+static void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
+{
+	atomic_inc(&sess->ids_inflight);
+}
+
+static void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
+{
+	if (atomic_dec_and_test(&sess->ids_inflight))
+		wake_up(&sess->ids_waitq);
+}
+
+static void rtrs_srv_wait_ops_ids(struct rtrs_srv_sess *sess)
+{
+	wait_event(sess->ids_waitq, !atomic_read(&sess->ids_inflight));
+}
+
+static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
+
+static struct ib_cqe io_comp_cqe = {
+	.done = rtrs_srv_rdma_done
+};
+
+static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct rtrs_srv_con *con = cq->cq_context;
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		rtrs_err(s, "REG MR failed: %s\n",
+			  ib_wc_status_msg(wc->status));
+		close_sess(sess);
+		return;
+	}
+}
+
+static struct ib_cqe local_reg_cqe = {
+	.done = rtrs_srv_reg_mr_done
+};
+
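+/**
+ * rdma_write_sg() - write a local chunk to the client buffers described by
+ *                   a read request and notify the client with an immediate
+ *                   message.
+ * @id:		the id associated with the IO
+ *
+ * Return 0 on success, errno otherwise.
+ */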
+static int rdma_write_sg(struct rtrs_srv_op *id)
+{
+	struct rtrs_sess *s = id->con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	dma_addr_t dma_addr = sess->dma_addr[id->msg_id];
+	struct rtrs_srv_mr *srv_mr;
+	struct rtrs_srv *srv = sess->srv;
+	struct ib_send_wr inv_wr, imm_wr;
+	struct ib_rdma_wr *wr = NULL;
+	enum ib_send_flags flags;
+	size_t sg_cnt;
+	int err, i, offset;
+	bool need_inval;
+	u32 rkey = 0;
+	struct ib_reg_wr rwr;
+
+	sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
+	need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
+	if (unlikely(!sg_cnt))
+		return -EINVAL;
+
+	offset = 0;
+	for (i = 0; i < sg_cnt; i++) {
+		struct ib_sge *list;
+
+		wr		= &id->tx_wr[i];
+		list		= &id->tx_sg[i];
+		list->addr	= dma_addr + offset;
+		list->length	= le32_to_cpu(id->rd_msg->desc[i].len);
+
+		/* WR will fail with length error
+		 * if this is 0
+		 */
+		if (unlikely(list->length == 0)) {
+			rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
+			return -EINVAL;
+		}
+
+		list->lkey = sess->s.dev->ib_pd->local_dma_lkey;
+		offset += list->length;
+
+		wr->wr.wr_cqe	= &io_comp_cqe;
+		wr->wr.sg_list	= list;
+		wr->wr.num_sge	= 1;
+		wr->remote_addr	= le64_to_cpu(id->rd_msg->desc[i].addr);
+		wr->rkey	= le32_to_cpu(id->rd_msg->desc[i].key);
+		if (rkey == 0)
+			rkey = wr->rkey;
+		else
+			/* Only one key is actually used */
+			WARN_ON_ONCE(rkey != wr->rkey);
+
+		if (i < (sg_cnt - 1))
+			wr->wr.next = &id->tx_wr[i + 1].wr;
+
+		wr->wr.opcode = IB_WR_RDMA_WRITE;
+		wr->wr.ex.imm_data = 0;
+		wr->wr.send_flags  = 0;
+	}
+
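+	/*
+	 * Chain the optional memory re-registration (always_invalidate) and
+	 * the SEND_WITH_INV request behind the data WRs, and finish with the
+	 * immediate WR that notifies the client about completion.
+	 */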
+	if (need_inval && always_invalidate) {
+		wr->wr.next = &rwr.wr;
+		rwr.wr.next = &inv_wr;
+		inv_wr.next = &imm_wr;
+	} else if (always_invalidate) {
+		wr->wr.next = &rwr.wr;
+		rwr.wr.next = &imm_wr;
+	} else if (need_inval) {
+		wr->wr.next = &inv_wr;
+		inv_wr.next = &imm_wr;
+	} else {
+		wr->wr.next = &imm_wr;
+	}
+	/*
+	 * From time to time we have to post signaled sends,
+	 * or send queue will fill up and only QP reset can help.
+	 */
+	flags = atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth ?
+			0 : IB_SEND_SIGNALED;
+
+	if (need_inval) {
+		inv_wr.wr_cqe = &io_comp_cqe;
+		inv_wr.sg_list = NULL;
+		inv_wr.num_sge = 0;
+		inv_wr.opcode = IB_WR_SEND_WITH_INV;
+		inv_wr.send_flags = 0;
+		inv_wr.ex.invalidate_rkey = rkey;
+	}
+
+	imm_wr.next = NULL;
+	imm_wr.wr_cqe = &io_comp_cqe;
+	if (always_invalidate) {
+		struct ib_sge list;
+		struct rtrs_msg_rkey_rsp *msg;
+
+		srv_mr = &sess->mrs[id->msg_id];
+		rwr.wr.opcode = IB_WR_REG_MR;
+		rwr.wr.wr_cqe = &local_reg_cqe;
+		rwr.wr.num_sge = 0;
+		rwr.mr = srv_mr->mr;
+		rwr.wr.send_flags = 0;
+		rwr.key = srv_mr->mr->rkey;
+		rwr.access = (IB_ACCESS_LOCAL_WRITE |
+			      IB_ACCESS_REMOTE_WRITE);
+		msg = srv_mr->iu->buf;
+		msg->buf_id = cpu_to_le16(id->msg_id);
+		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
+		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
+
+		list.addr   = srv_mr->iu->dma_addr;
+		list.length = sizeof(*msg);
+		list.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
+		imm_wr.sg_list = &list;
+		imm_wr.num_sge = 1;
+		imm_wr.opcode = IB_WR_SEND_WITH_IMM;
+		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
+					      srv_mr->iu->dma_addr,
+					      srv_mr->iu->size, DMA_TO_DEVICE);
+	} else {
+		imm_wr.sg_list = NULL;
+		imm_wr.num_sge = 0;
+		imm_wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
+	}
+	imm_wr.send_flags = flags;
+	imm_wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
+							     0, need_inval));
+
+	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr,
+				      offset, DMA_BIDIRECTIONAL);
+
+	err = ib_post_send(id->con->c.qp, &id->tx_wr[0].wr, NULL);
+	if (unlikely(err))
+		rtrs_err(s,
+			  "Posting RDMA-Write-Request to QP failed, err: %d\n",
+			  err);
+
+	return err;
+}
+
+/**
+ * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
+ *                      requests or on successful WRITE request.
+ * @con:	the connection to send back result
+ * @id:		the id associated with the IO
+ * @errno:	the error number of the IO.
+ *
+ * Return 0 on success, errno otherwise.
+ */
+static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
+			    int errno)
+{
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct ib_send_wr inv_wr, imm_wr, *wr = NULL;
+	struct ib_reg_wr rwr;
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_srv_mr *srv_mr;
+	bool need_inval = false;
+	enum ib_send_flags flags;
+	u32 imm;
+	int err;
+
+	if (id->dir == READ) {
+		struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
+		size_t sg_cnt;
+
+		need_inval = le16_to_cpu(rd_msg->flags) &
+				RTRS_MSG_NEED_INVAL_F;
+		sg_cnt = le16_to_cpu(rd_msg->sg_cnt);
+
+		if (need_inval) {
+			if (likely(sg_cnt)) {
+				inv_wr.wr_cqe = &io_comp_cqe;
+				inv_wr.sg_list = NULL;
+				inv_wr.num_sge = 0;
+				inv_wr.opcode = IB_WR_SEND_WITH_INV;
+				inv_wr.send_flags = 0;
+				/* Only one key is actually used */
+				inv_wr.ex.invalidate_rkey =
+					le32_to_cpu(rd_msg->desc[0].key);
+			} else {
+				WARN_ON_ONCE(1);
+				need_inval = false;
+			}
+		}
+	}
+
+	if (need_inval && always_invalidate) {
+		wr = &inv_wr;
+		inv_wr.next = &rwr.wr;
+		rwr.wr.next = &imm_wr;
+	} else if (always_invalidate) {
+		wr = &rwr.wr;
+		rwr.wr.next = &imm_wr;
+	} else if (need_inval) {
+		wr = &inv_wr;
+		inv_wr.next = &imm_wr;
+	} else {
+		wr = &imm_wr;
+	}
+	/*
+	 * From time to time we have to post signalled sends,
+	 * or send queue will fill up and only QP reset can help.
+	 */
+	flags = atomic_inc_return(&con->wr_cnt) % srv->queue_depth ?
+			0 : IB_SEND_SIGNALED;
+	imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
+	imm_wr.next = NULL;
+	imm_wr.wr_cqe = &io_comp_cqe;
+	if (always_invalidate) {
+		struct ib_sge list;
+		struct rtrs_msg_rkey_rsp *msg;
+
+		srv_mr = &sess->mrs[id->msg_id];
+		rwr.wr.next = &imm_wr;
+		rwr.wr.opcode = IB_WR_REG_MR;
+		rwr.wr.wr_cqe = &local_reg_cqe;
+		rwr.wr.num_sge = 0;
+		rwr.wr.send_flags = 0;
+		rwr.mr = srv_mr->mr;
+		rwr.key = srv_mr->mr->rkey;
+		rwr.access = (IB_ACCESS_LOCAL_WRITE |
+			      IB_ACCESS_REMOTE_WRITE);
+		msg = srv_mr->iu->buf;
+		msg->buf_id = cpu_to_le16(id->msg_id);
+		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
+		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
+
+		list.addr   = srv_mr->iu->dma_addr;
+		list.length = sizeof(*msg);
+		list.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
+		imm_wr.sg_list = &list;
+		imm_wr.num_sge = 1;
+		imm_wr.opcode = IB_WR_SEND_WITH_IMM;
+		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
+					      srv_mr->iu->dma_addr,
+					      srv_mr->iu->size, DMA_TO_DEVICE);
+	} else {
+		imm_wr.sg_list = NULL;
+		imm_wr.num_sge = 0;
+		imm_wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
+	}
+	imm_wr.send_flags = flags;
+	imm_wr.ex.imm_data = cpu_to_be32(imm);
+
+	err = ib_post_send(id->con->c.qp, wr, NULL);
+	if (unlikely(err))
+		rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
+			     err);
+
+	return err;
+}
+
+void close_sess(struct rtrs_srv_sess *sess)
+{
+	enum rtrs_srv_state old_state;
+
+	if (rtrs_srv_change_state_get_old(sess, RTRS_SRV_CLOSING,
+					   &old_state))
+		queue_work(rtrs_wq, &sess->close_work);
+	WARN_ON(sess->state != RTRS_SRV_CLOSING);
+}
+
+static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
+{
+	switch (state) {
+	case RTRS_SRV_CONNECTING:
+		return "RTRS_SRV_CONNECTING";
+	case RTRS_SRV_CONNECTED:
+		return "RTRS_SRV_CONNECTED";
+	case RTRS_SRV_CLOSING:
+		return "RTRS_SRV_CLOSING";
+	case RTRS_SRV_CLOSED:
+		return "RTRS_SRV_CLOSED";
+	default:
+		return "UNKNOWN";
+	}
+}
+
+/*
+ * rtrs_srv_resp_rdma() - sends response to the client.
+ *
+ * Context: any
+ */
+void rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
+{
+	struct rtrs_srv_con *con;
+	struct rtrs_sess *s;
+	struct rtrs_srv_sess *sess;
+	int err;
+
+	if (WARN_ON(!id))
+		return;
+
+	con = id->con;
+	s = con->c.sess;
+	sess = to_srv_sess(s);
+
+	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
+		rtrs_err_rl(s,
+			     "Sending I/O response failed,  session is disconnected, sess state %s\n",
+			     rtrs_srv_state_str(sess->state));
+		goto out;
+	}
+	if (always_invalidate) {
+		struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];
+
+		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
+	}
+	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
+		err = send_io_resp_imm(con, id, status);
+	else
+		err = rdma_write_sg(id);
+	if (unlikely(err)) {
+		rtrs_err_rl(s, "IO response failed: %d\n", err);
+		close_sess(sess);
+	}
+out:
+	rtrs_srv_put_ops_ids(sess);
+}
+EXPORT_SYMBOL(rtrs_srv_resp_rdma);
+
+void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
+{
+	srv->priv = priv;
+}
+EXPORT_SYMBOL(rtrs_srv_set_sess_priv);
+
+static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
+{
+	int i;
+
+	for (i = 0; i < sess->mrs_num; i++) {
+		struct rtrs_srv_mr *srv_mr;
+
+		srv_mr = &sess->mrs[i];
+		rtrs_iu_free(srv_mr->iu, DMA_TO_DEVICE,
+			      sess->s.dev->ib_dev, 1);
+		ib_dereg_mr(srv_mr->mr);
+		ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
+				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
+		sg_free_table(&srv_mr->sgt);
+	}
+	kfree(sess->mrs);
+}
+
+static int map_cont_bufs(struct rtrs_srv_sess *sess)
+{
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_sess *ss = &sess->s;
+	int i, mri, err, mrs_num;
+	unsigned int chunk_bits;
+	int chunks_per_mr = 1;
+
+	/*
+	 * Here we map queue_depth chunks to MRs.  First we have to
+	 * figure out how many chunks we can map per MR.
+	 */
+	if (always_invalidate) {
+		/*
+		 * In order to invalidate each chunk of memory separately,
+		 * we need more memory regions.
+		 */
+		mrs_num = srv->queue_depth;
+	} else {
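+		/*
+		 * Map as many chunks per MR as the HCA supports, then round
+		 * again so the chunks are spread evenly across the MRs.
+		 */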
+		chunks_per_mr =
+			sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
+		mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
+		chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
+	}
+
+	sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL);
+	if (!sess->mrs)
+		return -ENOMEM;
+
+	sess->mrs_num = mrs_num;
+
+	for (mri = 0; mri < mrs_num; mri++) {
+		struct rtrs_srv_mr *srv_mr = &sess->mrs[mri];
+		struct sg_table *sgt = &srv_mr->sgt;
+		struct scatterlist *s;
+		struct ib_mr *mr;
+		int nr, chunks;
+		struct rtrs_msg_rkey_rsp *rsp;
+
+		chunks = chunks_per_mr * mri;
+		if (!always_invalidate)
+			chunks_per_mr = min_t(int, chunks_per_mr,
+					      srv->queue_depth - chunks);
+
+		err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
+		if (err)
+			goto err;
+
+		for_each_sg(sgt->sgl, s, chunks_per_mr, i)
+			sg_set_page(s, srv->chunks[chunks + i],
+				    max_chunk_size, 0);
+
+		nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl,
+				   sgt->nents, DMA_BIDIRECTIONAL);
+		if (nr < sgt->nents) {
+			err = nr < 0 ? nr : -EINVAL;
+			goto free_sg;
+		}
+		mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
+				 sgt->nents);
+		if (IS_ERR(mr)) {
+			err = PTR_ERR(mr);
+			goto unmap_sg;
+		}
+		nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents,
+				  NULL, max_chunk_size);
+		if (nr < sgt->nents) {
+			err = nr < 0 ? nr : -EINVAL;
+			goto dereg_mr;
+		}
+
+		if (always_invalidate) {
+			srv_mr->iu = rtrs_iu_alloc(1, sizeof(*rsp), GFP_KERNEL,
+						    sess->s.dev->ib_dev,
+						    DMA_TO_DEVICE,
+						    rtrs_srv_rdma_done);
+			if (!srv_mr->iu) {
+				err = -ENOMEM;
+				rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n",
+					 err);
+				goto free_iu;
+			}
+		}
+		/* Eventually dma addr for each chunk can be cached */
+		for_each_sg(sgt->sgl, s, sgt->orig_nents, i)
+			sess->dma_addr[chunks + i] = sg_dma_address(s);
+
+		ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
+		srv_mr->mr = mr;
+
+		continue;
+err:
+		while (mri--) {
+			srv_mr = &sess->mrs[mri];
+			sgt = &srv_mr->sgt;
+			mr = srv_mr->mr;
+free_iu:
+			rtrs_iu_free(srv_mr->iu, DMA_TO_DEVICE,
+				      sess->s.dev->ib_dev, 1);
+dereg_mr:
+			ib_dereg_mr(mr);
+unmap_sg:
+			ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl,
+					sgt->nents, DMA_BIDIRECTIONAL);
+free_sg:
+			sg_free_table(sgt);
+		}
+		kfree(sess->mrs);
+
+		return err;
+	}
+
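+	/*
+	 * The low sess->mem_bits bits of the immediate payload carry the
+	 * offset inside a chunk, the remaining high bits carry the chunk
+	 * (msg) id, see rtrs_srv_rdma_done().
+	 */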
+	chunk_bits = ilog2(srv->queue_depth - 1) + 1;
+	sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
+
+	return 0;
+}
+
+static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
+{
+	close_sess(to_srv_sess(c->sess));
+}
+
+static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess)
+{
+	rtrs_init_hb(&sess->s, &io_comp_cqe,
+		      RTRS_HB_INTERVAL_MS,
+		      RTRS_HB_MISSED_MAX,
+		      rtrs_srv_hb_err_handler,
+		      rtrs_wq);
+}
+
+static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess)
+{
+	rtrs_start_hb(&sess->s);
+}
+
+static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess)
+{
+	rtrs_stop_hb(&sess->s);
+}
+
+static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct rtrs_srv_con *con = cq->cq_context;
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct rtrs_iu *iu;
+
+	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
+	rtrs_iu_free(iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		rtrs_err(s, "Sess info response send failed: %s\n",
+			  ib_wc_status_msg(wc->status));
+		close_sess(sess);
+		return;
+	}
+	WARN_ON(wc->opcode != IB_WC_SEND);
+}
+
+static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess)
+{
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_srv_ctx *ctx = srv->ctx;
+	int up;
+
+	mutex_lock(&srv->paths_ev_mutex);
+	up = ++srv->paths_up;
+	if (up == 1)
+		ctx->link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
+	mutex_unlock(&srv->paths_ev_mutex);
+
+	/* Mark session as established */
+	sess->established = true;
+}
+
+static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess)
+{
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_srv_ctx *ctx = srv->ctx;
+
+	if (!sess->established)
+		return;
+
+	sess->established = false;
+	mutex_lock(&srv->paths_ev_mutex);
+	WARN_ON(!srv->paths_up);
+	if (--srv->paths_up == 0)
+		ctx->link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
+	mutex_unlock(&srv->paths_ev_mutex);
+}
+
+static int post_recv_sess(struct rtrs_srv_sess *sess);
+
+static int process_info_req(struct rtrs_srv_con *con,
+			    struct rtrs_msg_info_req *msg)
+{
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct ib_send_wr *reg_wr = NULL;
+	struct rtrs_msg_info_rsp *rsp;
+	struct rtrs_iu *tx_iu;
+	struct ib_reg_wr *rwr;
+	int mri, err;
+	size_t tx_sz;
+
+	err = post_recv_sess(sess);
+	if (unlikely(err)) {
+		rtrs_err(s, "post_recv_sess(), err: %d\n", err);
+		return err;
+	}
+	rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL);
+	if (unlikely(!rwr)) {
+		rtrs_err(s, "No memory\n");
+		return -ENOMEM;
+	}
+	memcpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname));
+
+	tx_sz  = sizeof(*rsp);
+	tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num;
+	tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
+			       DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
+	if (unlikely(!tx_iu)) {
+		rtrs_err(s, "rtrs_iu_alloc(), err: %d\n", -ENOMEM);
+		err = -ENOMEM;
+		goto rwr_free;
+	}
+
+	rsp = tx_iu->buf;
+	rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
+	rsp->sg_cnt = cpu_to_le16(sess->mrs_num);
+
+	for (mri = 0; mri < sess->mrs_num; mri++) {
+		struct ib_mr *mr = sess->mrs[mri].mr;
+
+		rsp->desc[mri].addr = cpu_to_le64(mr->iova);
+		rsp->desc[mri].key  = cpu_to_le32(mr->rkey);
+		rsp->desc[mri].len  = cpu_to_le32(mr->length);
+
+		/*
+		 * Fill in reg MR request and chain them *backwards*
+		 */
+		rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
+		rwr[mri].wr.opcode = IB_WR_REG_MR;
+		rwr[mri].wr.wr_cqe = &local_reg_cqe;
+		rwr[mri].wr.num_sge = 0;
+		rwr[mri].wr.send_flags = mri ? 0 : IB_SEND_SIGNALED;
+		rwr[mri].mr = mr;
+		rwr[mri].key = mr->rkey;
+		rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
+				   IB_ACCESS_REMOTE_WRITE);
+		reg_wr = &rwr[mri].wr;
+	}
+
+	err = rtrs_srv_create_sess_files(sess);
+	if (unlikely(err))
+		goto iu_free;
+	get_device(&sess->srv->dev);
+	rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED);
+	rtrs_srv_start_hb(sess);
+
+	/*
+	 * We do not account number of established connections at the current
+	 * moment, we rely on the client, which should send info request when
+	 * all connections are successfully established.  Thus, simply notify
+	 * listener with a proper event if we are the first path.
+	 */
+	rtrs_srv_sess_up(sess);
+
+	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
+				      tx_iu->size, DMA_TO_DEVICE);
+
+	/* Send info response */
+	err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
+	if (unlikely(err)) {
+		rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
+iu_free:
+		rtrs_iu_free(tx_iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
+	}
+rwr_free:
+	kfree(rwr);
+
+	return err;
+}
+
+static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct rtrs_srv_con *con = cq->cq_context;
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct rtrs_msg_info_req *msg;
+	struct rtrs_iu *iu;
+	int err;
+
+	WARN_ON(con->c.cid);
+
+	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		rtrs_err(s, "Sess info request receive failed: %s\n",
+			  ib_wc_status_msg(wc->status));
+		goto close;
+	}
+	WARN_ON(wc->opcode != IB_WC_RECV);
+
+	if (unlikely(wc->byte_len < sizeof(*msg))) {
+		rtrs_err(s, "Sess info request is malformed: size %d\n",
+			  wc->byte_len);
+		goto close;
+	}
+	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
+				   iu->size, DMA_FROM_DEVICE);
+	msg = iu->buf;
+	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) {
+		rtrs_err(s, "Sess info request is malformed: type %d\n",
+			  le16_to_cpu(msg->type));
+		goto close;
+	}
+	err = process_info_req(con, msg);
+	if (unlikely(err))
+		goto close;
+
+out:
+	rtrs_iu_free(iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
+	return;
+close:
+	close_sess(sess);
+	goto out;
+}
+
+static int post_recv_info_req(struct rtrs_srv_con *con)
+{
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct rtrs_iu *rx_iu;
+	int err;
+
+	rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
+			       GFP_KERNEL, sess->s.dev->ib_dev,
+			       DMA_FROM_DEVICE, rtrs_srv_info_req_done);
+	if (unlikely(!rx_iu)) {
+		rtrs_err(s, "rtrs_iu_alloc(): no memory\n");
+		return -ENOMEM;
+	}
+	/* Prepare for getting info response */
+	err = rtrs_iu_post_recv(&con->c, rx_iu);
+	if (unlikely(err)) {
+		rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
+		rtrs_iu_free(rx_iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
+		return err;
+	}
+
+	return 0;
+}
+
+static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
+{
+	int i, err;
+
+	for (i = 0; i < q_size; i++) {
+		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
+		if (unlikely(err))
+			return err;
+	}
+
+	return 0;
+}
+
+static int post_recv_sess(struct rtrs_srv_sess *sess)
+{
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_sess *s = &sess->s;
+	size_t q_size;
+	int err, cid;
+
+	for (cid = 0; cid < sess->s.con_num; cid++) {
+		if (cid == 0)
+			q_size = SERVICE_CON_QUEUE_DEPTH;
+		else
+			q_size = srv->queue_depth;
+
+		err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size);
+		if (unlikely(err)) {
+			rtrs_err(s, "post_recv_io(), err: %d\n", err);
+			return err;
+		}
+	}
+
+	return 0;
+}
+
+static void process_read(struct rtrs_srv_con *con,
+			 struct rtrs_msg_rdma_read *msg,
+			 u32 buf_id, u32 off)
+{
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_srv_ctx *ctx = srv->ctx;
+	struct rtrs_srv_op *id;
+
+	size_t usr_len, data_len;
+	void *data;
+	int ret;
+
+	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
+		rtrs_err_rl(s,
+			     "Processing read request failed,  session is disconnected, sess state %s\n",
+			     rtrs_srv_state_str(sess->state));
+		return;
+	}
+	rtrs_srv_get_ops_ids(sess);
+	rtrs_srv_update_rdma_stats(&sess->stats, off, READ);
+	id = sess->ops_ids[buf_id];
+	id->con		= con;
+	id->dir		= READ;
+	id->msg_id	= buf_id;
+	id->rd_msg	= msg;
+	usr_len = le16_to_cpu(msg->usr_len);
+	data_len = off - usr_len;
+	data = page_address(srv->chunks[buf_id]);
+	ret = ctx->rdma_ev(srv, srv->priv, id, READ, data, data_len,
+			   data + data_len, usr_len);
+
+	if (unlikely(ret)) {
+		rtrs_err_rl(s,
+			     "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
+			     buf_id, ret);
+		goto send_err_msg;
+	}
+
+	return;
+
+send_err_msg:
+	ret = send_io_resp_imm(con, id, ret);
+	if (ret < 0) {
+		rtrs_err_rl(s,
+			     "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
+			     buf_id, ret);
+		close_sess(sess);
+	}
+	rtrs_srv_put_ops_ids(sess);
+}
+
+static void process_write(struct rtrs_srv_con *con,
+			  struct rtrs_msg_rdma_write *req,
+			  u32 buf_id, u32 off)
+{
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_srv_ctx *ctx = srv->ctx;
+	struct rtrs_srv_op *id;
+
+	size_t data_len, usr_len;
+	void *data;
+	int ret;
+
+	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
+		rtrs_err_rl(s,
+			     "Processing write request failed,  session is disconnected, sess state %s\n",
+			     rtrs_srv_state_str(sess->state));
+		return;
+	}
+	rtrs_srv_get_ops_ids(sess);
+	rtrs_srv_update_rdma_stats(&sess->stats, off, WRITE);
+	id = sess->ops_ids[buf_id];
+	id->con    = con;
+	id->dir    = WRITE;
+	id->msg_id = buf_id;
+
+	usr_len = le16_to_cpu(req->usr_len);
+	data_len = off - usr_len;
+	data = page_address(srv->chunks[buf_id]);
+	ret = ctx->rdma_ev(srv, srv->priv, id, WRITE, data, data_len,
+			   data + data_len, usr_len);
+	if (unlikely(ret)) {
+		rtrs_err_rl(s,
+			     "Processing write request failed, user module callback reports err: %d\n",
+			     ret);
+		goto send_err_msg;
+	}
+
+	return;
+
+send_err_msg:
+	ret = send_io_resp_imm(con, id, ret);
+	if (ret < 0) {
+		rtrs_err_rl(s,
+			     "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
+			     buf_id, ret);
+		close_sess(sess);
+	}
+	rtrs_srv_put_ops_ids(sess);
+}
+
+static void process_io_req(struct rtrs_srv_con *con, void *msg,
+			   u32 id, u32 off)
+{
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct rtrs_msg_rdma_hdr *hdr;
+	unsigned int type;
+
+	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id],
+				   max_chunk_size, DMA_BIDIRECTIONAL);
+	hdr = msg;
+	type = le16_to_cpu(hdr->type);
+
+	switch (type) {
+	case RTRS_MSG_WRITE:
+		process_write(con, msg, id, off);
+		break;
+	case RTRS_MSG_READ:
+		process_read(con, msg, id, off);
+		break;
+	default:
+		rtrs_err(s,
+			  "Processing I/O request failed, unknown message type received: 0x%02x\n",
+			  type);
+		goto err;
+	}
+
+	return;
+
+err:
+	close_sess(sess);
+}
+
+static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct rtrs_srv_mr *mr =
+		container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
+	struct rtrs_srv_con *con = cq->cq_context;
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct rtrs_srv *srv = sess->srv;
+	u32 msg_id, off;
+	void *data;
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
+			  ib_wc_status_msg(wc->status));
+		close_sess(sess);
+	}
+	msg_id = mr->msg_id;
+	off = mr->msg_off;
+	data = page_address(srv->chunks[msg_id]) + off;
+	process_io_req(con, data, msg_id, off);
+}
+
+static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
+			      struct rtrs_srv_mr *mr)
+{
+	struct ib_send_wr wr = {
+		.opcode		    = IB_WR_LOCAL_INV,
+		.wr_cqe		    = &mr->inv_cqe,
+		.next		    = NULL,
+		.num_sge	    = 0,
+		.send_flags	    = IB_SEND_SIGNALED,
+		.ex.invalidate_rkey = mr->mr->rkey,
+	};
+	mr->inv_cqe.done = rtrs_srv_inv_rkey_done;
+
+	return ib_post_send(con->c.qp, &wr, NULL);
+}
+
+static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct rtrs_srv_con *con = cq->cq_context;
+	struct rtrs_sess *s = con->c.sess;
+	struct rtrs_srv_sess *sess = to_srv_sess(s);
+	struct rtrs_srv *srv = sess->srv;
+	u32 imm_type, imm_payload;
+	int err;
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		if (wc->status != IB_WC_WR_FLUSH_ERR) {
+			rtrs_err(s,
+				  "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
+				  ib_wc_status_msg(wc->status), wc->wr_cqe,
+				  wc->opcode, wc->vendor_err, wc->byte_len);
+			close_sess(sess);
+		}
+		return;
+	}
+
+	switch (wc->opcode) {
+	case IB_WC_RECV_RDMA_WITH_IMM:
+		/*
+		 * post_recv() RDMA write completions of IO reqs (read/write)
+		 * and hb
+		 */
+		if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
+			return;
+		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
+		if (unlikely(err)) {
+			rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
+			close_sess(sess);
+			break;
+		}
+		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
+			       &imm_type, &imm_payload);
+		if (likely(imm_type == RTRS_IO_REQ_IMM)) {
+			u32 msg_id, off;
+			void *data;
+
+			msg_id = imm_payload >> sess->mem_bits;
+			off = imm_payload & ((1 << sess->mem_bits) - 1);
+			if (unlikely(msg_id >= srv->queue_depth ||
+				     off >= max_chunk_size)) {
+				rtrs_err(s, "Wrong msg_id %u, off %u\n",
+					  msg_id, off);
+				close_sess(sess);
+				return;
+			}
+			if (always_invalidate) {
+				struct rtrs_srv_mr *mr = &sess->mrs[msg_id];
+
+				mr->msg_off = off;
+				mr->msg_id = msg_id;
+				err = rtrs_srv_inv_rkey(con, mr);
+				if (unlikely(err)) {
+					rtrs_err(s, "rtrs_post_recv(), err: %d\n",
+						  err);
+					close_sess(sess);
+					break;
+				}
+			} else {
+				data = page_address(srv->chunks[msg_id]) + off;
+				process_io_req(con, data, msg_id, off);
+			}
+		} else if (imm_type == RTRS_HB_MSG_IMM) {
+			WARN_ON(con->c.cid);
+			rtrs_send_hb_ack(&sess->s);
+		} else if (imm_type == RTRS_HB_ACK_IMM) {
+			WARN_ON(con->c.cid);
+			sess->s.hb_missed_cnt = 0;
+		} else {
+			rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
+		}
+		break;
+	case IB_WC_RDMA_WRITE:
+	case IB_WC_SEND:
+		/*
+		 * post_send() RDMA write completions of IO reqs (read/write)
+		 * and hb
+		 */
+		break;
+	default:
+		rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
+		return;
+	}
+}
+
+int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
+{
+	struct rtrs_srv_sess *sess;
+	int err = -ENOTCONN;
+
+	mutex_lock(&srv->paths_mutex);
+	list_for_each_entry(sess, &srv->paths_list, s.entry) {
+		if (sess->state != RTRS_SRV_CONNECTED)
+			continue;
+		strlcpy(sessname, sess->s.sessname,
+			min_t(size_t, sizeof(sess->s.sessname), len));
+		err = 0;
+		break;
+	}
+	mutex_unlock(&srv->paths_mutex);
+
+	return err;
+}
+EXPORT_SYMBOL(rtrs_srv_get_sess_name);
+
+int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
+{
+	return srv->queue_depth;
+}
+EXPORT_SYMBOL(rtrs_srv_get_queue_depth);
+
+static int find_next_bit_ring(struct rtrs_srv_sess *sess)
+{
+	struct ib_device *ib_dev = sess->s.dev->ib_dev;
+	int v;
+
+	v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask);
+	if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
+		v = cpumask_first(&cq_affinity_mask);
+	return v;
+}
+
+static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess)
+{
+	sess->cur_cq_vector = find_next_bit_ring(sess);
+
+	return sess->cur_cq_vector;
+}
+
+static struct rtrs_srv *__alloc_srv(struct rtrs_srv_ctx *ctx,
+				     const uuid_t *paths_uuid)
+{
+	struct rtrs_srv *srv;
+	int i;
+
+	srv = kzalloc(sizeof(*srv), GFP_KERNEL);
+	if  (!srv)
+		return NULL;
+
+	refcount_set(&srv->refcount, 1);
+	INIT_LIST_HEAD(&srv->paths_list);
+	mutex_init(&srv->paths_mutex);
+	mutex_init(&srv->paths_ev_mutex);
+	uuid_copy(&srv->paths_uuid, paths_uuid);
+	srv->queue_depth = sess_queue_depth;
+	srv->ctx = ctx;
+
+	srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
+			      GFP_KERNEL);
+	if (!srv->chunks)
+		goto err_free_srv;
+
+	for (i = 0; i < srv->queue_depth; i++) {
+		srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
+		if (!srv->chunks[i]) {
+			pr_err("mempool_alloc() failed\n");
+			goto err_free_chunks;
+		}
+	}
+	list_add(&srv->ctx_list, &ctx->srv_list);
+
+	return srv;
+
+err_free_chunks:
+	while (i--)
+		mempool_free(srv->chunks[i], chunk_pool);
+	kfree(srv->chunks);
+
+err_free_srv:
+	kfree(srv);
+
+	return NULL;
+}
+
+static void free_srv(struct rtrs_srv *srv)
+{
+	int i;
+
+	WARN_ON(refcount_read(&srv->refcount));
+	for (i = 0; i < srv->queue_depth; i++)
+		mempool_free(srv->chunks[i], chunk_pool);
+	kfree(srv->chunks);
+	/* last put to release the srv structure */
+	put_device(&srv->dev);
+}
+
+static inline struct rtrs_srv *__find_srv_and_get(struct rtrs_srv_ctx *ctx,
+						   const uuid_t *paths_uuid)
+{
+	struct rtrs_srv *srv;
+
+	list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
+		if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
+		    refcount_inc_not_zero(&srv->refcount))
+			return srv;
+	}
+
+	return NULL;
+}
+
+static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx,
+					   const uuid_t *paths_uuid)
+{
+	struct rtrs_srv *srv;
+
+	mutex_lock(&ctx->srv_mutex);
+	srv = __find_srv_and_get(ctx, paths_uuid);
+	if (!srv)
+		srv = __alloc_srv(ctx, paths_uuid);
+	mutex_unlock(&ctx->srv_mutex);
+
+	return srv;
+}
+
+static void put_srv(struct rtrs_srv *srv)
+{
+	if (refcount_dec_and_test(&srv->refcount)) {
+		struct rtrs_srv_ctx *ctx = srv->ctx;
+
+		WARN_ON(srv->dev.kobj.state_in_sysfs);
+		WARN_ON(srv->kobj_paths.state_in_sysfs);
+
+		mutex_lock(&ctx->srv_mutex);
+		list_del(&srv->ctx_list);
+		mutex_unlock(&ctx->srv_mutex);
+		free_srv(srv);
+	}
+}
+
+static void __add_path_to_srv(struct rtrs_srv *srv,
+			      struct rtrs_srv_sess *sess)
+{
+	list_add_tail(&sess->s.entry, &srv->paths_list);
+	srv->paths_num++;
+	WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
+}
+
+static void del_path_from_srv(struct rtrs_srv_sess *sess)
+{
+	struct rtrs_srv *srv = sess->srv;
+
+	if (WARN_ON(!srv))
+		return;
+
+	mutex_lock(&srv->paths_mutex);
+	list_del(&sess->s.entry);
+	WARN_ON(!srv->paths_num);
+	srv->paths_num--;
+	mutex_unlock(&srv->paths_mutex);
+}
+
+/* return 0 if the addresses are the same, non-zero otherwise */
+static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
+{
+	switch (a->sa_family) {
+	case AF_IB:
+		return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
+			      &((struct sockaddr_ib *)b)->sib_addr,
+			      sizeof(struct ib_addr)) &&
+			(b->sa_family == AF_IB);
+	case AF_INET:
+		return memcmp(&((struct sockaddr_in *)a)->sin_addr,
+			      &((struct sockaddr_in *)b)->sin_addr,
+			      sizeof(struct in_addr)) &&
+			(b->sa_family == AF_INET);
+	case AF_INET6:
+		return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
+			      &((struct sockaddr_in6 *)b)->sin6_addr,
+			      sizeof(struct in6_addr)) &&
+			(b->sa_family == AF_INET6);
+	default:
+		return -ENOENT;
+	}
+}
+
+static bool __is_path_w_addr_exists(struct rtrs_srv *srv,
+				    struct rdma_addr *addr)
+{
+	struct rtrs_srv_sess *sess;
+
+	list_for_each_entry(sess, &srv->paths_list, s.entry)
+		if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr,
+				  (struct sockaddr *)&addr->dst_addr) &&
+		    !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr,
+				  (struct sockaddr *)&addr->src_addr))
+			return true;
+
+	return false;
+}
+
+static void rtrs_srv_close_work(struct work_struct *work)
+{
+	struct rtrs_srv_sess *sess;
+	struct rtrs_srv_con *con;
+	int i;
+
+	sess = container_of(work, typeof(*sess), close_work);
+
+	rtrs_srv_destroy_sess_files(sess);
+	rtrs_srv_stop_hb(sess);
+
+	for (i = 0; i < sess->s.con_num; i++) {
+		if (!sess->s.con[i])
+			continue;
+		con = to_srv_con(sess->s.con[i]);
+		rdma_disconnect(con->c.cm_id);
+		ib_drain_qp(con->c.qp);
+	}
+	/* Wait for all inflights */
+	rtrs_srv_wait_ops_ids(sess);
+
+	/* Notify upper layer if we are the last path */
+	rtrs_srv_sess_down(sess);
+
+	unmap_cont_bufs(sess);
+	rtrs_srv_free_ops_ids(sess);
+
+	for (i = 0; i < sess->s.con_num; i++) {
+		if (!sess->s.con[i])
+			continue;
+		con = to_srv_con(sess->s.con[i]);
+		rtrs_cq_qp_destroy(&con->c);
+		rdma_destroy_id(con->c.cm_id);
+		kfree(con);
+	}
+	rtrs_ib_dev_put(sess->s.dev);
+
+	del_path_from_srv(sess);
+	put_srv(sess->srv);
+	sess->srv = NULL;
+	rtrs_srv_change_state(sess, RTRS_SRV_CLOSED);
+
+	kfree(sess->dma_addr);
+	kfree(sess->s.con);
+	kfree(sess);
+}
+
+static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
+			       struct rdma_cm_id *cm_id)
+{
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_msg_conn_rsp msg;
+	struct rdma_conn_param param;
+	int err;
+
+	param = (struct rdma_conn_param) {
+		.rnr_retry_count = 7,
+		.private_data = &msg,
+		.private_data_len = sizeof(msg),
+	};
+
+	msg = (struct rtrs_msg_conn_rsp) {
+		.magic = cpu_to_le16(RTRS_MAGIC),
+		.version = cpu_to_le16(RTRS_PROTO_VER),
+		.queue_depth = cpu_to_le16(srv->queue_depth),
+		.max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
+		.max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
+	};
+
+	if (always_invalidate)
+		msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
+
+	err = rdma_accept(cm_id, &param);
+	if (err)
+		pr_err("rdma_accept(), err: %d\n", err);
+
+	return err;
+}
+
+static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
+{
+	struct rtrs_msg_conn_rsp msg;
+	int err;
+
+	msg = (struct rtrs_msg_conn_rsp) {
+		.magic = cpu_to_le16(RTRS_MAGIC),
+		.version = cpu_to_le16(RTRS_PROTO_VER),
+		.errno = cpu_to_le16(errno),
+	};
+
+	err = rdma_reject(cm_id, &msg, sizeof(msg));
+	if (err)
+		pr_err("rdma_reject(), err: %d\n", err);
+
+	/* Bounce errno back */
+	return errno;
+}
+
+static struct rtrs_srv_sess *
+__find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid)
+{
+	struct rtrs_srv_sess *sess;
+
+	list_for_each_entry(sess, &srv->paths_list, s.entry) {
+		if (uuid_equal(&sess->s.uuid, sess_uuid))
+			return sess;
+	}
+
+	return NULL;
+}
+
+static int create_con(struct rtrs_srv_sess *sess,
+		      struct rdma_cm_id *cm_id,
+		      unsigned int cid)
+{
+	struct rtrs_srv *srv = sess->srv;
+	struct rtrs_sess *s = &sess->s;
+	struct rtrs_srv_con *con;
+
+	u16 cq_size, wr_queue_size;
+	int err, cq_vector;
+
+	con = kzalloc(sizeof(*con), GFP_KERNEL);
+	if (!con) {
+		rtrs_err(s, "kzalloc() failed\n");
+		err = -ENOMEM;
+		goto err;
+	}
+
+	con->c.cm_id = cm_id;
+	con->c.sess = &sess->s;
+	con->c.cid = cid;
+	atomic_set(&con->wr_cnt, 0);
+
+	if (con->c.cid == 0) {
+		/*
+		 * All receive and all send (each requiring invalidate)
+		 * + 2 for drain and heartbeat
+		 */
+		wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
+		cq_size = wr_queue_size;
+	} else {
+		/*
+		 * All receive requests and all write requests may be
+		 * posted at once, each read request additionally needs
+		 * an invalidate request, + 1 for drain in case the qp
+		 * gets into error state.
+		 */
+		cq_size = srv->queue_depth * 3 + 1;
+		/*
+		 * In theory we might have queue_depth * 32
+		 * outstanding requests if an unsafe global key is used
+		 * and we have queue_depth read requests each consisting
+		 * of 32 different addresses. div 3 for mlx5.
+		 */
+		wr_queue_size = sess->s.dev->ib_dev->attrs.max_qp_wr / 3;
+	}
+
+	cq_vector = rtrs_srv_get_next_cq_vector(sess);
+
+	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
+	err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_size,
+				 wr_queue_size, IB_POLL_WORKQUEUE);
+	if (err) {
+		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
+		goto free_con;
+	}
+	if (con->c.cid == 0) {
+		err = post_recv_info_req(con);
+		if (err)
+			goto free_cqqp;
+	}
+	WARN_ON(sess->s.con[cid]);
+	sess->s.con[cid] = &con->c;
+
+	/*
+	 * Change context from server to current connection.  The other
+	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
+	 */
+	cm_id->context = &con->c;
+
+	return 0;
+
+free_cqqp:
+	rtrs_cq_qp_destroy(&con->c);
+free_con:
+	kfree(con);
+
+err:
+	return err;
+}
+
+static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv,
+					   struct rdma_cm_id *cm_id,
+					   unsigned int con_num,
+					   unsigned int recon_cnt,
+					   const uuid_t *uuid)
+{
+	struct rtrs_srv_sess *sess;
+	int err = -ENOMEM;
+
+	if (srv->paths_num >= MAX_PATHS_NUM) {
+		err = -ECONNRESET;
+		goto err;
+	}
+	if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
+		err = -EEXIST;
+		goto err;
+	}
+	sess = kzalloc(sizeof(*sess), GFP_KERNEL);
+	if (!sess)
+		goto err;
+
+	sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr),
+				 GFP_KERNEL);
+	if (!sess->dma_addr)
+		goto err_free_sess;
+
+	sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
+	if (!sess->s.con)
+		goto err_free_dma_addr;
+
+	sess->state = RTRS_SRV_CONNECTING;
+	sess->srv = srv;
+	sess->cur_cq_vector = -1;
+	sess->s.dst_addr = cm_id->route.addr.dst_addr;
+	sess->s.src_addr = cm_id->route.addr.src_addr;
+	sess->s.con_num = con_num;
+	sess->s.recon_cnt = recon_cnt;
+	uuid_copy(&sess->s.uuid, uuid);
+	spin_lock_init(&sess->state_lock);
+	INIT_WORK(&sess->close_work, rtrs_srv_close_work);
+	rtrs_srv_init_hb(sess);
+
+	sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
+	if (!sess->s.dev) {
+		err = -ENOMEM;
+		goto err_free_con;
+	}
+	err = map_cont_bufs(sess);
+	if (err)
+		goto err_put_dev;
+
+	err = rtrs_srv_alloc_ops_ids(sess);
+	if (err)
+		goto err_unmap_bufs;
+
+	__add_path_to_srv(srv, sess);
+
+	return sess;
+
+err_unmap_bufs:
+	unmap_cont_bufs(sess);
+err_put_dev:
+	rtrs_ib_dev_put(sess->s.dev);
+err_free_con:
+	kfree(sess->s.con);
+err_free_dma_addr:
+	kfree(sess->dma_addr);
+err_free_sess:
+	kfree(sess);
+
+err:
+	return ERR_PTR(err);
+}
+
+static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
+			      const struct rtrs_msg_conn_req *msg,
+			      size_t len)
+{
+	struct rtrs_srv_ctx *ctx = cm_id->context;
+	struct rtrs_srv_sess *sess;
+	struct rtrs_srv *srv;
+
+	u16 version, con_num, cid;
+	u16 recon_cnt;
+	int err;
+
+	if (len < sizeof(*msg)) {
+		pr_err("Invalid RTRS connection request\n");
+		goto reject_w_econnreset;
+	}
+	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
+		pr_err("Invalid RTRS magic\n");
+		goto reject_w_econnreset;
+	}
+	version = le16_to_cpu(msg->version);
+	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
+		pr_err("Unsupported major RTRS version: %d, expected %d\n",
+		       version >> 8, RTRS_PROTO_VER_MAJOR);
+		goto reject_w_econnreset;
+	}
+	con_num = le16_to_cpu(msg->cid_num);
+	if (con_num > 4096) {
+		/* Sanity check */
+		pr_err("Too many connections requested: %d\n", con_num);
+		goto reject_w_econnreset;
+	}
+	cid = le16_to_cpu(msg->cid);
+	if (cid >= con_num) {
+		/* Sanity check */
+		pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
+		goto reject_w_econnreset;
+	}
+	recon_cnt = le16_to_cpu(msg->recon_cnt);
+	srv = get_or_create_srv(ctx, &msg->paths_uuid);
+	if (!srv) {
+		err = -ENOMEM;
+		goto reject_w_err;
+	}
+	mutex_lock(&srv->paths_mutex);
+	sess = __find_sess(srv, &msg->sess_uuid);
+	if (sess) {
+		struct rtrs_sess *s = &sess->s;
+
+		/* Session already holds a reference */
+		put_srv(srv);
+
+		if (sess->state != RTRS_SRV_CONNECTING) {
+			rtrs_err(s, "Session in wrong state: %s\n",
+				  rtrs_srv_state_str(sess->state));
+			mutex_unlock(&srv->paths_mutex);
+			goto reject_w_econnreset;
+		}
+		/*
+		 * Sanity checks
+		 */
+		if (con_num != sess->s.con_num || cid >= sess->s.con_num) {
+			rtrs_err(s, "Incorrect request: %d, %d\n",
+				  cid, con_num);
+			mutex_unlock(&srv->paths_mutex);
+			goto reject_w_econnreset;
+		}
+		if (sess->s.con[cid]) {
+			rtrs_err(s, "Connection already exists: %d\n",
+				  cid);
+			mutex_unlock(&srv->paths_mutex);
+			goto reject_w_econnreset;
+		}
+	} else {
+		sess = __alloc_sess(srv, cm_id, con_num, recon_cnt,
+				    &msg->sess_uuid);
+		if (IS_ERR(sess)) {
+			mutex_unlock(&srv->paths_mutex);
+			put_srv(srv);
+			err = PTR_ERR(sess);
+			goto reject_w_err;
+		}
+	}
+	err = create_con(sess, cm_id, cid);
+	if (err) {
+		(void)rtrs_rdma_do_reject(cm_id, err);
+		/*
+		 * Since session has other connections we follow normal way
+		 * through workqueue, but still return an error to tell cma.c
+		 * to call rdma_destroy_id() for current connection.
+		 */
+		goto close_and_return_err;
+	}
+	err = rtrs_rdma_do_accept(sess, cm_id);
+	if (err) {
+		(void)rtrs_rdma_do_reject(cm_id, err);
+		/*
+		 * Since current connection was successfully added to the
+		 * session we follow normal way through workqueue to close the
+		 * session, thus return 0 to tell cma.c we call
+		 * rdma_destroy_id() ourselves.
+		 */
+		err = 0;
+		goto close_and_return_err;
+	}
+	mutex_unlock(&srv->paths_mutex);
+
+	return 0;
+
+reject_w_err:
+	return rtrs_rdma_do_reject(cm_id, err);
+
+reject_w_econnreset:
+	return rtrs_rdma_do_reject(cm_id, -ECONNRESET);
+
+close_and_return_err:
+	close_sess(sess);
+	mutex_unlock(&srv->paths_mutex);
+
+	return err;
+}
+
+static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
+				     struct rdma_cm_event *ev)
+{
+	struct rtrs_srv_sess *sess = NULL;
+	struct rtrs_sess *s = NULL;
+
+	if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
+		struct rtrs_con *c = cm_id->context;
+
+		s = c->sess;
+		sess = to_srv_sess(s);
+	}
+
+	switch (ev->event) {
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		/*
+		 * In case of error cma.c will destroy cm_id,
+		 * see cma_process_remove()
+		 */
+		return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
+					  ev->param.conn.private_data_len);
+	case RDMA_CM_EVENT_ESTABLISHED:
+		/* Nothing here */
+		break;
+	case RDMA_CM_EVENT_REJECTED:
+	case RDMA_CM_EVENT_CONNECT_ERROR:
+	case RDMA_CM_EVENT_UNREACHABLE:
+		rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
+			  rdma_event_msg(ev->event), ev->status);
+		close_sess(sess);
+		break;
+	case RDMA_CM_EVENT_DISCONNECTED:
+	case RDMA_CM_EVENT_ADDR_CHANGE:
+	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
+		close_sess(sess);
+		break;
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+		close_sess(sess);
+		break;
+	default:
+		pr_err("Ignoring unexpected CM event %s, err %d\n",
+		       rdma_event_msg(ev->event), ev->status);
+		break;
+	}
+
+	return 0;
+}
+
+static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
+					    struct sockaddr *addr,
+					    enum rdma_ucm_port_space ps)
+{
+	struct rdma_cm_id *cm_id;
+	int ret;
+
+	cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
+			       ctx, ps, IB_QPT_RC);
+	if (IS_ERR(cm_id)) {
+		ret = PTR_ERR(cm_id);
+		pr_err("Creating id for RDMA connection failed, err: %d\n",
+		       ret);
+		goto err_out;
+	}
+	ret = rdma_bind_addr(cm_id, addr);
+	if (ret) {
+		pr_err("Binding RDMA address failed, err: %d\n", ret);
+		goto err_cm;
+	}
+	ret = rdma_listen(cm_id, 64);
+	if (ret) {
+		pr_err("Listening on RDMA connection failed, err: %d\n",
+		       ret);
+		goto err_cm;
+	}
+
+	return cm_id;
+
+err_cm:
+	rdma_destroy_id(cm_id);
+err_out:
+
+	return ERR_PTR(ret);
+}
+
+static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, unsigned int port)
+{
+	struct sockaddr_in6 sin = {
+		.sin6_family	= AF_INET6,
+		.sin6_addr	= IN6ADDR_ANY_INIT,
+		.sin6_port	= htons(port),
+	};
+	struct sockaddr_ib sib = {
+		.sib_family			= AF_IB,
+		.sib_sid	= cpu_to_be64(RDMA_IB_IP_PS_IB | port),
+		.sib_sid_mask	= cpu_to_be64(0xffffffffffffffffULL),
+		.sib_pkey	= cpu_to_be16(0xffff),
+	};
+	struct rdma_cm_id *cm_ip, *cm_ib;
+	int ret;
+
+	/*
+	 * We accept both IPoIB and IB connections, so we need to keep
+	 * two cm id's, one for each socket type and port space.
+	 * If the cm initialization of one of the id's fails, we abort
+	 * everything.
+	 */
+	cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
+	if (IS_ERR(cm_ip))
+		return PTR_ERR(cm_ip);
+
+	cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
+	if (IS_ERR(cm_ib)) {
+		ret = PTR_ERR(cm_ib);
+		goto free_cm_ip;
+	}
+
+	ctx->cm_id_ip = cm_ip;
+	ctx->cm_id_ib = cm_ib;
+
+	return 0;
+
+free_cm_ip:
+	rdma_destroy_id(cm_ip);
+
+	return ret;
+}
+
+static struct rtrs_srv_ctx *alloc_srv_ctx(rdma_ev_fn *rdma_ev,
+					   link_ev_fn *link_ev)
+{
+	struct rtrs_srv_ctx *ctx;
+
+	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
+	if (!ctx)
+		return NULL;
+
+	ctx->rdma_ev = rdma_ev;
+	ctx->link_ev = link_ev;
+	mutex_init(&ctx->srv_mutex);
+	INIT_LIST_HEAD(&ctx->srv_list);
+
+	return ctx;
+}
+
+static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
+{
+	WARN_ON(!list_empty(&ctx->srv_list));
+	kfree(ctx);
+}
+
+struct rtrs_srv_ctx *rtrs_srv_open(rdma_ev_fn *rdma_ev, link_ev_fn *link_ev,
+				     unsigned int port)
+{
+	struct rtrs_srv_ctx *ctx;
+	int err;
+
+	ctx = alloc_srv_ctx(rdma_ev, link_ev);
+	if (!ctx)
+		return ERR_PTR(-ENOMEM);
+
+	err = rtrs_srv_rdma_init(ctx, port);
+	if (err) {
+		free_srv_ctx(ctx);
+		return ERR_PTR(err);
+	}
+	/* Do not let module be unloaded if server context is alive */
+	__module_get(THIS_MODULE);
+
+	return ctx;
+}
+EXPORT_SYMBOL(rtrs_srv_open);
+
+static void close_sessions(struct rtrs_srv *srv)
+{
+	struct rtrs_srv_sess *sess;
+
+	mutex_lock(&srv->paths_mutex);
+	list_for_each_entry(sess, &srv->paths_list, s.entry)
+		close_sess(sess);
+	mutex_unlock(&srv->paths_mutex);
+}
+
+static void close_ctx(struct rtrs_srv_ctx *ctx)
+{
+	struct rtrs_srv *srv;
+
+	mutex_lock(&ctx->srv_mutex);
+	list_for_each_entry(srv, &ctx->srv_list, ctx_list)
+		close_sessions(srv);
+	mutex_unlock(&ctx->srv_mutex);
+	flush_workqueue(rtrs_wq);
+}
+
+void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
+{
+	rdma_destroy_id(ctx->cm_id_ip);
+	rdma_destroy_id(ctx->cm_id_ib);
+	close_ctx(ctx);
+	free_srv_ctx(ctx);
+	module_put(THIS_MODULE);
+}
+EXPORT_SYMBOL(rtrs_srv_close);
+
+static int check_module_params(void)
+{
+	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
+		pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
+		       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
+		return -EINVAL;
+	}
+	if (max_chunk_size < 4096 || !is_power_of_2(max_chunk_size)) {
+		pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
+		       max_chunk_size, 4096);
+		return -EINVAL;
+	}
+
+	/*
+	 * Check if IB immediate data size is enough to hold the mem_id and the
+	 * offset inside the memory chunk
+	 */
+	if ((ilog2(sess_queue_depth - 1) + 1) +
+	    (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
+		pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
+		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+static int __init rtrs_server_init(void)
+{
+	int err;
+
+	init_cq_affinity();
+
+	pr_info("Loading module %s, proto %s: (cq_affinity_list: %s, max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n",
+		KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
+		cq_affinity_list, max_chunk_size,
+		max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
+		sess_queue_depth, always_invalidate);
+
+	rtrs_rdma_dev_pd_init(0, &dev_pd);
+
+	err = check_module_params();
+	if (err) {
+		pr_err("Failed to load module, invalid module parameters, err: %d\n",
+		       err);
+		return err;
+	}
+	chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
+					      get_order(max_chunk_size));
+	if (!chunk_pool) {
+		pr_err("Failed preallocate pool of chunks\n");
+		return -ENOMEM;
+	}
+	rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
+	if (IS_ERR(rtrs_dev_class)) {
+		pr_err("Failed to create rtrs-server dev class\n");
+		err = PTR_ERR(rtrs_dev_class);
+		goto out_chunk_pool;
+	}
+	rtrs_wq = alloc_workqueue("rtrs_server_wq", WQ_MEM_RECLAIM, 0);
+	if (!rtrs_wq) {
+		err = -ENOMEM;
+		pr_err("Failed to load module, alloc rtrs_server_wq failed\n");
+		goto out_dev_class;
+	}
+
+	return 0;
+
+out_dev_class:
+	class_destroy(rtrs_dev_class);
+out_chunk_pool:
+	mempool_destroy(chunk_pool);
+
+	return err;
+}
+
+static void __exit rtrs_server_exit(void)
+{
+	destroy_workqueue(rtrs_wq);
+	class_destroy(rtrs_dev_class);
+	mempool_destroy(chunk_pool);
+	rtrs_rdma_dev_pd_deinit(&dev_pd);
+}
+
+module_init(rtrs_server_init);
+module_exit(rtrs_server_exit);