@@ -1447,9 +1447,44 @@ DECLARE_EVENT_CLASS(svcrdma_segment_event,
TP_ARGS(handle, length, offset))
DEFINE_SEGMENT_EVENT(send_rseg);
-DEFINE_SEGMENT_EVENT(encode_wseg);
DEFINE_SEGMENT_EVENT(send_wseg);
+TRACE_EVENT(svcrdma_encode_wseg,
+ TP_PROTO(
+ const struct svc_rdma_send_ctxt *ctxt,
+ u32 segno,
+ u32 handle,
+ u32 length,
+ u64 offset
+ ),
+
+ TP_ARGS(ctxt, segno, handle, length, offset),
+
+ TP_STRUCT__entry(
+ __field(u32, cq_id)
+ __field(int, completion_id)
+ __field(u32, segno)
+ __field(u32, handle)
+ __field(u32, length)
+ __field(u64, offset)
+ ),
+
+ TP_fast_assign(
+ __entry->cq_id = ctxt->sc_cid.ci_queue_id;
+ __entry->completion_id = ctxt->sc_cid.ci_completion_id;
+ __entry->segno = segno;
+ __entry->handle = handle;
+ __entry->length = length;
+ __entry->offset = offset;
+ ),
+
+ TP_printk("cq_id=%u cid=%d segno=%u %u@0x%016llx:0x%08x",
+ __entry->cq_id, __entry->completion_id,
+ __entry->segno, __entry->length,
+ (unsigned long long)__entry->offset, __entry->handle
+ )
+);
+
TRACE_EVENT(svcrdma_decode_rseg,
TP_PROTO(
const struct rpc_rdma_cid *cid,
@@ -358,49 +358,42 @@ static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
/**
* svc_rdma_encode_write_segment - Encode one Write segment
- * @src: matching Write chunk in the RPC Call header
* @sctxt: Send context for the RPC Reply
+ * @chunk: Write chunk to push
* @remaining: remaining bytes of the payload left in the Write chunk
+ * @segno: which segment in the chunk
*
* Return values:
* On success, returns length in bytes of the Reply XDR buffer
- * that was consumed by the Write segment
+ * that was consumed by the Write segment, and updates @remaining
* %-EMSGSIZE on XDR buffer overflow
*/
-static ssize_t svc_rdma_encode_write_segment(__be32 *src,
- struct svc_rdma_send_ctxt *sctxt,
- unsigned int *remaining)
+static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_chunk *chunk,
+ u32 *remaining, unsigned int segno)
{
+ const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
+ const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
+ u32 length;
__be32 *p;
- const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
- u32 handle, length;
- u64 offset;
p = xdr_reserve_space(&sctxt->sc_stream, len);
if (!p)
return -EMSGSIZE;
- xdr_decode_rdma_segment(src, &handle, &length, &offset);
-
- if (*remaining < length) {
- /* segment only partly filled */
- length = *remaining;
- *remaining = 0;
- } else {
- /* entire segment was consumed */
- *remaining -= length;
- }
- xdr_encode_rdma_segment(p, handle, length, offset);
-
- trace_svcrdma_encode_wseg(handle, length, offset);
+ length = min_t(u32, *remaining, segment->rs_length);
+ *remaining -= length;
+ xdr_encode_rdma_segment(p, segment->rs_handle, length,
+ segment->rs_offset);
+ trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
+ segment->rs_offset);
return len;
}
/**
* svc_rdma_encode_write_chunk - Encode one Write chunk
- * @src: matching Write chunk in the RPC Call header
* @sctxt: Send context for the RPC Reply
- * @remaining: size in bytes of the payload in the Write chunk
+ * @chunk: Write chunk to push
*
* Copy a Write chunk from the Call transport header to the
* Reply transport header. Update each segment's length field
@@ -411,33 +404,30 @@ static ssize_t svc_rdma_encode_write_segment(__be32 *src,
* that was consumed by the Write chunk
* %-EMSGSIZE on XDR buffer overflow
*/
-static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
- struct svc_rdma_send_ctxt *sctxt,
- unsigned int remaining)
+static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_chunk *chunk)
{
- unsigned int i, nsegs;
+ u32 remaining = chunk->ch_payload_length;
+ unsigned int segno;
ssize_t len, ret;
- len = 0;
trace_svcrdma_encode_write_chunk(remaining);
- src++;
+ len = 0;
ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
if (ret < 0)
- return -EMSGSIZE;
+ return ret;
len += ret;
- nsegs = be32_to_cpup(src++);
- ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+ ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
if (ret < 0)
- return -EMSGSIZE;
+ return ret;
len += ret;
- for (i = nsegs; i; i--) {
- ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+ for (segno = 0; segno < chunk->ch_segcount; segno++) {
+ ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
if (ret < 0)
- return -EMSGSIZE;
- src += rpcrdma_segment_maxsz;
+ return ret;
len += ret;
}
@@ -449,34 +439,23 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
* @rctxt: Reply context with information about the RPC Call
* @sctxt: Send context for the RPC Reply
*
- * The client provides a Write chunk list in the Call message. Fill
- * in the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- * - Client has provided only one Write chunk
- *
* Return values:
* On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Reply's Write list
* %-EMSGSIZE on XDR buffer overflow
*/
-static ssize_t
-svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
- struct svc_rdma_send_ctxt *sctxt)
+static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt)
{
struct svc_rdma_chunk *chunk;
ssize_t len, ret;
len = 0;
- if (rctxt->rc_write_list) {
- chunk = pcl_first_chunk(&rctxt->rc_write_pcl);
- ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
- chunk->ch_payload_length);
+ pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
+ ret = svc_rdma_encode_write_chunk(sctxt, chunk);
if (ret < 0)
return ret;
- len = ret;
+ len += ret;
}
/* Terminate the Write list */
@@ -493,24 +472,28 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
* @sctxt: Send context for the RPC Reply
* @length: size in bytes of the payload in the Reply chunk
*
- * Assumptions:
- * - Reply can always fit in the client-provided Reply chunk
- *
* Return values:
* On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Reply's Reply chunk
* %-EMSGSIZE on XDR buffer overflow
+ * %-E2BIG if the RPC message is larger than the Reply chunk
*/
static ssize_t
-svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_send_ctxt *sctxt,
unsigned int length)
{
- if (!rctxt->rc_reply_chunk)
+ struct svc_rdma_chunk *chunk;
+
+ if (pcl_is_empty(&rctxt->rc_reply_pcl))
return xdr_stream_encode_item_absent(&sctxt->sc_stream);
- return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
- length);
+ chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+ if (length > chunk->ch_length)
+ return -E2BIG;
+
+ chunk->ch_payload_length = length;
+ return svc_rdma_encode_write_chunk(sctxt, chunk);
}
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -928,7 +911,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = rdma->sc_fc_credits;
- *p = rctxt->rc_reply_chunk ? rdma_nomsg : rdma_msg;
+ *p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
if (svc_rdma_encode_read_list(sctxt) < 0)
goto err0;
Refactor: Instead of re-parsing the ingress RPC Call transport header when constructing the egress RPC Reply transport header, use the new parsed Write list and Reply chunk, which are version- agnostic and already XDR decoded. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> --- include/trace/events/rpcrdma.h | 37 +++++++++++- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 105 ++++++++++++++------------------- 2 files changed, 80 insertions(+), 62 deletions(-)