diff mbox

[librdmacm,4/4] rsocket: Relax requirement for minimal inline data

Message ID 1397858143-22402-4-git-send-email-sean.hefty@intel.com (mailing list archive)
State Rejected
Headers show

Commit Message

Hefty, Sean April 18, 2014, 9:55 p.m. UTC
From: Sean Hefty <sean.hefty@intel.com>

Inline data support is optional.  Allow rsockets to work
with devices that do not support inline data, provided
that they do support RDMA writes with immediate data.
This allows rsockets to work over Intel TrueScale HCA.

Patch derived from work by: Amir Hanania

Signed-off-by: Amir Hanania <amir.hanania@intel.com>
Signed-off-by: Sean Hefty <sean.hefty@intel.com>
---
 src/rsocket.c |   67 +++++++++++++++++++++++++++++++-------------------------
 1 files changed, 37 insertions(+), 30 deletions(-)
diff mbox

Patch

diff --git a/src/rsocket.c b/src/rsocket.c
index 77b3979..3a9ff7d 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -61,7 +61,7 @@ 
 #define RS_SNDLOWAT 2048
 #define RS_QP_MIN_SIZE 16
 #define RS_QP_MAX_SIZE 0xFFFE
-#define RS_QP_CTRL_SIZE 4
+#define RS_QP_CTRL_SIZE 4	/* must be power of 2 */
 #define RS_CONN_RETRIES 6
 #define RS_SGL_SIZE 2
 static struct index_map idm;
@@ -195,7 +195,7 @@  struct rs_iomap_mr {
 	int index;	/* -1 if mapping is local and not in iomap_list */
 };
 
-#define RS_MIN_INLINE      (sizeof(struct rs_sge))
+#define RS_MAX_CTRL_MSG    (sizeof(struct rs_sge))
 #define rs_host_is_net()   (1 == htonl(1))
 #define RS_CONN_FLAG_NET   (1 << 0)
 #define RS_CONN_FLAG_IOMAP (1 << 1)
@@ -506,9 +506,6 @@  void rs_configure(void)
 	if ((f = fopen(RS_CONF_DIR "/inline_default", "r"))) {
 		(void) fscanf(f, "%hu", &def_inline);
 		fclose(f);
-
-		if (def_inline < RS_MIN_INLINE)
-			def_inline = RS_MIN_INLINE;
 	}
 
 	if ((f = fopen(RS_CONF_DIR "/sqsize_default", "r"))) {
@@ -678,18 +675,21 @@  static void ds_set_qp_size(struct rsocket *rs)
 
 static int rs_init_bufs(struct rsocket *rs)
 {
-	uint32_t rbuf_msg_size;
+	uint32_t total_rbuf_size, total_sbuf_size;
 	size_t len;
 
 	rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
 	if (!rs->rmsg)
 		return ERR(ENOMEM);
 
-	rs->sbuf = calloc(rs->sbuf_size, sizeof(*rs->sbuf));
+	total_sbuf_size = rs->sbuf_size;
+	if (rs->sq_inline < RS_MAX_CTRL_MSG)
+		total_sbuf_size += RS_MAX_CTRL_MSG * RS_QP_CTRL_SIZE;
+	rs->sbuf = calloc(total_sbuf_size, 1);
 	if (!rs->sbuf)
 		return ERR(ENOMEM);
 
-	rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, rs->sbuf_size);
+	rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, total_sbuf_size);
 	if (!rs->smr)
 		return -1;
 
@@ -708,14 +708,14 @@  static int rs_init_bufs(struct rsocket *rs)
 	if (rs->target_iomap_size)
 		rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
 
-	rbuf_msg_size = rs->rbuf_size;
+	total_rbuf_size = rs->rbuf_size;
 	if (rs->opts & RS_OPT_MSG_SEND)
-		rbuf_msg_size += rs->rq_size * RS_MSG_SIZE;
-	rs->rbuf = calloc(rbuf_msg_size, 1);
+		total_rbuf_size += rs->rq_size * RS_MSG_SIZE;
+	rs->rbuf = calloc(total_rbuf_size, 1);
 	if (!rs->rbuf)
 		return ERR(ENOMEM);
 
