@@ -210,6 +210,7 @@ struct svc_rdma_recv_ctxt {
*/
struct svc_rdma_write_info {
struct svcxprt_rdma *wi_rdma;
+ struct list_head wi_list;
const struct svc_rdma_chunk *wi_chunk;
@@ -238,7 +239,10 @@ struct svc_rdma_send_ctxt {
struct ib_cqe sc_cqe;
struct xdr_buf sc_hdrbuf;
struct xdr_stream sc_stream;
+
+ struct list_head sc_write_info_list;
struct svc_rdma_write_info sc_reply_info;
+
void *sc_xprt_buf;
int sc_page_count;
int sc_cur_sge_no;
@@ -270,11 +274,14 @@ extern void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
extern void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
struct svc_rdma_chunk_ctxt *cc,
enum dma_data_direction dir);
+extern void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt);
extern void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt);
-extern int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
- const struct svc_rdma_recv_ctxt *rctxt,
- const struct xdr_buf *xdr);
+extern int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_pcl *write_pcl,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct xdr_buf *xdr);
extern int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_pcl *write_pcl,
const struct svc_rdma_pcl *reply_pcl,
@@ -230,6 +230,28 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
queue_work(svcrdma_wq, &info->wi_work);
}
+/**
+ * svc_rdma_write_chunk_release - Release Write chunk I/O resources
+ * @rdma: controlling transport
+ * @ctxt: Send context that is being released
+ */
+void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
+{
+ struct svc_rdma_write_info *info;
+ struct svc_rdma_chunk_ctxt *cc;
+
+ while (!list_empty(&ctxt->sc_write_info_list)) {
+ info = list_first_entry(&ctxt->sc_write_info_list,
+ struct svc_rdma_write_info, wi_list);
+ list_del(&info->wi_list);
+
+ cc = &info->wi_cc;
+ svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
+ svc_rdma_write_info_free(info);
+ }
+}
+
/**
* svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
* @rdma: controlling transport
@@ -286,13 +308,11 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_chunk_ctxt *cc =
container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
- struct svc_rdma_write_info *info =
- container_of(cc, struct svc_rdma_write_info, wi_cc);
switch (wc->status) {
case IB_WC_SUCCESS:
trace_svcrdma_wc_write(&cc->cc_cid);
- break;
+ return;
case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
break;
@@ -300,12 +320,11 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
}
- svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
-
- if (unlikely(wc->status != IB_WC_SUCCESS))
- svc_xprt_deferred_close(&rdma->sc_xprt);
-
- svc_rdma_write_info_free(info);
+ /* The RDMA Write has flushed, so the client won't get
+ * some of the outgoing RPC message. Signal the loss
+ * to the client by closing the connection.
+ */
+ svc_xprt_deferred_close(&rdma->sc_xprt);
}
/**
@@ -601,13 +620,19 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
return xdr->len;
}
-static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
- const struct svc_rdma_chunk *chunk,
- const struct xdr_buf *xdr)
+/* Link Write WRs for @chunk onto @sctxt's WR chain.
+ */
+static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_chunk *chunk,
+ const struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
struct svc_rdma_chunk_ctxt *cc;
+ struct ib_send_wr *first_wr;
struct xdr_buf payload;
+ struct list_head *pos;
+ struct ib_cqe *cqe;
int ret;
if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
@@ -623,10 +648,25 @@ static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
if (ret != payload.len)
goto out_err;
- trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
- ret = svc_rdma_post_chunk_ctxt(rdma, cc);
- if (ret < 0)
+ ret = -EINVAL;
+ if (unlikely(cc->cc_sqecount > rdma->sc_sq_depth))
goto out_err;
+
+ first_wr = sctxt->sc_wr_chain;
+ cqe = &cc->cc_cqe;
+ list_for_each(pos, &cc->cc_rwctxts) {
+ struct svc_rdma_rw_ctxt *rwc;
+
+ rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
+ first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
+ rdma->sc_port_num, cqe, first_wr);
+ cqe = NULL;
+ }
+ sctxt->sc_wr_chain = first_wr;
+ sctxt->sc_sqecount += cc->cc_sqecount;
+ list_add(&info->wi_list, &sctxt->sc_write_info_list);
+
+ trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
return 0;
out_err:
@@ -635,25 +675,27 @@ static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
}
/**
- * svc_rdma_send_write_list - Send all chunks on the Write list
+ * svc_rdma_prepare_write_list - Construct WR chain for sending Write list
* @rdma: controlling RDMA transport
- * @rctxt: Write list provisioned by the client
+ * @write_pcl: Write list provisioned by the client
+ * @sctxt: Send WR resources
* @xdr: xdr_buf containing an RPC Reply message
*
* Returns zero on success, or a negative errno if one or more
* Write chunks could not be sent.
*/
-int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
- const struct svc_rdma_recv_ctxt *rctxt,
- const struct xdr_buf *xdr)
+int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_pcl *write_pcl,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct xdr_buf *xdr)
{
struct svc_rdma_chunk *chunk;
int ret;
- pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
+ pcl_for_each_chunk(chunk, write_pcl) {
if (!chunk->ch_payload_length)
break;
- ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
+ ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr);
if (ret < 0)
return ret;
}
@@ -142,6 +142,7 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
ctxt->sc_cqe.done = svc_rdma_wc_send;
+ INIT_LIST_HEAD(&ctxt->sc_write_info_list);
ctxt->sc_xprt_buf = buffer;
xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
rdma->sc_max_req_size);
@@ -227,6 +228,7 @@ static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
struct ib_device *device = rdma->sc_cm_id->device;
unsigned int i;
+ svc_rdma_write_chunk_release(rdma, ctxt);
svc_rdma_reply_chunk_release(rdma, ctxt);
if (ctxt->sc_page_count)
@@ -1013,7 +1015,8 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (!p)
goto put_ctxt;
- ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res);
+ ret = svc_rdma_prepare_write_list(rdma, &rctxt->rc_write_pcl, sctxt,
+ &rqstp->rq_res);
if (ret < 0)
goto put_ctxt;