@@ -149,6 +149,8 @@ struct svc_rdma_send_ctxt {
struct list_head sc_list;
struct ib_send_wr sc_send_wr;
struct ib_cqe sc_cqe;
+ struct xdr_buf sc_hdrbuf;
+ struct xdr_stream sc_stream;
void *sc_xprt_buf;
int sc_page_count;
int sc_cur_sge_no;
@@ -181,7 +181,9 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
if (!ctxt)
goto drop_connection;
- p = ctxt->sc_xprt_buf;
+ p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
+ if (!p)
+ goto put_ctxt;
*p++ = rqst->rq_xid;
*p++ = rpcrdma_version;
*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
@@ -189,7 +191,7 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
*p++ = xdr_zero;
*p++ = xdr_zero;
*p = xdr_zero;
- svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN);
+ svc_rdma_sync_reply_hdr(rdma, ctxt, ctxt->sc_hdrbuf.len);
#ifdef SVCRDMA_BACKCHANNEL_DEBUG
pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
@@ -197,12 +199,13 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
rqst->rq_xtime = ktime_get();
rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
- if (rc) {
- svc_rdma_send_ctxt_put(rdma, ctxt);
- goto drop_connection;
- }
+ if (rc)
+ goto put_ctxt;
return 0;
+put_ctxt:
+ svc_rdma_send_ctxt_put(rdma, ctxt);
+
drop_connection:
dprintk("svcrdma: failed to send bc call\n");
return -ENOTCONN;
@@ -698,7 +698,6 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
__be32 *rdma_argp, int status)
{
struct svc_rdma_send_ctxt *ctxt;
- unsigned int length;
__be32 *p;
int ret;
@@ -706,29 +705,48 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
if (!ctxt)
return;
- p = ctxt->sc_xprt_buf;
+ p = xdr_reserve_space(&ctxt->sc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = xprt->sc_fc_credits;
- *p++ = rdma_error;
+ *p = rdma_error;
+
switch (status) {
case -EPROTONOSUPPORT:
+ p = xdr_reserve_space(&ctxt->sc_stream,
+ 3 * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
*p++ = err_vers;
*p++ = rpcrdma_version;
- *p++ = rpcrdma_version;
+ *p = rpcrdma_version;
trace_svcrdma_err_vers(*rdma_argp);
break;
default:
- *p++ = err_chunk;
+ p = xdr_reserve_space(&ctxt->sc_stream,
+ sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
+ *p = err_chunk;
trace_svcrdma_err_chunk(*rdma_argp);
}
- length = (unsigned long)p - (unsigned long)ctxt->sc_xprt_buf;
- svc_rdma_sync_reply_hdr(xprt, ctxt, length);
+
+ svc_rdma_sync_reply_hdr(xprt, ctxt, ctxt->sc_hdrbuf.len);
ctxt->sc_send_wr.opcode = IB_WR_SEND;
ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
if (ret)
- svc_rdma_send_ctxt_put(xprt, ctxt);
+ goto put_ctxt;
+ return;
+
+put_ctxt:
+ svc_rdma_send_ctxt_put(xprt, ctxt);
}
/* By convention, backchannel calls arrive via rdma_msg type
@@ -151,6 +151,8 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
ctxt->sc_cqe.done = svc_rdma_wc_send;
ctxt->sc_xprt_buf = buffer;
+ xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
+ rdma->sc_max_req_size);
ctxt->sc_sges[0].addr = addr;
for (i = 0; i < rdma->sc_max_send_sges; i++)
@@ -204,6 +206,10 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
spin_unlock(&rdma->sc_send_lock);
out:
+ rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+ xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+ ctxt->sc_xprt_buf, NULL);
+
ctxt->sc_send_wr.num_sge = 0;
ctxt->sc_cur_sge_no = 0;
ctxt->sc_page_count = 0;
@@ -322,131 +328,171 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
return ret;
}
-/* Returns length of transport header, in bytes.
+/**
+ * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
+ * @sctxt: Send context for the RPC Reply
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply Read list
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
+static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
{
- unsigned int nsegs;
- __be32 *p;
-
- p = rdma_resp;
-
- /* RPC-over-RDMA V1 replies never have a Read list. */
- p += rpcrdma_fixed_maxsz + 1;
-
- /* Skip Write list. */
- while (*p++ != xdr_zero) {
- nsegs = be32_to_cpup(p++);
- p += nsegs * rpcrdma_segment_maxsz;
- }
+ /* RPC-over-RDMA version 1 replies never have a Read list. */
+ return xdr_stream_encode_item_absent(&sctxt->sc_stream);
+}
- /* Skip Reply chunk. */
- if (*p++ != xdr_zero) {
- nsegs = be32_to_cpup(p++);
- p += nsegs * rpcrdma_segment_maxsz;
+/**
+ * svc_rdma_encode_write_segment - Encode one Write segment
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: remaining bytes of the payload left in the Write chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Write segment
+ * %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_write_segment(__be32 *src,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int *remaining)
+{
+ __be32 *p;
+ const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
+ u32 handle, length;
+ u64 offset;
+
+ p = xdr_reserve_space(&sctxt->sc_stream, len);
+ if (!p)
+ return -EMSGSIZE;
+
+ handle = be32_to_cpup(src++);
+ length = be32_to_cpup(src++);
+ xdr_decode_hyper(src, &offset);
+
+ *p++ = cpu_to_be32(handle);
+ if (*remaining < length) {
+ /* segment only partly filled */
+ length = *remaining;
+ *remaining = 0;
+ } else {
+ /* entire segment was consumed */
+ *remaining -= length;
}
+ *p++ = cpu_to_be32(length);
+ xdr_encode_hyper(p, offset);
- return (unsigned long)p - (unsigned long)rdma_resp;
+ trace_svcrdma_encode_wseg(handle, length, offset);
+ return len;
}
-/* One Write chunk is copied from Call transport header to Reply
- * transport header. Each segment's length field is updated to
- * reflect number of bytes consumed in the segment.
- *
- * Returns number of segments in this chunk.
+/**
+ * svc_rdma_encode_write_chunk - Encode one Write chunk
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: size in bytes of the payload in the Write chunk
+ *
+ * Copy a Write chunk from the Call transport header to the
+ * Reply transport header. Update each segment's length field
+ * to reflect the number of bytes written in that segment.
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Write chunk
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
+ struct svc_rdma_send_ctxt *sctxt,
unsigned int remaining)
{
unsigned int i, nsegs;
- u32 seg_len;
+ ssize_t len, ret;
- /* Write list discriminator */
- *dst++ = *src++;
+ len = 0;
+ trace_svcrdma_encode_write_chunk(remaining);
- /* number of segments in this chunk */
- nsegs = be32_to_cpup(src);
- *dst++ = *src++;
+ src++;
+ ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
+ if (ret < 0)
+ return -EMSGSIZE;
+ len += ret;
- for (i = nsegs; i; i--) {
- /* segment's RDMA handle */
- *dst++ = *src++;
-
- /* bytes returned in this segment */
- seg_len = be32_to_cpu(*src);
- if (remaining >= seg_len) {
- /* entire segment was consumed */
- *dst = *src;
- remaining -= seg_len;
- } else {
- /* segment only partly filled */
- *dst = cpu_to_be32(remaining);
- remaining = 0;
- }
- dst++; src++;
+ nsegs = be32_to_cpup(src++);
+ ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+ if (ret < 0)
+ return -EMSGSIZE;
+ len += ret;
- /* segment's RDMA offset */
- *dst++ = *src++;
- *dst++ = *src++;
+ for (i = nsegs; i; i--) {
+ ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+ if (ret < 0)
+ return -EMSGSIZE;
+ src += rpcrdma_segment_maxsz;
+ len += ret;
}
- return nsegs;
+ return len;
}
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
+/**
+ * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the first Write chunk
+ *
+ * The client provides a Write chunk list in the Call message. Fill
+ * in the segments in the first Write chunk in the Reply's transport
* header with the number of bytes consumed in each segment.
* Remaining chunks are returned unused.
*
* Assumptions:
* - Client has provided only one Write chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply's Write list
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
- unsigned int consumed)
+static ssize_t svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int length)
{
- unsigned int nsegs;
- __be32 *p, *q;
-
- /* RPC-over-RDMA V1 replies never have a Read list. */
- p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
- q = wr_ch;
- while (*q != xdr_zero) {
- nsegs = xdr_encode_write_chunk(p, q, consumed);
- q += 2 + nsegs * rpcrdma_segment_maxsz;
- p += 2 + nsegs * rpcrdma_segment_maxsz;
- consumed = 0;
- }
+ ssize_t len, ret;
- /* Terminate Write list */
- *p++ = xdr_zero;
+ ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
+ if (ret < 0)
+ return ret;
+ len = ret;
- /* Reply chunk discriminator; may be replaced later */
- *p = xdr_zero;
+ /* Terminate the Write list */
+ ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
+ if (ret < 0)
+ return ret;
+
+ return len + ret;
}
-/* The client provided a Reply chunk in the Call message. Fill in
- * the segments in the Reply chunk in the Reply message with the
- * number of bytes consumed in each segment.
+/**
+ * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the Reply chunk
*
* Assumptions:
- * - Reply can always fit in the provided Reply chunk
+ * - Reply can always fit in the client-provided Reply chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply's Reply chunk
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
- unsigned int consumed)
+static ssize_t svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int length)
{
- __be32 *p;
-
- /* Find the Reply chunk in the Reply's xprt header.
- * RPC-over-RDMA V1 replies never have a Read list.
- */
- p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
- /* Skip past Write list */
- while (*p++ != xdr_zero)
- p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
-
- xdr_encode_write_chunk(p, rp_ch, consumed);
+ return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
+ length);
}
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -765,14 +811,26 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
struct svc_rqst *rqstp)
{
+ struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+ __be32 *rdma_argp = rctxt->rc_recv_buf;
__be32 *p;
- p = ctxt->sc_xprt_buf;
- trace_svcrdma_err_chunk(*p);
- p += 3;
+ rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+ xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+ ctxt->sc_xprt_buf, NULL);
+
+ p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_ERR);
+ if (!p)
+ return -ENOMSG;
+
+ *p++ = *rdma_argp;
+ *p++ = *(rdma_argp + 1);
+ *p++ = rdma->sc_fc_credits;
*p++ = rdma_error;
*p = err_chunk;
- svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
+ trace_svcrdma_err_chunk(*rdma_argp);
+
+ svc_rdma_sync_reply_hdr(rdma, ctxt, ctxt->sc_hdrbuf.len);
svc_rdma_save_io_pages(rqstp, ctxt);
@@ -803,7 +861,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
__be32 *rp_ch = rctxt->rc_reply_chunk;
struct xdr_buf *xdr = &rqstp->rq_res;
struct svc_rdma_send_ctxt *sctxt;
- __be32 *p, *rdma_resp;
+ __be32 *p;
int ret;
/* Create the RDMA response header. xprt->xpt_mutex,
@@ -816,19 +874,18 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
sctxt = svc_rdma_send_ctxt_get(rdma);
if (!sctxt)
goto err0;
- rdma_resp = sctxt->sc_xprt_buf;
- p = rdma_resp;
+ p = xdr_reserve_space(&sctxt->sc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (!p)
+ goto err0;
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = rdma->sc_fc_credits;
- *p++ = rp_ch ? rdma_nomsg : rdma_msg;
-
- /* Start with empty chunks */
- *p++ = xdr_zero;
- *p++ = xdr_zero;
- *p = xdr_zero;
+ *p = rp_ch ? rdma_nomsg : rdma_msg;
+ if (svc_rdma_encode_read_list(sctxt) < 0)
+ goto err0;
if (wr_lst) {
/* XXX: Presume the client sent only one Write chunk */
unsigned long offset;
@@ -845,16 +902,24 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
length);
if (ret < 0)
goto err2;
- svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
+ if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
+ goto err0;
+ } else {
+ if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+ goto err0;
}
if (rp_ch) {
ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
if (ret < 0)
goto err2;
- svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
+ if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
+ goto err0;
+ } else {
+ if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+ goto err0;
}
- svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
+ svc_rdma_sync_reply_hdr(rdma, sctxt, sctxt->sc_hdrbuf.len);
ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
if (ret < 0)
goto err1;
Same idea as the receive-side changes I did a while back: use xdr_stream helpers rather than open-coding the XDR chunk list encoders. This builds the Reply transport header from beginning to end without backtracking. As additional clean-ups, fill in documenting comments for the XDR encoders and sprinkle some trace points in the new encoding functions. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> --- include/linux/sunrpc/svc_rdma.h | 2 net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 15 +- net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 34 +++ net/sunrpc/xprtrdma/svc_rdma_sendto.c | 279 +++++++++++++++++----------- 4 files changed, 209 insertions(+), 121 deletions(-)