-	rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, rbuf_msg_size);
+	rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, total_rbuf_size);
 	if (!rs->rmr)
 		return -1;
 
@@ -862,8 +862,8 @@  static int rs_create_ep(struct rsocket *rs)
 		return ret;
 
 	rs->sq_inline = qp_attr.cap.max_inline_data;
-	if (rs->sq_inline < RS_MIN_INLINE)
-		return ERR(EINVAL);
+	if ((rs->opts & RS_OPT_MSG_SEND) && (rs->sq_inline < RS_MSG_SIZE))
+		return ERR(ENOTSUP);
 
 	for (i = 0; i < rs->rq_size; i++) {
 		ret = rs_post_recv(rs);
@@ -1497,11 +1497,6 @@  static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr,
 		goto err;
 
 	rs->sq_inline = qp_attr.cap.max_inline_data;
-	if (rs->sq_inline < RS_MIN_INLINE) {
-		ret = ERR(ENOMEM);
-		goto err;
-	}
-
 	ret = ds_add_qp_dest(qp, src_addr, addrlen);
 	if (ret)
 		goto err;
@@ -1613,6 +1608,12 @@  int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
 	return ret;
 }
 
+static void *rs_get_ctrl_buf(struct rsocket *rs)
+{
+	return rs->sbuf + rs->sbuf_size +
+		RS_MAX_CTRL_MSG * (rs->ctrl_seqno & (RS_QP_CTRL_SIZE - 1));
+}
+
 static int rs_post_msg(struct rsocket *rs, uint32_t msg)
 {
 	struct ibv_send_wr wr, *bad;
@@ -1774,7 +1775,7 @@  static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
 
 	addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
 	return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
-			         flags, addr, rs->remote_iomap.key);
+				 flags, addr, rs->remote_iomap.key);
 }
 
 static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -1786,7 +1787,8 @@  static uint32_t rs_sbuf_left(struct rsocket *rs)
 static void rs_send_credits(struct rsocket *rs)
 {
 	struct ibv_sge ibsge;
-	struct rs_sge sge;
+	struct rs_sge sge, *sge_buf;
+	int flags;
 
 	rs->ctrl_seqno++;
 	rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1);
@@ -1804,16 +1806,23 @@  static void rs_send_credits(struct rsocket *rs)
 			sge.length = bswap_32(rs->rbuf_size >> 1);
 		}
 
-		ibsge.addr = (uintptr_t) &sge;
-		ibsge.lkey = 0;
+		if (rs->sq_inline < sizeof sge) {
+			sge_buf = rs_get_ctrl_buf(rs);
+			memcpy(sge_buf, &sge, sizeof sge);
+			ibsge.addr = (uintptr_t) sge_buf;
+			ibsge.lkey = rs->smr->lkey;
+			flags = 0;
+		} else {
+			ibsge.addr = (uintptr_t) &sge;
+			ibsge.lkey = 0;
+			flags = IBV_SEND_INLINE;
+		}
 		ibsge.length = sizeof(sge);
 
 		rs_post_write_msg(rs, &ibsge, 1,
-				  rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
-				  IBV_SEND_INLINE,
-				  rs->remote_sgl.addr +
-				  rs->remote_sge * sizeof(struct rs_sge),
-				  rs->remote_sgl.key);
+			rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), flags,
+			rs->remote_sgl.addr + rs->remote_sge * sizeof(struct rs_sge),
+			rs->remote_sgl.key);
 
 		rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
 		rs->rbuf_free_offset += rs->rbuf_size >> 1;
@@ -3456,8 +3465,6 @@  int rsetsockopt(int socket, int level, int optname,
 			break;
 		case RDMA_INLINE:
 			rs->sq_inline = min(*(uint32_t *) optval, RS_QP_MAX_SIZE);
-			if (rs->sq_inline < RS_MIN_INLINE)
-				rs->sq_inline = RS_MIN_INLINE;
 			ret = 0;
 			break;
 		case RDMA_IOMAPSIZE: