@@ -203,6 +203,29 @@ struct svc_rdma_recv_ctxt {
struct page *rc_pages[RPCSVC_MAXPAGES];
};
+/*
+ * State for sending a Write chunk.
+ * - Tracks progress of writing one chunk over all its segments
+ * - Stores arguments for the SGL constructor functions
+ */
+struct svc_rdma_write_info {
+ struct svcxprt_rdma *wi_rdma;
+
+ const struct svc_rdma_chunk *wi_chunk;
+
+ /* write state of this chunk */
+ unsigned int wi_seg_off;
+ unsigned int wi_seg_no;
+
+ /* SGL constructor arguments */
+ const struct xdr_buf *wi_xdr;
+ unsigned char *wi_base;
+ unsigned int wi_next_off;
+
+ struct svc_rdma_chunk_ctxt wi_cc;
+ struct work_struct wi_work;
+};
+
struct svc_rdma_send_ctxt {
struct llist_node sc_node;
struct rpc_rdma_cid sc_cid;
@@ -215,6 +238,7 @@ struct svc_rdma_send_ctxt {
struct ib_cqe sc_cqe;
struct xdr_buf sc_hdrbuf;
struct xdr_stream sc_stream;
+ struct svc_rdma_write_info sc_reply_info;
void *sc_xprt_buf;
int sc_page_count;
int sc_cur_sge_no;
@@ -249,6 +273,7 @@ extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
const struct xdr_buf *xdr);
extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
const struct xdr_buf *xdr);
extern int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp,
@@ -2118,6 +2118,10 @@ DEFINE_SIMPLE_CID_EVENT(svcrdma_wc_write);
DEFINE_SEND_FLUSH_EVENT(svcrdma_wc_write_flush);
DEFINE_SEND_FLUSH_EVENT(svcrdma_wc_write_err);
+DEFINE_SIMPLE_CID_EVENT(svcrdma_wc_reply);
+DEFINE_SEND_FLUSH_EVENT(svcrdma_wc_reply_flush);
+DEFINE_SEND_FLUSH_EVENT(svcrdma_wc_reply_err);
+
TRACE_EVENT(svcrdma_qp_error,
TP_PROTO(
const struct ib_event *event,
@@ -197,28 +197,6 @@ void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}
-/* State for sending a Write or Reply chunk.
- * - Tracks progress of writing one chunk over all its segments
- * - Stores arguments for the SGL constructor functions
- */
-struct svc_rdma_write_info {
- struct svcxprt_rdma *wi_rdma;
-
- const struct svc_rdma_chunk *wi_chunk;
-
- /* write state of this chunk */
- unsigned int wi_seg_off;
- unsigned int wi_seg_no;
-
- /* SGL constructor arguments */
- const struct xdr_buf *wi_xdr;
- unsigned char *wi_base;
- unsigned int wi_next_off;
-
- struct svc_rdma_chunk_ctxt wi_cc;
- struct work_struct wi_work;
-};
-
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk)
@@ -252,6 +230,43 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
queue_work(svcrdma_wq, &info->wi_work);
}
+static void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_chunk_ctxt *cc)
+{
+ svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
+ svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
+}
+
+/**
+ * svc_rdma_reply_done - Reply chunk Write completion handler
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion report
+ *
+ * Pages under I/O are released by a subsequent Send completion.
+ */
+static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_chunk_ctxt *cc =
+ container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+ struct svcxprt_rdma *rdma = cq->cq_context;
+
+ switch (wc->status) {
+ case IB_WC_SUCCESS:
+ trace_svcrdma_wc_reply(&cc->cc_cid);
+ svc_rdma_reply_chunk_release(rdma, cc);
+ return;
+ case IB_WC_WR_FLUSH_ERR:
+ trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
+ break;
+ default:
+ trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
+ }
+
+ svc_rdma_reply_chunk_release(rdma, cc);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
+}
+
/**
* svc_rdma_write_done - Write chunk completion
* @cq: controlling Completion Queue
@@ -624,7 +639,8 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
/**
* svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
* @rdma: controlling RDMA transport
- * @rctxt: Write and Reply chunks from client
+ * @rctxt: Write and Reply chunks provisioned by the client
+ * @sctxt: Send WR resources
* @xdr: xdr_buf containing an RPC Reply
*
* Returns a non-negative number of bytes the chunk consumed, or
@@ -636,37 +652,34 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
*/
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
const struct xdr_buf *xdr)
{
- struct svc_rdma_write_info *info;
- struct svc_rdma_chunk_ctxt *cc;
- struct svc_rdma_chunk *chunk;
+ struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
+ struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
int ret;
- if (pcl_is_empty(&rctxt->rc_reply_pcl))
- return 0;
+ if (likely(pcl_is_empty(&rctxt->rc_reply_pcl)))
+ return 0; /* client provided no Reply chunk */
- chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
- info = svc_rdma_write_info_alloc(rdma, chunk);
- if (!info)
- return -ENOMEM;
- cc = &info->wi_cc;
+ info->wi_rdma = rdma;
+ info->wi_chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+ info->wi_seg_off = 0;
+ info->wi_seg_no = 0;
+ svc_rdma_cc_init(rdma, &info->wi_cc);
+ info->wi_cc.cc_cqe.done = svc_rdma_reply_done;
ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
svc_rdma_xb_write, info);
if (ret < 0)
- goto out_err;
+ return ret;
trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
ret = svc_rdma_post_chunk_ctxt(rdma, cc);
if (ret < 0)
- goto out_err;
+ return ret;
return xdr->len;
-
-out_err:
- svc_rdma_write_info_free(info);
- return ret;
}
/**
@@ -1012,7 +1012,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (!p)
goto put_ctxt;
- ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
+ ret = svc_rdma_send_reply_chunk(rdma, rctxt, sctxt, &rqstp->rq_res);
if (ret < 0)
goto reply_chunk;
rc_size = ret;