diff mbox series

[v2,1/4] SUNRPC: Convert svc_tcp_sendmsg to use bio_vecs directly

Message ID 168935823761.1984.15760913629466718014.stgit@manet.1015granger.net (mailing list archive)
State Superseded
Delegated to: Netdev Maintainers
Headers show
Series Send RPC-on-TCP with one sock_sendmsg() call | expand

Checks

Context Check Description
netdev/series_format warning Target tree name not specified in the subject
netdev/tree_selection success Guessed tree name to be net-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 1371 this patch: 1371
netdev/cc_maintainers warning 11 maintainers not CCed: kuba@kernel.org anna@kernel.org neilb@suse.de tom@talpey.com kolga@netapp.com trond.myklebust@hammerspace.com Dai.Ngo@oracle.com davem@davemloft.net pabeni@redhat.com edumazet@google.com jlayton@kernel.org
netdev/build_clang success Errors and warnings before: 1691 this patch: 1691
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 1394 this patch: 1394
netdev/checkpatch success total: 0 errors, 0 warnings, 0 checks, 179 lines checked
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

Chuck Lever July 14, 2023, 6:10 p.m. UTC
From: Chuck Lever <chuck.lever@oracle.com>

Add a helper to convert a whole xdr_buf directly into an array of
bio_vecs, then send this array instead of iterating piecemeal over
the xdr_buf containing the outbound RPC message.

Note that the rules of the RPC protocol mean there can be only one
outstanding send at a time on a transport socket. The kernel's
SunRPC server enforces this via the transport's xpt_mutex. Thus we
can use a per-transport shared array for the xdr_buf conversion
rather than allocate one every time or use one that is part of
struct svc_rqst.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svcsock.h |    3 ++
 include/linux/sunrpc/xdr.h     |    2 +
 net/sunrpc/svcsock.c           |   59 ++++++++++++++--------------------------
 net/sunrpc/xdr.c               |   50 ++++++++++++++++++++++++++++++++++
 4 files changed, 75 insertions(+), 39 deletions(-)

Comments

Jeff Layton Aug. 12, 2023, 12:04 p.m. UTC | #1
On Fri, 2023-07-14 at 14:10 -0400, Chuck Lever wrote:
> From: Chuck Lever <chuck.lever@oracle.com>
> 
> Add a helper to convert a whole xdr_buf directly into an array of
> bio_vecs, then send this array instead of iterating piecemeal over
> the xdr_buf containing the outbound RPC message.
> 
> Note that the rules of the RPC protocol mean there can be only one
> outstanding send at a time on a transport socket. The kernel's
> SunRPC server enforces this via the transport's xpt_mutex. Thus we
> can use a per-transport shared array for the xdr_buf conversion
> rather than allocate one every time or use one that is part of
> struct svc_rqst.
> 
> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
> ---
>  include/linux/sunrpc/svcsock.h |    3 ++
>  include/linux/sunrpc/xdr.h     |    2 +
>  net/sunrpc/svcsock.c           |   59 ++++++++++++++--------------------------
>  net/sunrpc/xdr.c               |   50 ++++++++++++++++++++++++++++++++++
>  4 files changed, 75 insertions(+), 39 deletions(-)
> 

I've seen some pynfs test regressions in mainline (v6.5-rc5-ish)
kernels. Here's one failing test:

_text = b'write data' # len=10

[...]

def testSimpleWrite2(t, env):
    """WRITE with stateid=zeros changing size

    FLAGS: write all
    DEPEND: MKFILE
    CODE: WRT1b
    """
    c = env.c1
    c.init_connection()
    attrs = {FATTR4_SIZE: 32, FATTR4_MODE: 0o644}
    fh, stateid = c.create_confirm(t.word(), attrs=attrs,
                                   deny=OPEN4_SHARE_DENY_NONE)
    res = c.write_file(fh, _text, 30)
    check(res, msg="WRITE with stateid=zeros changing size")
    res = c.read_file(fh, 25, 20)
    _compare(t, res, b'\0'*5 + _text, True)

This test writes 10 bytes of data (to a file at offset 30, and then does
a 20 byte read starting at offset 25. The READ reply has NULs where the
written data should be

The patch that broke things is this one:

    5df5dd03a8f7 sunrpc: Use sendmsg(MSG_SPLICE_PAGES) rather then sendpage

This patch fixes the problem and gets the test run "green" again. I
think we will probably want to send this patch to mainline for v6.5, but
it'd be good to understand what's broken and how this fixes it.

Do you (or David) have any insight?

It'd also be good to understand whether we also need to fix UDP. pynfs
is tcp-only, so I can't run the same test there as easily.

> diff --git a/include/linux/sunrpc/svcsock.h b/include/linux/sunrpc/svcsock.h
> index a7116048a4d4..a9bfeadf4cbe 100644
> --- a/include/linux/sunrpc/svcsock.h
> +++ b/include/linux/sunrpc/svcsock.h
> @@ -40,6 +40,9 @@ struct svc_sock {
>  
>  	struct completion	sk_handshake_done;
>  
> +	struct bio_vec		sk_send_bvec[RPCSVC_MAXPAGES]
> +						____cacheline_aligned;
> +
>  	struct page *		sk_pages[RPCSVC_MAXPAGES];	/* received data */
>  };
>  
> diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
> index f89ec4b5ea16..42f9d7eb9a1a 100644
> --- a/include/linux/sunrpc/xdr.h
> +++ b/include/linux/sunrpc/xdr.h
> @@ -139,6 +139,8 @@ void	xdr_terminate_string(const struct xdr_buf *, const u32);
>  size_t	xdr_buf_pagecount(const struct xdr_buf *buf);
>  int	xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp);
>  void	xdr_free_bvec(struct xdr_buf *buf);
> +unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
> +			     const struct xdr_buf *xdr);
>  
>  static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int len)
>  {
> diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
> index e43f26382411..e35e5afe4b81 100644
> --- a/net/sunrpc/svcsock.c
> +++ b/net/sunrpc/svcsock.c
> @@ -36,6 +36,8 @@
>  #include <linux/skbuff.h>
>  #include <linux/file.h>
>  #include <linux/freezer.h>
> +#include <linux/bvec.h>
> +
>  #include <net/sock.h>
>  #include <net/checksum.h>
>  #include <net/ip.h>
> @@ -1194,72 +1196,52 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
>  	return 0;	/* record not complete */
>  }
>  
> -static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
> -			      int flags)
> -{
> -	struct msghdr msg = { .msg_flags = MSG_SPLICE_PAGES | flags, };
> -
> -	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
> -	return sock_sendmsg(sock, &msg);
> -}
> -
>  /*
>   * MSG_SPLICE_PAGES is used exclusively to reduce the number of
>   * copy operations in this path. Therefore the caller must ensure
>   * that the pages backing @xdr are unchanging.
>   *
> - * In addition, the logic assumes that * .bv_len is never larger
> - * than PAGE_SIZE.
> + * Note that the send is non-blocking. The caller has incremented
> + * the reference count on each page backing the RPC message, and
> + * the network layer will "put" these pages when transmission is
> + * complete.
> + *
> + * This is safe for our RPC services because the memory backing
> + * the head and tail components is never kmalloc'd. These always
> + * come from pages in the svc_rqst::rq_pages array.
>   */
> -static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
> +static int svc_tcp_sendmsg(struct svc_sock *svsk, struct xdr_buf *xdr,
>  			   rpc_fraghdr marker, unsigned int *sentp)
>  {
> -	const struct kvec *head = xdr->head;
> -	const struct kvec *tail = xdr->tail;
>  	struct kvec rm = {
>  		.iov_base	= &marker,
>  		.iov_len	= sizeof(marker),
>  	};
>  	struct msghdr msg = {
> -		.msg_flags	= 0,
> +		.msg_flags	= MSG_MORE,
>  	};
> +	unsigned int count;
>  	int ret;
>  
>  	*sentp = 0;
> -	ret = xdr_alloc_bvec(xdr, GFP_KERNEL);
> -	if (ret < 0)
> -		return ret;
>  
> -	ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
> +	ret = kernel_sendmsg(svsk->sk_sock, &msg, &rm, 1, rm.iov_len);
>  	if (ret < 0)
>  		return ret;
>  	*sentp += ret;
>  	if (ret != rm.iov_len)
>  		return -EAGAIN;
>  
> -	ret = svc_tcp_send_kvec(sock, head, 0);
> -	if (ret < 0)
> -		return ret;
> -	*sentp += ret;
> -	if (ret != head->iov_len)
> -		goto out;
> +	count = xdr_buf_to_bvec(svsk->sk_send_bvec,
> +				ARRAY_SIZE(svsk->sk_send_bvec), xdr);
>  
>  	msg.msg_flags = MSG_SPLICE_PAGES;
> -	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, xdr->bvec,
> -		      xdr_buf_pagecount(xdr), xdr->page_len);
> -	ret = sock_sendmsg(sock, &msg);
> +	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, svsk->sk_send_bvec,
> +		      count, xdr->len);
> +	ret = sock_sendmsg(svsk->sk_sock, &msg);
>  	if (ret < 0)
>  		return ret;
>  	*sentp += ret;
> -
> -	if (tail->iov_len) {
> -		ret = svc_tcp_send_kvec(sock, tail, 0);
> -		if (ret < 0)
> -			return ret;
> -		*sentp += ret;
> -	}
> -
> -out:
>  	return 0;
>  }
>  
> @@ -1290,8 +1272,7 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
>  	if (svc_xprt_is_dead(xprt))
>  		goto out_notconn;
>  	tcp_sock_set_cork(svsk->sk_sk, true);
> -	err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
> -	xdr_free_bvec(xdr);
> +	err = svc_tcp_sendmsg(svsk, xdr, marker, &sent);
>  	trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
>  	if (err < 0 || sent != (xdr->len + sizeof(marker)))
>  		goto out_close;
> diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
> index 2a22e78af116..358e6de91775 100644
> --- a/net/sunrpc/xdr.c
> +++ b/net/sunrpc/xdr.c
> @@ -164,6 +164,56 @@ xdr_free_bvec(struct xdr_buf *buf)
>  	buf->bvec = NULL;
>  }
>  
> +/**
> + * xdr_buf_to_bvec - Copy components of an xdr_buf into a bio_vec array
> + * @bvec: bio_vec array to populate
> + * @bvec_size: element count of @bio_vec
> + * @xdr: xdr_buf to be copied
> + *
> + * Returns the number of entries consumed in @bvec.
> + */
> +unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
> +			     const struct xdr_buf *xdr)
> +{
> +	const struct kvec *head = xdr->head;
> +	const struct kvec *tail = xdr->tail;
> +	unsigned int count = 0;
> +
> +	if (head->iov_len) {
> +		bvec_set_virt(bvec++, head->iov_base, head->iov_len);
> +		++count;
> +	}
> +
> +	if (xdr->page_len) {
> +		unsigned int offset, len, remaining;
> +		struct page **pages = xdr->pages;
> +
> +		offset = offset_in_page(xdr->page_base);
> +		remaining = xdr->page_len;
> +		while (remaining > 0) {
> +			len = min_t(unsigned int, remaining,
> +				    PAGE_SIZE - offset);
> +			bvec_set_page(bvec++, *pages++, len, offset);
> +			remaining -= len;
> +			offset = 0;
> +			if (unlikely(++count > bvec_size))
> +				goto bvec_overflow;
> +		}
> +	}
> +
> +	if (tail->iov_len) {
> +		bvec_set_virt(bvec, tail->iov_base, tail->iov_len);
> +		if (unlikely(++count > bvec_size))
> +			goto bvec_overflow;
> +	}
> +
> +	return count;
> +
> +bvec_overflow:
> +	pr_warn_once("%s: bio_vec array overflow\n", __func__);
> +	return count - 1;
> +}
> +
>  /**
>   * xdr_inline_pages - Prepare receive buffer for a large reply
>   * @xdr: xdr_buf into which reply will be placed
> 
>
Chuck Lever Aug. 13, 2023, 4:04 p.m. UTC | #2
On Sat, Aug 12, 2023 at 08:04:57AM -0400, Jeff Layton wrote:
> On Fri, 2023-07-14 at 14:10 -0400, Chuck Lever wrote:
> > From: Chuck Lever <chuck.lever@oracle.com>
> > 
> > Add a helper to convert a whole xdr_buf directly into an array of
> > bio_vecs, then send this array instead of iterating piecemeal over
> > the xdr_buf containing the outbound RPC message.
> > 
> > Note that the rules of the RPC protocol mean there can be only one
> > outstanding send at a time on a transport socket. The kernel's
> > SunRPC server enforces this via the transport's xpt_mutex. Thus we
> > can use a per-transport shared array for the xdr_buf conversion
> > rather than allocate one every time or use one that is part of
> > struct svc_rqst.
> > 
> > Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
> > ---
> >  include/linux/sunrpc/svcsock.h |    3 ++
> >  include/linux/sunrpc/xdr.h     |    2 +
> >  net/sunrpc/svcsock.c           |   59 ++++++++++++++--------------------------
> >  net/sunrpc/xdr.c               |   50 ++++++++++++++++++++++++++++++++++
> >  4 files changed, 75 insertions(+), 39 deletions(-)
> > 
> 
> I've seen some pynfs test regressions in mainline (v6.5-rc5-ish)
> kernels. Here's one failing test:
> 
> _text = b'write data' # len=10
> 
> [...]
> 
> def testSimpleWrite2(t, env):
>     """WRITE with stateid=zeros changing size
> 
>     FLAGS: write all
>     DEPEND: MKFILE
>     CODE: WRT1b
>     """
>     c = env.c1
>     c.init_connection()
>     attrs = {FATTR4_SIZE: 32, FATTR4_MODE: 0o644}
>     fh, stateid = c.create_confirm(t.word(), attrs=attrs,
>                                    deny=OPEN4_SHARE_DENY_NONE)
>     res = c.write_file(fh, _text, 30)
>     check(res, msg="WRITE with stateid=zeros changing size")
>     res = c.read_file(fh, 25, 20)
>     _compare(t, res, b'\0'*5 + _text, True)
> 
> This test writes 10 bytes of data (to a file at offset 30, and then does
> a 20 byte read starting at offset 25. The READ reply has NULs where the
> written data should be

Nice catch. I hope this is something you found with your nascent
kdevops rig...? :^)


> The patch that broke things is this one:
> 
>     5df5dd03a8f7 sunrpc: Use sendmsg(MSG_SPLICE_PAGES) rather then sendpage
> 
> This patch fixes the problem and gets the test run "green" again.

Note that the version of the patch in this reply is not the version
that is applied to nfsd-next. That might not make a difference,
though.


> I think we will probably want to send this patch to mainline for v6.5, but
> it'd be good to understand what's broken and how this fixes it.

I agree, I'd like to get a root cause before proceeding with a fix.


> Do you (or David) have any insight?

It's often the case that this kind of problem is because either the
send or receive code doesn't handle a non-zero xdr_buf::page_base
correctly. A failing READ at a non-page-aligned offset is a typical
feature of this kind of bug.

This class of bug slips through the cracks when testing with POSIX
pagecache-based clients because such clients only rarely send non-
aligned READ requests.


> It'd also be good to understand whether we also need to fix UDP. pynfs
> is tcp-only, so I can't run the same test there as easily.

We don't have a good story for testing UDP. We should either build
a proper test infrastructure, or cut bait and remove UDP support.
But once you spot the incorrect code in the TCP send path, I bet
it won't be hard to see whether UDP also has this problem.


> > diff --git a/include/linux/sunrpc/svcsock.h b/include/linux/sunrpc/svcsock.h
> > index a7116048a4d4..a9bfeadf4cbe 100644
> > --- a/include/linux/sunrpc/svcsock.h
> > +++ b/include/linux/sunrpc/svcsock.h
> > @@ -40,6 +40,9 @@ struct svc_sock {
> >  
> >  	struct completion	sk_handshake_done;
> >  
> > +	struct bio_vec		sk_send_bvec[RPCSVC_MAXPAGES]
> > +						____cacheline_aligned;
> > +
> >  	struct page *		sk_pages[RPCSVC_MAXPAGES];	/* received data */
> >  };
> >  
> > diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
> > index f89ec4b5ea16..42f9d7eb9a1a 100644
> > --- a/include/linux/sunrpc/xdr.h
> > +++ b/include/linux/sunrpc/xdr.h
> > @@ -139,6 +139,8 @@ void	xdr_terminate_string(const struct xdr_buf *, const u32);
> >  size_t	xdr_buf_pagecount(const struct xdr_buf *buf);
> >  int	xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp);
> >  void	xdr_free_bvec(struct xdr_buf *buf);
> > +unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
> > +			     const struct xdr_buf *xdr);
> >  
> >  static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int len)
> >  {
> > diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
> > index e43f26382411..e35e5afe4b81 100644
> > --- a/net/sunrpc/svcsock.c
> > +++ b/net/sunrpc/svcsock.c
> > @@ -36,6 +36,8 @@
> >  #include <linux/skbuff.h>
> >  #include <linux/file.h>
> >  #include <linux/freezer.h>
> > +#include <linux/bvec.h>
> > +
> >  #include <net/sock.h>
> >  #include <net/checksum.h>
> >  #include <net/ip.h>
> > @@ -1194,72 +1196,52 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
> >  	return 0;	/* record not complete */
> >  }
> >  
> > -static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
> > -			      int flags)
> > -{
> > -	struct msghdr msg = { .msg_flags = MSG_SPLICE_PAGES | flags, };
> > -
> > -	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
> > -	return sock_sendmsg(sock, &msg);
> > -}
> > -
> >  /*
> >   * MSG_SPLICE_PAGES is used exclusively to reduce the number of
> >   * copy operations in this path. Therefore the caller must ensure
> >   * that the pages backing @xdr are unchanging.
> >   *
> > - * In addition, the logic assumes that * .bv_len is never larger
> > - * than PAGE_SIZE.
> > + * Note that the send is non-blocking. The caller has incremented
> > + * the reference count on each page backing the RPC message, and
> > + * the network layer will "put" these pages when transmission is
> > + * complete.
> > + *
> > + * This is safe for our RPC services because the memory backing
> > + * the head and tail components is never kmalloc'd. These always
> > + * come from pages in the svc_rqst::rq_pages array.
> >   */
> > -static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
> > +static int svc_tcp_sendmsg(struct svc_sock *svsk, struct xdr_buf *xdr,
> >  			   rpc_fraghdr marker, unsigned int *sentp)
> >  {
> > -	const struct kvec *head = xdr->head;
> > -	const struct kvec *tail = xdr->tail;
> >  	struct kvec rm = {
> >  		.iov_base	= &marker,
> >  		.iov_len	= sizeof(marker),
> >  	};
> >  	struct msghdr msg = {
> > -		.msg_flags	= 0,
> > +		.msg_flags	= MSG_MORE,
> >  	};
> > +	unsigned int count;
> >  	int ret;
> >  
> >  	*sentp = 0;
> > -	ret = xdr_alloc_bvec(xdr, GFP_KERNEL);
> > -	if (ret < 0)
> > -		return ret;
> >  
> > -	ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
> > +	ret = kernel_sendmsg(svsk->sk_sock, &msg, &rm, 1, rm.iov_len);
> >  	if (ret < 0)
> >  		return ret;
> >  	*sentp += ret;
> >  	if (ret != rm.iov_len)
> >  		return -EAGAIN;
> >  
> > -	ret = svc_tcp_send_kvec(sock, head, 0);
> > -	if (ret < 0)
> > -		return ret;
> > -	*sentp += ret;
> > -	if (ret != head->iov_len)
> > -		goto out;
> > +	count = xdr_buf_to_bvec(svsk->sk_send_bvec,
> > +				ARRAY_SIZE(svsk->sk_send_bvec), xdr);
> >  
> >  	msg.msg_flags = MSG_SPLICE_PAGES;
> > -	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, xdr->bvec,
> > -		      xdr_buf_pagecount(xdr), xdr->page_len);
> > -	ret = sock_sendmsg(sock, &msg);
> > +	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, svsk->sk_send_bvec,
> > +		      count, xdr->len);
> > +	ret = sock_sendmsg(svsk->sk_sock, &msg);
> >  	if (ret < 0)
> >  		return ret;
> >  	*sentp += ret;
> > -
> > -	if (tail->iov_len) {
> > -		ret = svc_tcp_send_kvec(sock, tail, 0);
> > -		if (ret < 0)
> > -			return ret;
> > -		*sentp += ret;
> > -	}
> > -
> > -out:
> >  	return 0;
> >  }
> >  
> > @@ -1290,8 +1272,7 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
> >  	if (svc_xprt_is_dead(xprt))
> >  		goto out_notconn;
> >  	tcp_sock_set_cork(svsk->sk_sk, true);
> > -	err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
> > -	xdr_free_bvec(xdr);
> > +	err = svc_tcp_sendmsg(svsk, xdr, marker, &sent);
> >  	trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
> >  	if (err < 0 || sent != (xdr->len + sizeof(marker)))
> >  		goto out_close;
> > diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
> > index 2a22e78af116..358e6de91775 100644
> > --- a/net/sunrpc/xdr.c
> > +++ b/net/sunrpc/xdr.c
> > @@ -164,6 +164,56 @@ xdr_free_bvec(struct xdr_buf *buf)
> >  	buf->bvec = NULL;
> >  }
> >  
> > +/**
> > + * xdr_buf_to_bvec - Copy components of an xdr_buf into a bio_vec array
> > + * @bvec: bio_vec array to populate
> > + * @bvec_size: element count of @bio_vec
> > + * @xdr: xdr_buf to be copied
> > + *
> > + * Returns the number of entries consumed in @bvec.
> > + */
> > +unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
> > +			     const struct xdr_buf *xdr)
> > +{
> > +	const struct kvec *head = xdr->head;
> > +	const struct kvec *tail = xdr->tail;
> > +	unsigned int count = 0;
> > +
> > +	if (head->iov_len) {
> > +		bvec_set_virt(bvec++, head->iov_base, head->iov_len);
> > +		++count;
> > +	}
> > +
> > +	if (xdr->page_len) {
> > +		unsigned int offset, len, remaining;
> > +		struct page **pages = xdr->pages;
> > +
> > +		offset = offset_in_page(xdr->page_base);
> > +		remaining = xdr->page_len;
> > +		while (remaining > 0) {
> > +			len = min_t(unsigned int, remaining,
> > +				    PAGE_SIZE - offset);
> > +			bvec_set_page(bvec++, *pages++, len, offset);
> > +			remaining -= len;
> > +			offset = 0;
> > +			if (unlikely(++count > bvec_size))
> > +				goto bvec_overflow;
> > +		}
> > +	}
> > +
> > +	if (tail->iov_len) {
> > +		bvec_set_virt(bvec, tail->iov_base, tail->iov_len);
> > +		if (unlikely(++count > bvec_size))
> > +			goto bvec_overflow;
> > +	}
> > +
> > +	return count;
> > +
> > +bvec_overflow:
> > +	pr_warn_once("%s: bio_vec array overflow\n", __func__);
> > +	return count - 1;
> > +}
> > +
> >  /**
> >   * xdr_inline_pages - Prepare receive buffer for a large reply
> >   * @xdr: xdr_buf into which reply will be placed
> > 
> > 
> 
> -- 
> Jeff Layton <jlayton@kernel.org>
Mkrtchyan, Tigran Aug. 14, 2023, 12:56 p.m. UTC | #3
----- Original Message -----
> From: "Jeff Layton" <jlayton@kernel.org>
> To: "Chuck Lever" <cel@kernel.org>, "linux-nfs" <linux-nfs@vger.kernel.org>, netdev@vger.kernel.org
> Cc: "Chuck Lever" <chuck.lever@oracle.com>, dhowells@redhat.com
> Sent: Saturday, 12 August, 2023 14:04:57
> Subject: Re: [PATCH v2 1/4] SUNRPC: Convert svc_tcp_sendmsg to use bio_vecs directly

> On Fri, 2023-07-14 at 14:10 -0400, Chuck Lever wrote:
>> From: Chuck Lever <chuck.lever@oracle.com>
>> 
>> Add a helper to convert a whole xdr_buf directly into an array of
>> bio_vecs, then send this array instead of iterating piecemeal over
>> the xdr_buf containing the outbound RPC message.
>> 
>> Note that the rules of the RPC protocol mean there can be only one
>> outstanding send at a time on a transport socket. The kernel's
>> SunRPC server enforces this via the transport's xpt_mutex. Thus we
>> can use a per-transport shared array for the xdr_buf conversion
>> rather than allocate one every time or use one that is part of
>> struct svc_rqst.
>> 
>> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
>> ---
>>  include/linux/sunrpc/svcsock.h |    3 ++
>>  include/linux/sunrpc/xdr.h     |    2 +
>>  net/sunrpc/svcsock.c           |   59 ++++++++++++++--------------------------
>>  net/sunrpc/xdr.c               |   50 ++++++++++++++++++++++++++++++++++
>>  4 files changed, 75 insertions(+), 39 deletions(-)
>> 
> 
> I've seen some pynfs test regressions in mainline (v6.5-rc5-ish)
> kernels. Here's one failing test:


BTW, we have built a container to run pynfs tests as part of your CI process.

podman run -ti --rm dcache/pynfs:0.3 /run-nfs4.0.sh --help
podman run -ti --rm dcache/pynfs:0.3 /run-nfs4.1.sh --help

Maybe others will find it useful as well.

Tigran.




> 
> _text = b'write data' # len=10
> 
> [...]
> 
> def testSimpleWrite2(t, env):
>    """WRITE with stateid=zeros changing size
> 
>    FLAGS: write all
>    DEPEND: MKFILE
>    CODE: WRT1b
>    """
>    c = env.c1
>    c.init_connection()
>    attrs = {FATTR4_SIZE: 32, FATTR4_MODE: 0o644}
>    fh, stateid = c.create_confirm(t.word(), attrs=attrs,
>                                   deny=OPEN4_SHARE_DENY_NONE)
>    res = c.write_file(fh, _text, 30)
>    check(res, msg="WRITE with stateid=zeros changing size")
>    res = c.read_file(fh, 25, 20)
>    _compare(t, res, b'\0'*5 + _text, True)
> 
> This test writes 10 bytes of data (to a file at offset 30, and then does
> a 20 byte read starting at offset 25. The READ reply has NULs where the
> written data should be
> 
> The patch that broke things is this one:
> 
>    5df5dd03a8f7 sunrpc: Use sendmsg(MSG_SPLICE_PAGES) rather then sendpage
> 
> This patch fixes the problem and gets the test run "green" again. I
> think we will probably want to send this patch to mainline for v6.5, but
> it'd be good to understand what's broken and how this fixes it.
> 
> Do you (or David) have any insight?
> 
> It'd also be good to understand whether we also need to fix UDP. pynfs
> is tcp-only, so I can't run the same test there as easily.
> 
>> diff --git a/include/linux/sunrpc/svcsock.h b/include/linux/sunrpc/svcsock.h
>> index a7116048a4d4..a9bfeadf4cbe 100644
>> --- a/include/linux/sunrpc/svcsock.h
>> +++ b/include/linux/sunrpc/svcsock.h
>> @@ -40,6 +40,9 @@ struct svc_sock {
>>  
>>  	struct completion	sk_handshake_done;
>>  
>> +	struct bio_vec		sk_send_bvec[RPCSVC_MAXPAGES]
>> +						____cacheline_aligned;
>> +
>>  	struct page *		sk_pages[RPCSVC_MAXPAGES];	/* received data */
>>  };
>>  
>> diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
>> index f89ec4b5ea16..42f9d7eb9a1a 100644
>> --- a/include/linux/sunrpc/xdr.h
>> +++ b/include/linux/sunrpc/xdr.h
>> @@ -139,6 +139,8 @@ void	xdr_terminate_string(const struct xdr_buf *, const
>> u32);
>>  size_t	xdr_buf_pagecount(const struct xdr_buf *buf);
>>  int	xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp);
>>  void	xdr_free_bvec(struct xdr_buf *buf);
>> +unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
>> +			     const struct xdr_buf *xdr);
>>  
>>  static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int
>>  len)
>>  {
>> diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
>> index e43f26382411..e35e5afe4b81 100644
>> --- a/net/sunrpc/svcsock.c
>> +++ b/net/sunrpc/svcsock.c
>> @@ -36,6 +36,8 @@
>>  #include <linux/skbuff.h>
>>  #include <linux/file.h>
>>  #include <linux/freezer.h>
>> +#include <linux/bvec.h>
>> +
>>  #include <net/sock.h>
>>  #include <net/checksum.h>
>>  #include <net/ip.h>
>> @@ -1194,72 +1196,52 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
>>  	return 0;	/* record not complete */
>>  }
>>  
>> -static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
>> -			      int flags)
>> -{
>> -	struct msghdr msg = { .msg_flags = MSG_SPLICE_PAGES | flags, };
>> -
>> -	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
>> -	return sock_sendmsg(sock, &msg);
>> -}
>> -
>>  /*
>>   * MSG_SPLICE_PAGES is used exclusively to reduce the number of
>>   * copy operations in this path. Therefore the caller must ensure
>>   * that the pages backing @xdr are unchanging.
>>   *
>> - * In addition, the logic assumes that * .bv_len is never larger
>> - * than PAGE_SIZE.
>> + * Note that the send is non-blocking. The caller has incremented
>> + * the reference count on each page backing the RPC message, and
>> + * the network layer will "put" these pages when transmission is
>> + * complete.
>> + *
>> + * This is safe for our RPC services because the memory backing
>> + * the head and tail components is never kmalloc'd. These always
>> + * come from pages in the svc_rqst::rq_pages array.
>>   */
>> -static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
>> +static int svc_tcp_sendmsg(struct svc_sock *svsk, struct xdr_buf *xdr,
>>  			   rpc_fraghdr marker, unsigned int *sentp)
>>  {
>> -	const struct kvec *head = xdr->head;
>> -	const struct kvec *tail = xdr->tail;
>>  	struct kvec rm = {
>>  		.iov_base	= &marker,
>>  		.iov_len	= sizeof(marker),
>>  	};
>>  	struct msghdr msg = {
>> -		.msg_flags	= 0,
>> +		.msg_flags	= MSG_MORE,
>>  	};
>> +	unsigned int count;
>>  	int ret;
>>  
>>  	*sentp = 0;
>> -	ret = xdr_alloc_bvec(xdr, GFP_KERNEL);
>> -	if (ret < 0)
>> -		return ret;
>>  
>> -	ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
>> +	ret = kernel_sendmsg(svsk->sk_sock, &msg, &rm, 1, rm.iov_len);
>>  	if (ret < 0)
>>  		return ret;
>>  	*sentp += ret;
>>  	if (ret != rm.iov_len)
>>  		return -EAGAIN;
>>  
>> -	ret = svc_tcp_send_kvec(sock, head, 0);
>> -	if (ret < 0)
>> -		return ret;
>> -	*sentp += ret;
>> -	if (ret != head->iov_len)
>> -		goto out;
>> +	count = xdr_buf_to_bvec(svsk->sk_send_bvec,
>> +				ARRAY_SIZE(svsk->sk_send_bvec), xdr);
>>  
>>  	msg.msg_flags = MSG_SPLICE_PAGES;
>> -	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, xdr->bvec,
>> -		      xdr_buf_pagecount(xdr), xdr->page_len);
>> -	ret = sock_sendmsg(sock, &msg);
>> +	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, svsk->sk_send_bvec,
>> +		      count, xdr->len);
>> +	ret = sock_sendmsg(svsk->sk_sock, &msg);
>>  	if (ret < 0)
>>  		return ret;
>>  	*sentp += ret;
>> -
>> -	if (tail->iov_len) {
>> -		ret = svc_tcp_send_kvec(sock, tail, 0);
>> -		if (ret < 0)
>> -			return ret;
>> -		*sentp += ret;
>> -	}
>> -
>> -out:
>>  	return 0;
>>  }
>>  
>> @@ -1290,8 +1272,7 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
>>  	if (svc_xprt_is_dead(xprt))
>>  		goto out_notconn;
>>  	tcp_sock_set_cork(svsk->sk_sk, true);
>> -	err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
>> -	xdr_free_bvec(xdr);
>> +	err = svc_tcp_sendmsg(svsk, xdr, marker, &sent);
>>  	trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
>>  	if (err < 0 || sent != (xdr->len + sizeof(marker)))
>>  		goto out_close;
>> diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
>> index 2a22e78af116..358e6de91775 100644
>> --- a/net/sunrpc/xdr.c
>> +++ b/net/sunrpc/xdr.c
>> @@ -164,6 +164,56 @@ xdr_free_bvec(struct xdr_buf *buf)
>>  	buf->bvec = NULL;
>>  }
>>  
>> +/**
>> + * xdr_buf_to_bvec - Copy components of an xdr_buf into a bio_vec array
>> + * @bvec: bio_vec array to populate
>> + * @bvec_size: element count of @bio_vec
>> + * @xdr: xdr_buf to be copied
>> + *
>> + * Returns the number of entries consumed in @bvec.
>> + */
>> +unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
>> +			     const struct xdr_buf *xdr)
>> +{
>> +	const struct kvec *head = xdr->head;
>> +	const struct kvec *tail = xdr->tail;
>> +	unsigned int count = 0;
>> +
>> +	if (head->iov_len) {
>> +		bvec_set_virt(bvec++, head->iov_base, head->iov_len);
>> +		++count;
>> +	}
>> +
>> +	if (xdr->page_len) {
>> +		unsigned int offset, len, remaining;
>> +		struct page **pages = xdr->pages;
>> +
>> +		offset = offset_in_page(xdr->page_base);
>> +		remaining = xdr->page_len;
>> +		while (remaining > 0) {
>> +			len = min_t(unsigned int, remaining,
>> +				    PAGE_SIZE - offset);
>> +			bvec_set_page(bvec++, *pages++, len, offset);
>> +			remaining -= len;
>> +			offset = 0;
>> +			if (unlikely(++count > bvec_size))
>> +				goto bvec_overflow;
>> +		}
>> +	}
>> +
>> +	if (tail->iov_len) {
>> +		bvec_set_virt(bvec, tail->iov_base, tail->iov_len);
>> +		if (unlikely(++count > bvec_size))
>> +			goto bvec_overflow;
>> +	}
>> +
>> +	return count;
>> +
>> +bvec_overflow:
>> +	pr_warn_once("%s: bio_vec array overflow\n", __func__);
>> +	return count - 1;
>> +}
>> +
>>  /**
>>   * xdr_inline_pages - Prepare receive buffer for a large reply
>>   * @xdr: xdr_buf into which reply will be placed
>> 
>> 
> 
> --
> Jeff Layton <jlayton@kernel.org>
diff mbox series

Patch

diff --git a/include/linux/sunrpc/svcsock.h b/include/linux/sunrpc/svcsock.h
index a7116048a4d4..a9bfeadf4cbe 100644
--- a/include/linux/sunrpc/svcsock.h
+++ b/include/linux/sunrpc/svcsock.h
@@ -40,6 +40,9 @@  struct svc_sock {
 
 	struct completion	sk_handshake_done;
 
+	struct bio_vec		sk_send_bvec[RPCSVC_MAXPAGES]
+						____cacheline_aligned;
+
 	struct page *		sk_pages[RPCSVC_MAXPAGES];	/* received data */
 };
 
diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
index f89ec4b5ea16..42f9d7eb9a1a 100644
--- a/include/linux/sunrpc/xdr.h
+++ b/include/linux/sunrpc/xdr.h
@@ -139,6 +139,8 @@  void	xdr_terminate_string(const struct xdr_buf *, const u32);
 size_t	xdr_buf_pagecount(const struct xdr_buf *buf);
 int	xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp);
 void	xdr_free_bvec(struct xdr_buf *buf);
+unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
+			     const struct xdr_buf *xdr);
 
 static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int len)
 {
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index e43f26382411..e35e5afe4b81 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -36,6 +36,8 @@ 
 #include <linux/skbuff.h>
 #include <linux/file.h>
 #include <linux/freezer.h>
+#include <linux/bvec.h>
+
 #include <net/sock.h>
 #include <net/checksum.h>
 #include <net/ip.h>
@@ -1194,72 +1196,52 @@  static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 	return 0;	/* record not complete */
 }
 
-static int svc_tcp_send_kvec(struct socket *sock, const struct kvec *vec,
-			      int flags)
-{
-	struct msghdr msg = { .msg_flags = MSG_SPLICE_PAGES | flags, };
-
-	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, vec, 1, vec->iov_len);
-	return sock_sendmsg(sock, &msg);
-}
-
 /*
  * MSG_SPLICE_PAGES is used exclusively to reduce the number of
  * copy operations in this path. Therefore the caller must ensure
  * that the pages backing @xdr are unchanging.
  *
- * In addition, the logic assumes that * .bv_len is never larger
- * than PAGE_SIZE.
+ * Note that the send is non-blocking. The caller has incremented
+ * the reference count on each page backing the RPC message, and
+ * the network layer will "put" these pages when transmission is
+ * complete.
+ *
+ * This is safe for our RPC services because the memory backing
+ * the head and tail components is never kmalloc'd. These always
+ * come from pages in the svc_rqst::rq_pages array.
  */
-static int svc_tcp_sendmsg(struct socket *sock, struct xdr_buf *xdr,
+static int svc_tcp_sendmsg(struct svc_sock *svsk, struct xdr_buf *xdr,
 			   rpc_fraghdr marker, unsigned int *sentp)
 {
-	const struct kvec *head = xdr->head;
-	const struct kvec *tail = xdr->tail;
 	struct kvec rm = {
 		.iov_base	= &marker,
 		.iov_len	= sizeof(marker),
 	};
 	struct msghdr msg = {
-		.msg_flags	= 0,
+		.msg_flags	= MSG_MORE,
 	};
+	unsigned int count;
 	int ret;
 
 	*sentp = 0;
-	ret = xdr_alloc_bvec(xdr, GFP_KERNEL);
-	if (ret < 0)
-		return ret;
 
-	ret = kernel_sendmsg(sock, &msg, &rm, 1, rm.iov_len);
+	ret = kernel_sendmsg(svsk->sk_sock, &msg, &rm, 1, rm.iov_len);
 	if (ret < 0)
 		return ret;
 	*sentp += ret;
 	if (ret != rm.iov_len)
 		return -EAGAIN;
 
-	ret = svc_tcp_send_kvec(sock, head, 0);
-	if (ret < 0)
-		return ret;
-	*sentp += ret;
-	if (ret != head->iov_len)
-		goto out;
+	count = xdr_buf_to_bvec(svsk->sk_send_bvec,
+				ARRAY_SIZE(svsk->sk_send_bvec), xdr);
 
 	msg.msg_flags = MSG_SPLICE_PAGES;
-	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, xdr->bvec,
-		      xdr_buf_pagecount(xdr), xdr->page_len);
-	ret = sock_sendmsg(sock, &msg);
+	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, svsk->sk_send_bvec,
+		      count, xdr->len);
+	ret = sock_sendmsg(svsk->sk_sock, &msg);
 	if (ret < 0)
 		return ret;
 	*sentp += ret;
-
-	if (tail->iov_len) {
-		ret = svc_tcp_send_kvec(sock, tail, 0);
-		if (ret < 0)
-			return ret;
-		*sentp += ret;
-	}
-
-out:
 	return 0;
 }
 
@@ -1290,8 +1272,7 @@  static int svc_tcp_sendto(struct svc_rqst *rqstp)
 	if (svc_xprt_is_dead(xprt))
 		goto out_notconn;
 	tcp_sock_set_cork(svsk->sk_sk, true);
-	err = svc_tcp_sendmsg(svsk->sk_sock, xdr, marker, &sent);
-	xdr_free_bvec(xdr);
+	err = svc_tcp_sendmsg(svsk, xdr, marker, &sent);
 	trace_svcsock_tcp_send(xprt, err < 0 ? (long)err : sent);
 	if (err < 0 || sent != (xdr->len + sizeof(marker)))
 		goto out_close;
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 2a22e78af116..358e6de91775 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -164,6 +164,56 @@  xdr_free_bvec(struct xdr_buf *buf)
 	buf->bvec = NULL;
 }
 
+/**
+ * xdr_buf_to_bvec - Copy components of an xdr_buf into a bio_vec array
+ * @bvec: bio_vec array to populate
+ * @bvec_size: element count of @bio_vec
+ * @xdr: xdr_buf to be copied
+ *
+ * Returns the number of entries consumed in @bvec.
+ */
+unsigned int xdr_buf_to_bvec(struct bio_vec *bvec, unsigned int bvec_size,
+			     const struct xdr_buf *xdr)
+{
+	const struct kvec *head = xdr->head;
+	const struct kvec *tail = xdr->tail;
+	unsigned int count = 0;
+
+	if (head->iov_len) {
+		bvec_set_virt(bvec++, head->iov_base, head->iov_len);
+		++count;
+	}
+
+	if (xdr->page_len) {
+		unsigned int offset, len, remaining;
+		struct page **pages = xdr->pages;
+
+		offset = offset_in_page(xdr->page_base);
+		remaining = xdr->page_len;
+		while (remaining > 0) {
+			len = min_t(unsigned int, remaining,
+				    PAGE_SIZE - offset);
+			bvec_set_page(bvec++, *pages++, len, offset);
+			remaining -= len;
+			offset = 0;
+			if (unlikely(++count > bvec_size))
+				goto bvec_overflow;
+		}
+	}
+
+	if (tail->iov_len) {
+		bvec_set_virt(bvec, tail->iov_base, tail->iov_len);
+		if (unlikely(++count > bvec_size))
+			goto bvec_overflow;
+	}
+
+	return count;
+
+bvec_overflow:
+	pr_warn_once("%s: bio_vec array overflow\n", __func__);
+	return count - 1;
+}
+
 /**
  * xdr_inline_pages - Prepare receive buffer for a large reply
  * @xdr: xdr_buf into which reply will be placed