diff mbox

[V9fs-developer] fs/9p: Update zero-copy implementation in 9p

Message ID 1312965749-19683-1-git-send-email-aneesh.kumar@linux.vnet.ibm.com (mailing list archive)
State Superseded, archived
Delegated to: Eric Van Hensbergen
Headers show

Commit Message

Aneesh Kumar K.V Aug. 10, 2011, 8:42 a.m. UTC
* remove a lot of updates to different data structures
* add a separate callback for zero copy request.
* above makes non zero copy code path simpler
* remove conditionalizing TREAD/TREADDIR/TWRITE in the zero copy path
* Fix the dotu p9_check_errors with zero copy. Add sufficient doc around
* Add support for both in and output buffers in zero copy callback
* pin and unpin pages in the same context
* use helpers instead of defining page offset and rest of page ourself
* Fix mem leak in p9_check_errors
* Remove 'E' and 'F' in p9pdu_vwritef

Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
---
 fs/9p/vfs_dir.c            |    2 +-
 include/net/9p/9p.h        |   11 +-
 include/net/9p/transport.h |   10 +-
 net/9p/client.c            |  347 +++++++++++++++++++++++++++++++++++---------
 net/9p/protocol.c          |   46 +------
 net/9p/protocol.h          |    2 +
 net/9p/trans_common.c      |   53 ++-----
 net/9p/trans_common.h      |   21 +---
 net/9p/trans_virtio.c      |  314 ++++++++++++++++++++++++---------------
 9 files changed, 497 insertions(+), 309 deletions(-)

Comments

mohan@imap.linux.ibm.com Aug. 15, 2011, 9:27 a.m. UTC | #1
[snip]

> + * p9_client_zc_rpc - issue a request and wait for a response
> + * @c: client session
> + * @type: type of request
> + * @uidata: user bffer that should be ued for zero copy read
> + * @uodata: user buffer that shoud be user for zero copy write
> + * @inlen: read buffer size
> + * @olen: write buffer size
> + * @hdrlen: reader header size, This is the size of response protocol data
> + * @fmt: protocol format string (see protocol.c)
> + *
> + * Returns request structure (which client must free using p9_free_req)
> + */
> +static struct p9_req_t *p9_client_zc_rpc(struct p9_client *c, int8_t type,
> +					 char *uidata, char *uodata,
> +					 int inlen, int olen, int in_hdrlen,
> +					 int kern_buf, const char *fmt, ...)

Most of the code is duplicated from p9_client_rpc, is it possible to write a
helper routine which can be used by both p9_client_rpc & p9_client_zc_rpc?

> +{
> +	va_list ap;
> +	int tag, err;
> +	struct p9_req_t *req;
> +	unsigned long flags;
> +	int sigpending;
> +
> +	P9_DPRINTK(P9_DEBUG_MUX, "client %p op %d\n", c, type);
> +
> +	/* we allow for any status other than disconnected */
> +	if (c->status == Disconnected)
> +		return ERR_PTR(-EIO);
> +
> +	/* If we are called with KERNEL_DS force kern_buf */
> +	if (segment_eq(get_fs(), KERNEL_DS))
> +		kern_buf = 1;
> +
> +	/* if status is begin_disconnected we allow only clunk request */
> +	if ((c->status == BeginDisconnect) && (type != P9_TCLUNK))
> +		return ERR_PTR(-EIO);
> +
> +	if (signal_pending(current)) {
> +		sigpending = 1;
> +		clear_thread_flag(TIF_SIGPENDING);
> +	} else
> +		sigpending = 0;
> +
> +	tag = P9_NOTAG;
> +	if (type != P9_TVERSION) {
> +		tag = p9_idpool_get(c->tagpool);
> +		if (tag < 0)
> +			return ERR_PTR(-ENOMEM);
> +	}
> +	/*
> +	 * We allocate a inline protocol data of only 4k bytes.
> +	 * The actual content is passed in zero-copy fashion.
> +	 */
> +	req = p9_tag_alloc(c, tag, P9_ZC_HDR_SZ);
> +	if (IS_ERR(req))
> +		return req;
> +
> +	/* marshall the data */
> +	p9pdu_prepare(req->tc, tag, type);
> +	va_start(ap, fmt);
> +	err = p9pdu_vwritef(req->tc, c->proto_version, fmt, ap);
> +	va_end(ap);
> +	if (err)
> +		goto reterr;
> +	p9pdu_finalize(req->tc);
> +
> +	err = c->trans_mod->zc_request(c, req, uidata, uodata,
> +				       inlen, olen, in_hdrlen, kern_buf);
> +	if (err < 0) {
> +		if (err == -EIO)
> +			c->status = Disconnected;
> +		goto reterr;
> +	}
> +	if (req->status == REQ_STATUS_ERROR) {
> +		P9_DPRINTK(P9_DEBUG_ERROR, "req_status error %d\n", req->t_err);
> +		err = req->t_err;
> +	}
> +	if ((err == -ERESTARTSYS) && (c->status == Connected)) {
> +		P9_DPRINTK(P9_DEBUG_MUX, "flushing\n");
> +		sigpending = 1;
> +		clear_thread_flag(TIF_SIGPENDING);
> +
> +		if (c->trans_mod->cancel(c, req))
> +			p9_client_flush(c, req);
> +
> +		/* if we received the response anyway, don't signal error */
> +		if (req->status == REQ_STATUS_RCVD)
> +			err = 0;
> +	}
> +	if (sigpending) {
> +		spin_lock_irqsave(&current->sighand->siglock, flags);
> +		recalc_sigpending();
> +		spin_unlock_irqrestore(&current->sighand->siglock, flags);
> +	}
> +	if (err < 0)
> +		goto reterr;
> +
> +	err = p9_check_zc_errors(c, req, uidata, in_hdrlen, kern_buf);
> +	if (!err) {
> +		P9_DPRINTK(P9_DEBUG_MUX, "exit: client %p op %d\n", c, type);
> +		return req;
> +	}
> +reterr:
> +	P9_DPRINTK(P9_DEBUG_MUX, "exit: client %p op %d error: %d\n", c, type,
> +									err);
> +	p9_free_req(c, req);
> +	return ERR_PTR(err);
> +}
> +
>  static struct p9_fid *p9_fid_create(struct p9_client *clnt)
>  {
>  	int ret;
> @@ -1329,13 +1512,15 @@ int
>  p9_client_read(struct p9_fid *fid, char *data, char __user *udata,  
> u64 offset,
>  								u32 count)
>  {
> -	int err, rsize;
> -	struct p9_client *clnt;
> -	struct p9_req_t *req;
>  	char *dataptr;
> +	int kernel_buf = 0;
> +	struct p9_req_t *req;
> +	struct p9_client *clnt;
> +	int err, rsize, non_zc = 0;
> +
>
> -	P9_DPRINTK(P9_DEBUG_9P, ">>> TREAD fid %d offset %llu %d\n", fid->fid,
> -					(long long unsigned) offset, count);
> +	P9_DPRINTK(P9_DEBUG_9P, ">>> TREAD fid %d offset %llu %d\n",
> +		   fid->fid, (long long unsigned) offset, count);
>  	err = 0;
>  	clnt = fid->clnt;
>
> @@ -1346,14 +1531,25 @@ p9_client_read(struct p9_fid *fid, char  
> *data, char __user *udata, u64 offset,
>  	if (count < rsize)
>  		rsize = count;
>
> -	/* Don't bother zerocopy for small IO (< 1024) */
> -	if (((clnt->trans_mod->pref & P9_TRANS_PREF_PAYLOAD_MASK) ==
> -			P9_TRANS_PREF_PAYLOAD_SEP) && (rsize > 1024)) {
> -		req = p9_client_rpc(clnt, P9_TREAD, "dqE", fid->fid, offset,
> -				rsize, data, udata);
> +	/* Don't bother zerocopy form small IO (< 1024) */
form Typo?
> +	if (clnt->trans_mod->zc_request && rsize > 1024) {
> +		char *indata;
> +		if (data) {
> +			kernel_buf = 1;
> +			indata = data;
> +		} else
> +			indata = (char *)udata;
> +		/*
> +		 * response header len is 11
> +		 * PDU Header(7) + IO Size (4)
> +		 */
> +		req = p9_client_zc_rpc(clnt, P9_TREAD, indata, NULL, rsize, 0,
> +				       11, kernel_buf, "dqd", fid->fid,
> +				       offset, rsize);


------------------------------------------------------------------------------
uberSVN's rich system and user administration capabilities and model 
configuration take the hassle out of deploying and managing Subversion and 
the tools developers use with it. Learn more about uberSVN and get a free 
download at:  http://p.sf.net/sfu/wandisco-dev2dev
Aneesh Kumar K.V Aug. 15, 2011, 10:01 a.m. UTC | #2
On Mon, 15 Aug 2011 05:27:42 -0400, mohan@imap.linux.ibm.com wrote:
> 
> [snip]
> 
> > + * p9_client_zc_rpc - issue a request and wait for a response
> > + * @c: client session
> > + * @type: type of request
> > + * @uidata: user bffer that should be ued for zero copy read
> > + * @uodata: user buffer that shoud be user for zero copy write
> > + * @inlen: read buffer size
> > + * @olen: write buffer size
> > + * @hdrlen: reader header size, This is the size of response protocol data
> > + * @fmt: protocol format string (see protocol.c)
> > + *
> > + * Returns request structure (which client must free using p9_free_req)
> > + */
> > +static struct p9_req_t *p9_client_zc_rpc(struct p9_client *c, int8_t type,
> > +					 char *uidata, char *uodata,
> > +					 int inlen, int olen, int in_hdrlen,
> > +					 int kern_buf, const char *fmt, ...)
> 
> Most of the code is duplicated from p9_client_rpc, is it possible to write a
> helper routine which can be used by both p9_client_rpc &
> p9_client_zc_rpc?

I will try that.


> > +{
> > +	va_list ap;
> > +	int tag, err;
> > +	struct p9_req_t *req;
> > +	unsigned long flags;
> > +	int sigpending;
> > +
> > +	P9_DPRINTK(P9_DEBUG_MUX, "client %p op %d\n", c, type);
> > +
> > +	/* we allow for any status other than disconnected */
> > +	if (c->status == Disconnected)
> > +		return ERR_PTR(-EIO);
> > +
> > +	/* If we are called with KERNEL_DS force kern_buf */
> > +	if (segment_eq(get_fs(), KERNEL_DS))
> > +		kern_buf = 1;
> > +
> > +	/* if status is begin_disconnected we allow only clunk request */
> > +	if ((c->status == BeginDisconnect) && (type != P9_TCLUNK))
> > +		return ERR_PTR(-EIO);
> > +
> > +	if (signal_pending(current)) {
> > +		sigpending = 1;
> > +		clear_thread_flag(TIF_SIGPENDING);
> > +	} else
> > +		sigpending = 0;
> > +
> > +	tag = P9_NOTAG;
> > +	if (type != P9_TVERSION) {
> > +		tag = p9_idpool_get(c->tagpool);
> > +		if (tag < 0)
> > +			return ERR_PTR(-ENOMEM);
> > +	}
> > +	/*
> > +	 * We allocate a inline protocol data of only 4k bytes.
> > +	 * The actual content is passed in zero-copy fashion.
> > +	 */
> > +	req = p9_tag_alloc(c, tag, P9_ZC_HDR_SZ);
> > +	if (IS_ERR(req))
> > +		return req;
> > +
> > +	/* marshall the data */
> > +	p9pdu_prepare(req->tc, tag, type);
> > +	va_start(ap, fmt);
> > +	err = p9pdu_vwritef(req->tc, c->proto_version, fmt, ap);
> > +	va_end(ap);
> > +	if (err)
> > +		goto reterr;
> > +	p9pdu_finalize(req->tc);
> > +
> > +	err = c->trans_mod->zc_request(c, req, uidata, uodata,
> > +				       inlen, olen, in_hdrlen, kern_buf);
> > +	if (err < 0) {
> > +		if (err == -EIO)
> > +			c->status = Disconnected;
> > +		goto reterr;
> > +	}
> > +	if (req->status == REQ_STATUS_ERROR) {
> > +		P9_DPRINTK(P9_DEBUG_ERROR, "req_status error %d\n", req->t_err);
> > +		err = req->t_err;
> > +	}
> > +	if ((err == -ERESTARTSYS) && (c->status == Connected)) {
> > +		P9_DPRINTK(P9_DEBUG_MUX, "flushing\n");
> > +		sigpending = 1;
> > +		clear_thread_flag(TIF_SIGPENDING);
> > +
> > +		if (c->trans_mod->cancel(c, req))
> > +			p9_client_flush(c, req);
> > +
> > +		/* if we received the response anyway, don't signal error */
> > +		if (req->status == REQ_STATUS_RCVD)
> > +			err = 0;
> > +	}
> > +	if (sigpending) {
> > +		spin_lock_irqsave(&current->sighand->siglock, flags);
> > +		recalc_sigpending();
> > +		spin_unlock_irqrestore(&current->sighand->siglock, flags);
> > +	}
> > +	if (err < 0)
> > +		goto reterr;
> > +
> > +	err = p9_check_zc_errors(c, req, uidata, in_hdrlen, kern_buf);
> > +	if (!err) {
> > +		P9_DPRINTK(P9_DEBUG_MUX, "exit: client %p op %d\n", c, type);
> > +		return req;
> > +	}
> > +reterr:
> > +	P9_DPRINTK(P9_DEBUG_MUX, "exit: client %p op %d error: %d\n", c, type,
> > +									err);
> > +	p9_free_req(c, req);
> > +	return ERR_PTR(err);
> > +}
> > +
> >  static struct p9_fid *p9_fid_create(struct p9_client *clnt)
> >  {
> >  	int ret;
> > @@ -1329,13 +1512,15 @@ int
> >  p9_client_read(struct p9_fid *fid, char *data, char __user *udata,  
> > u64 offset,
> >  								u32 count)
> >  {
> > -	int err, rsize;
> > -	struct p9_client *clnt;
> > -	struct p9_req_t *req;
> >  	char *dataptr;
> > +	int kernel_buf = 0;
> > +	struct p9_req_t *req;
> > +	struct p9_client *clnt;
> > +	int err, rsize, non_zc = 0;
> > +
> >
> > -	P9_DPRINTK(P9_DEBUG_9P, ">>> TREAD fid %d offset %llu %d\n", fid->fid,
> > -					(long long unsigned) offset, count);
> > +	P9_DPRINTK(P9_DEBUG_9P, ">>> TREAD fid %d offset %llu %d\n",
> > +		   fid->fid, (long long unsigned) offset, count);
> >  	err = 0;
> >  	clnt = fid->clnt;
> >
> > @@ -1346,14 +1531,25 @@ p9_client_read(struct p9_fid *fid, char  
> > *data, char __user *udata, u64 offset,
> >  	if (count < rsize)
> >  		rsize = count;
> >
> > -	/* Don't bother zerocopy for small IO (< 1024) */
> > -	if (((clnt->trans_mod->pref & P9_TRANS_PREF_PAYLOAD_MASK) ==
> > -			P9_TRANS_PREF_PAYLOAD_SEP) && (rsize > 1024)) {
> > -		req = p9_client_rpc(clnt, P9_TREAD, "dqE", fid->fid, offset,
> > -				rsize, data, udata);
> > +	/* Don't bother zerocopy form small IO (< 1024) */
> form Typo?

Yes. Will fix in next update.

> > +	if (clnt->trans_mod->zc_request && rsize > 1024) {
> > +		char *indata;
> > +		if (data) {
> > +			kernel_buf = 1;
> > +			indata = data;
> > +		} else
> > +			indata = (char *)udata;
> > +		/*
> > +		 * response header len is 11
> > +		 * PDU Header(7) + IO Size (4)
> > +		 */
> > +		req = p9_client_zc_rpc(clnt, P9_TREAD, indata, NULL, rsize, 0,
> > +				       11, kernel_buf, "dqd", fid->fid,
> > +				       offset, rsize);
> 

-aneesh

------------------------------------------------------------------------------
uberSVN's rich system and user administration capabilities and model 
configuration take the hassle out of deploying and managing Subversion and 
the tools developers use with it. Learn more about uberSVN and get a free 
download at:  http://p.sf.net/sfu/wandisco-dev2dev
diff mbox

Patch

diff --git a/fs/9p/vfs_dir.c b/fs/9p/vfs_dir.c
index 9c2bdda..ce6600f 100644
--- a/fs/9p/vfs_dir.c
+++ b/fs/9p/vfs_dir.c
@@ -231,7 +231,7 @@  static int v9fs_dir_readdir_dotl(struct file *filp, void *dirent,
 	while (err == 0) {
 		if (rdir->tail == rdir->head) {
 			err = p9_client_readdir(fid, rdir->buf, buflen,
-								filp->f_pos);
+						filp->f_pos);
 			if (err <= 0)
 				goto unlock_and_exit;
 
diff --git a/include/net/9p/9p.h b/include/net/9p/9p.h
index 342dcf1..700d41e 100644
--- a/include/net/9p/9p.h
+++ b/include/net/9p/9p.h
@@ -330,6 +330,9 @@  enum p9_qid_t {
 /* Room for readdir header */
 #define P9_READDIRHDRSZ	24
 
+/* size of header for zero copy read/write */
+#define P9_ZC_HDR_SZ 4096
+
 /**
  * struct p9_qid - file system entity information
  * @type: 8-bit type &p9_qid_t
@@ -526,10 +529,6 @@  struct p9_rstatfs {
  * @tag: transaction id of the request
  * @offset: used by marshalling routines to track current position in buffer
  * @capacity: used by marshalling routines to track total malloc'd capacity
- * @pubuf: Payload user buffer given by the caller
- * @pkbuf: Payload kernel buffer given by the caller
- * @pbuf_size: pubuf/pkbuf(only one will be !NULL) size to be read/write.
- * @private: For transport layer's use.
  * @sdata: payload
  *
  * &p9_fcall represents the structure for all 9P RPC
@@ -546,10 +545,6 @@  struct p9_fcall {
 
 	size_t offset;
 	size_t capacity;
-	char __user *pubuf;
-	char *pkbuf;
-	size_t pbuf_size;
-	void *private;
 
 	u8 *sdata;
 };
diff --git a/include/net/9p/transport.h b/include/net/9p/transport.h
index 83531eb..adcbb20 100644
--- a/include/net/9p/transport.h
+++ b/include/net/9p/transport.h
@@ -26,13 +26,6 @@ 
 #ifndef NET_9P_TRANSPORT_H
 #define NET_9P_TRANSPORT_H
 
-#define P9_TRANS_PREF_PAYLOAD_MASK 0x1
-
-/* Default. Add Payload to PDU before sending it down to transport layer */
-#define P9_TRANS_PREF_PAYLOAD_DEF  0x0
-/* Send pay load separately to transport layer along with PDU.*/
-#define P9_TRANS_PREF_PAYLOAD_SEP  0x1
-
 /**
  * struct p9_trans_module - transport module interface
  * @list: used to maintain a list of currently available transports
@@ -56,13 +49,14 @@  struct p9_trans_module {
 	struct list_head list;
 	char *name;		/* name of transport */
 	int maxsize;		/* max message size of transport */
-	int pref;               /* Preferences of this transport */
 	int def;		/* this transport should be default */
 	struct module *owner;
 	int (*create)(struct p9_client *, const char *, char *);
 	void (*close) (struct p9_client *);
 	int (*request) (struct p9_client *, struct p9_req_t *req);
 	int (*cancel) (struct p9_client *, struct p9_req_t *req);
+	int (*zc_request)(struct p9_client *, struct p9_req_t *,
+			  char *, char *, int , int, int, int);
 };
 
 void v9fs_register_trans(struct p9_trans_module *m);
diff --git a/net/9p/client.c b/net/9p/client.c
index 63b384b..d094ce9 100644
--- a/net/9p/client.c
+++ b/net/9p/client.c
@@ -203,11 +203,12 @@  free_and_return:
  *
  */
 
-static struct p9_req_t *p9_tag_alloc(struct p9_client *c, u16 tag)
+static struct p9_req_t *p9_tag_alloc(struct p9_client *c, u16 tag, int max_size)
 {
 	unsigned long flags;
 	int row, col;
 	struct p9_req_t *req;
+	int alloc_msize = min(c->msize, max_size);
 
 	/* This looks up the original request by tag so we know which
 	 * buffer to read the data into */
@@ -245,23 +246,12 @@  static struct p9_req_t *p9_tag_alloc(struct p9_client *c, u16 tag)
 			return ERR_PTR(-ENOMEM);
 		}
 		init_waitqueue_head(req->wq);
-		if ((c->trans_mod->pref & P9_TRANS_PREF_PAYLOAD_MASK) ==
-				P9_TRANS_PREF_PAYLOAD_SEP) {
-			int alloc_msize = min(c->msize, 4096);
-			req->tc = kmalloc(sizeof(struct p9_fcall)+alloc_msize,
-					  GFP_NOFS);
-			req->tc->capacity = alloc_msize;
-			req->rc = kmalloc(sizeof(struct p9_fcall)+alloc_msize,
-					  GFP_NOFS);
-			req->rc->capacity = alloc_msize;
-		} else {
-			req->tc = kmalloc(sizeof(struct p9_fcall)+c->msize,
-					  GFP_NOFS);
-			req->tc->capacity = c->msize;
-			req->rc = kmalloc(sizeof(struct p9_fcall)+c->msize,
-					  GFP_NOFS);
-			req->rc->capacity = c->msize;
-		}
+		req->tc = kmalloc(sizeof(struct p9_fcall) + alloc_msize,
+				  GFP_NOFS);
+		req->tc->capacity = alloc_msize;
+		req->rc = kmalloc(sizeof(struct p9_fcall) + alloc_msize,
+				  GFP_NOFS);
+		req->rc->capacity = alloc_msize;
 		if ((!req->tc) || (!req->rc)) {
 			printk(KERN_ERR "Couldn't grow tag array\n");
 			kfree(req->tc);
@@ -484,27 +474,8 @@  static int p9_check_errors(struct p9_client *c, struct p9_req_t *req)
 
 	if (!p9_is_proto_dotl(c)) {
 		char *ename;
-
-		if (req->tc->pbuf_size) {
-			/* Handle user buffers */
-			size_t len = req->rc->size - req->rc->offset;
-			if (req->tc->pubuf) {
-				/* User Buffer */
-				err = copy_from_user(
-					&req->rc->sdata[req->rc->offset],
-					req->tc->pubuf, len);
-				if (err) {
-					err = -EFAULT;
-					goto out_err;
-				}
-			} else {
-				/* Kernel Buffer */
-				memmove(&req->rc->sdata[req->rc->offset],
-						req->tc->pkbuf, len);
-			}
-		}
 		err = p9pdu_readf(req->rc, c->proto_version, "s?d",
-				&ename, &ecode);
+				  &ename, &ecode);
 		if (err)
 			goto out_err;
 
@@ -514,11 +485,10 @@  static int p9_check_errors(struct p9_client *c, struct p9_req_t *req)
 		if (!err || !IS_ERR_VALUE(err)) {
 			err = p9_errstr2errno(ename, strlen(ename));
 
-			P9_DPRINTK(P9_DEBUG_9P, "<<< RERROR (%d) %s\n", -ecode,
-					ename);
-
-			kfree(ename);
+			P9_DPRINTK(P9_DEBUG_9P, "<<< RERROR (%d) %s\n",
+				   -ecode, ename);
 		}
+		kfree(ename);
 	} else {
 		err = p9pdu_readf(req->rc, c->proto_version, "d", &ecode);
 		err = -ecode;
@@ -526,7 +496,6 @@  static int p9_check_errors(struct p9_client *c, struct p9_req_t *req)
 		P9_DPRINTK(P9_DEBUG_9P, "<<< RLERROR (%d)\n", -ecode);
 	}
 
-
 	return err;
 
 out_err:
@@ -535,6 +504,110 @@  out_err:
 	return err;
 }
 
+/**
+ * p9_check_zc_errors - check 9p packet for error return and process it
+ * @c: current client instance
+ * @req: request to parse and check for error conditions
+ * @in_hdrlen: Size of response protocol buffer.
+ *
+ * returns error code if one is discovered, otherwise returns 0
+ *
+ * this will have to be more complicated if we have multiple
+ * error packet types
+ */
+
+static int p9_check_zc_errors(struct p9_client *c, struct p9_req_t *req,
+			      char *uidata, int in_hdrlen, int kern_buf)
+{
+	int err;
+	int ecode;
+	int8_t type;
+	char *ename = NULL;
+
+	err = p9_parse_header(req->rc, NULL, &type, NULL, 0);
+	if (err) {
+		P9_DPRINTK(P9_DEBUG_ERROR, "couldn't parse header %d\n", err);
+		return err;
+	}
+
+	if (type != P9_RERROR && type != P9_RLERROR)
+		return 0;
+
+	if (!p9_is_proto_dotl(c)) {
+		/* Error is reported in string format */
+		uint16_t len;
+		/* 7 = header size for RERROR, 2 is the size of string len; */
+		int inline_len = in_hdrlen - (7 + 2);
+
+		/* Read the size of error string */
+		err = p9pdu_readf(req->rc, c->proto_version, "w", &len);
+		if (err)
+			goto out_err;
+
+		ename = kmalloc(len + 1, GFP_NOFS);
+		if (!ename) {
+			err = -ENOMEM;
+			goto out_err;
+		}
+		if (len <= inline_len) {
+			/* We have error in protocol buffer itself */
+			if (pdu_read(req->rc, ename, len)) {
+				err = -EFAULT;
+				goto out_free;
+
+			}
+		} else {
+			/*
+			 *  Part of the data is in user space buffer.
+			 */
+			if (pdu_read(req->rc, ename, inline_len)) {
+				err = -EFAULT;
+				goto out_free;
+
+			}
+			if (kern_buf) {
+				memcpy(ename + inline_len, uidata,
+				       len - inline_len);
+			} else {
+				err = copy_from_user(ename + inline_len,
+						     uidata, len - inline_len);
+				if (err) {
+					err = -EFAULT;
+					goto out_free;
+				}
+			}
+		}
+		ename[len] = 0;
+		if (p9_is_proto_dotu(c)) {
+			/* For dotu we also have error code */
+			err = p9pdu_readf(req->rc,
+					  c->proto_version, "d", &ecode);
+			if (err)
+				goto out_free;
+			err = -ecode;
+		}
+		if (!err || !IS_ERR_VALUE(err)) {
+			err = p9_errstr2errno(ename, strlen(ename));
+
+			P9_DPRINTK(P9_DEBUG_9P, "<<< RERROR (%d) %s\n",
+				   -ecode, ename);
+		}
+		kfree(ename);
+	} else {
+		err = p9pdu_readf(req->rc, c->proto_version, "d", &ecode);
+		err = -ecode;
+
+		P9_DPRINTK(P9_DEBUG_9P, "<<< RLERROR (%d)\n", -ecode);
+	}
+	return err;
+
+out_free:
+	kfree(ename);
+out_err:
+	P9_DPRINTK(P9_DEBUG_ERROR, "couldn't parse error%d\n", err);
+	return err;
+}
+
 static struct p9_req_t *
 p9_client_rpc(struct p9_client *c, int8_t type, const char *fmt, ...);
 
@@ -619,7 +692,7 @@  p9_client_rpc(struct p9_client *c, int8_t type, const char *fmt, ...)
 			return ERR_PTR(-ENOMEM);
 	}
 
-	req = p9_tag_alloc(c, tag);
+	req = p9_tag_alloc(c, tag, c->msize);
 	if (IS_ERR(req))
 		return req;
 
@@ -685,6 +758,116 @@  reterr:
 	return ERR_PTR(err);
 }
 
+/**
+ * p9_client_zc_rpc - issue a request and wait for a response
+ * @c: client session
+ * @type: type of request
+ * @uidata: user buffer that should be used for zero copy read
+ * @uodata: user buffer that should be used for zero copy write
+ * @inlen: read buffer size
+ * @olen: write buffer size
+ * @in_hdrlen: response header size; this is the size of the response protocol data
+ * @fmt: protocol format string (see protocol.c)
+ *
+ * Returns request structure (which client must free using p9_free_req)
+ */
+static struct p9_req_t *p9_client_zc_rpc(struct p9_client *c, int8_t type,
+					 char *uidata, char *uodata,
+					 int inlen, int olen, int in_hdrlen,
+					 int kern_buf, const char *fmt, ...)
+{
+	va_list ap;
+	int tag, err;
+	struct p9_req_t *req;
+	unsigned long flags;
+	int sigpending;
+
+	P9_DPRINTK(P9_DEBUG_MUX, "client %p op %d\n", c, type);
+
+	/* we allow for any status other than disconnected */
+	if (c->status == Disconnected)
+		return ERR_PTR(-EIO);
+
+	/* If we are called with KERNEL_DS force kern_buf */
+	if (segment_eq(get_fs(), KERNEL_DS))
+		kern_buf = 1;
+
+	/* if status is begin_disconnected we allow only clunk request */
+	if ((c->status == BeginDisconnect) && (type != P9_TCLUNK))
+		return ERR_PTR(-EIO);
+
+	if (signal_pending(current)) {
+		sigpending = 1;
+		clear_thread_flag(TIF_SIGPENDING);
+	} else
+		sigpending = 0;
+
+	tag = P9_NOTAG;
+	if (type != P9_TVERSION) {
+		tag = p9_idpool_get(c->tagpool);
+		if (tag < 0)
+			return ERR_PTR(-ENOMEM);
+	}
+	/*
+	 * We allocate an inline protocol data buffer of only 4k bytes.
+	 * The actual content is passed in zero-copy fashion.
+	 */
+	req = p9_tag_alloc(c, tag, P9_ZC_HDR_SZ);
+	if (IS_ERR(req))
+		return req;
+
+	/* marshall the data */
+	p9pdu_prepare(req->tc, tag, type);
+	va_start(ap, fmt);
+	err = p9pdu_vwritef(req->tc, c->proto_version, fmt, ap);
+	va_end(ap);
+	if (err)
+		goto reterr;
+	p9pdu_finalize(req->tc);
+
+	err = c->trans_mod->zc_request(c, req, uidata, uodata,
+				       inlen, olen, in_hdrlen, kern_buf);
+	if (err < 0) {
+		if (err == -EIO)
+			c->status = Disconnected;
+		goto reterr;
+	}
+	if (req->status == REQ_STATUS_ERROR) {
+		P9_DPRINTK(P9_DEBUG_ERROR, "req_status error %d\n", req->t_err);
+		err = req->t_err;
+	}
+	if ((err == -ERESTARTSYS) && (c->status == Connected)) {
+		P9_DPRINTK(P9_DEBUG_MUX, "flushing\n");
+		sigpending = 1;
+		clear_thread_flag(TIF_SIGPENDING);
+
+		if (c->trans_mod->cancel(c, req))
+			p9_client_flush(c, req);
+
+		/* if we received the response anyway, don't signal error */
+		if (req->status == REQ_STATUS_RCVD)
+			err = 0;
+	}
+	if (sigpending) {
+		spin_lock_irqsave(&current->sighand->siglock, flags);
+		recalc_sigpending();
+		spin_unlock_irqrestore(&current->sighand->siglock, flags);
+	}
+	if (err < 0)
+		goto reterr;
+
+	err = p9_check_zc_errors(c, req, uidata, in_hdrlen, kern_buf);
+	if (!err) {
+		P9_DPRINTK(P9_DEBUG_MUX, "exit: client %p op %d\n", c, type);
+		return req;
+	}
+reterr:
+	P9_DPRINTK(P9_DEBUG_MUX, "exit: client %p op %d error: %d\n", c, type,
+									err);
+	p9_free_req(c, req);
+	return ERR_PTR(err);
+}
+
 static struct p9_fid *p9_fid_create(struct p9_client *clnt)
 {
 	int ret;
@@ -1329,13 +1512,15 @@  int
 p9_client_read(struct p9_fid *fid, char *data, char __user *udata, u64 offset,
 								u32 count)
 {
-	int err, rsize;
-	struct p9_client *clnt;
-	struct p9_req_t *req;
 	char *dataptr;
+	int kernel_buf = 0;
+	struct p9_req_t *req;
+	struct p9_client *clnt;
+	int err, rsize, non_zc = 0;
+
 
-	P9_DPRINTK(P9_DEBUG_9P, ">>> TREAD fid %d offset %llu %d\n", fid->fid,
-					(long long unsigned) offset, count);
+	P9_DPRINTK(P9_DEBUG_9P, ">>> TREAD fid %d offset %llu %d\n",
+		   fid->fid, (long long unsigned) offset, count);
 	err = 0;
 	clnt = fid->clnt;
 
@@ -1346,14 +1531,25 @@  p9_client_read(struct p9_fid *fid, char *data, char __user *udata, u64 offset,
 	if (count < rsize)
 		rsize = count;
 
-	/* Don't bother zerocopy for small IO (< 1024) */
-	if (((clnt->trans_mod->pref & P9_TRANS_PREF_PAYLOAD_MASK) ==
-			P9_TRANS_PREF_PAYLOAD_SEP) && (rsize > 1024)) {
-		req = p9_client_rpc(clnt, P9_TREAD, "dqE", fid->fid, offset,
-				rsize, data, udata);
+	/* Don't bother zerocopy for small IO (< 1024) */
+	if (clnt->trans_mod->zc_request && rsize > 1024) {
+		char *indata;
+		if (data) {
+			kernel_buf = 1;
+			indata = data;
+		} else
+			indata = (char *)udata;
+		/*
+		 * response header len is 11
+		 * PDU Header(7) + IO Size (4)
+		 */
+		req = p9_client_zc_rpc(clnt, P9_TREAD, indata, NULL, rsize, 0,
+				       11, kernel_buf, "dqd", fid->fid,
+				       offset, rsize);
 	} else {
+		non_zc = 1;
 		req = p9_client_rpc(clnt, P9_TREAD, "dqd", fid->fid, offset,
-				rsize);
+				    rsize);
 	}
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
@@ -1369,7 +1565,7 @@  p9_client_read(struct p9_fid *fid, char *data, char __user *udata, u64 offset,
 	P9_DPRINTK(P9_DEBUG_9P, "<<< RREAD count %d\n", count);
 	P9_DUMP_PKT(1, req->rc);
 
-	if (!req->tc->pbuf_size) {
+	if (non_zc) {
 		if (data) {
 			memmove(data, dataptr, count);
 		} else {
@@ -1395,6 +1591,7 @@  p9_client_write(struct p9_fid *fid, char *data, const char __user *udata,
 							u64 offset, u32 count)
 {
 	int err, rsize;
+	int kernel_buf = 0;
 	struct p9_client *clnt;
 	struct p9_req_t *req;
 
@@ -1411,18 +1608,23 @@  p9_client_write(struct p9_fid *fid, char *data, const char __user *udata,
 		rsize = count;
 
 	/* Don't bother zerocopy form small IO (< 1024) */
-	if (((clnt->trans_mod->pref & P9_TRANS_PREF_PAYLOAD_MASK) ==
-				P9_TRANS_PREF_PAYLOAD_SEP) && (rsize > 1024)) {
-		req = p9_client_rpc(clnt, P9_TWRITE, "dqE", fid->fid, offset,
-				rsize, data, udata);
+	if (clnt->trans_mod->zc_request && rsize > 1024) {
+		char *odata;
+		if (data) {
+			kernel_buf = 1;
+			odata = data;
+		} else
+			odata = (char *)udata;
+		req = p9_client_zc_rpc(clnt, P9_TWRITE, NULL, odata, 0, rsize,
+				       P9_ZC_HDR_SZ, kernel_buf, "dqd",
+				       fid->fid, offset, rsize);
 	} else {
-
 		if (data)
 			req = p9_client_rpc(clnt, P9_TWRITE, "dqD", fid->fid,
-					offset, rsize, data);
+					    offset, rsize, data);
 		else
 			req = p9_client_rpc(clnt, P9_TWRITE, "dqU", fid->fid,
-					offset, rsize, udata);
+					    offset, rsize, udata);
 	}
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
@@ -1823,7 +2025,7 @@  EXPORT_SYMBOL_GPL(p9_client_xattrcreate);
 
 int p9_client_readdir(struct p9_fid *fid, char *data, u32 count, u64 offset)
 {
-	int err, rsize;
+	int err, rsize, non_zc = 0;
 	struct p9_client *clnt;
 	struct p9_req_t *req;
 	char *dataptr;
@@ -1841,13 +2043,18 @@  int p9_client_readdir(struct p9_fid *fid, char *data, u32 count, u64 offset)
 	if (count < rsize)
 		rsize = count;
 
-	if ((clnt->trans_mod->pref & P9_TRANS_PREF_PAYLOAD_MASK) ==
-			P9_TRANS_PREF_PAYLOAD_SEP) {
-		req = p9_client_rpc(clnt, P9_TREADDIR, "dqF", fid->fid,
-				offset, rsize, data);
+	/* Don't bother zerocopy for small IO (< 1024) */
+	if (clnt->trans_mod->zc_request && rsize > 1024) {
+		/*
+		 * response header len is 11
+		 * PDU Header(7) + IO Size (4)
+		 */
+		req = p9_client_zc_rpc(clnt, P9_TREADDIR, data, NULL, rsize, 0,
+				       11, 1, "dqd", fid->fid, offset, rsize);
 	} else {
+		non_zc = 1;
 		req = p9_client_rpc(clnt, P9_TREADDIR, "dqd", fid->fid,
-				offset, rsize);
+				    offset, rsize);
 	}
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
@@ -1862,7 +2069,7 @@  int p9_client_readdir(struct p9_fid *fid, char *data, u32 count, u64 offset)
 
 	P9_DPRINTK(P9_DEBUG_9P, "<<< RREADDIR count %d\n", count);
 
-	if (!req->tc->pbuf_size && data)
+	if (non_zc)
 		memmove(data, dataptr, count);
 
 	p9_free_req(clnt, req);
diff --git a/net/9p/protocol.c b/net/9p/protocol.c
index df58375..b7d4e8a 100644
--- a/net/9p/protocol.c
+++ b/net/9p/protocol.c
@@ -81,7 +81,7 @@  void p9stat_free(struct p9_wstat *stbuf)
 }
 EXPORT_SYMBOL(p9stat_free);
 
-static size_t pdu_read(struct p9_fcall *pdu, void *data, size_t size)
+size_t pdu_read(struct p9_fcall *pdu, void *data, size_t size)
 {
 	size_t len = min(pdu->size - pdu->offset, size);
 	memcpy(data, &pdu->sdata[pdu->offset], len);
@@ -108,26 +108,6 @@  pdu_write_u(struct p9_fcall *pdu, const char __user *udata, size_t size)
 	return size - len;
 }
 
-static size_t
-pdu_write_urw(struct p9_fcall *pdu, const char *kdata, const char __user *udata,
-		size_t size)
-{
-	BUG_ON(pdu->size > P9_IOHDRSZ);
-	pdu->pubuf = (char __user *)udata;
-	pdu->pkbuf = (char *)kdata;
-	pdu->pbuf_size = size;
-	return 0;
-}
-
-static size_t
-pdu_write_readdir(struct p9_fcall *pdu, const char *kdata, size_t size)
-{
-	BUG_ON(pdu->size > P9_READDIRHDRSZ);
-	pdu->pkbuf = (char *)kdata;
-	pdu->pbuf_size = size;
-	return 0;
-}
-
 /*
 	b - int8_t
 	w - int16_t
@@ -459,26 +439,6 @@  p9pdu_vwritef(struct p9_fcall *pdu, int proto_version, const char *fmt,
 					errcode = -EFAULT;
 			}
 			break;
-		case 'E':{
-				 int32_t cnt = va_arg(ap, int32_t);
-				 const char *k = va_arg(ap, const void *);
-				 const char __user *u = va_arg(ap,
-							const void __user *);
-				 errcode = p9pdu_writef(pdu, proto_version, "d",
-						 cnt);
-				 if (!errcode && pdu_write_urw(pdu, k, u, cnt))
-					errcode = -EFAULT;
-			 }
-			 break;
-		case 'F':{
-				 int32_t cnt = va_arg(ap, int32_t);
-				 const char *k = va_arg(ap, const void *);
-				 errcode = p9pdu_writef(pdu, proto_version, "d",
-						 cnt);
-				 if (!errcode && pdu_write_readdir(pdu, k, cnt))
-					errcode = -EFAULT;
-			 }
-			 break;
 		case 'U':{
 				int32_t count = va_arg(ap, int32_t);
 				const char __user *udata =
@@ -637,10 +597,6 @@  void p9pdu_reset(struct p9_fcall *pdu)
 {
 	pdu->offset = 0;
 	pdu->size = 0;
-	pdu->private = NULL;
-	pdu->pubuf = NULL;
-	pdu->pkbuf = NULL;
-	pdu->pbuf_size = 0;
 }
 
 int p9dirent_read(char *buf, int len, struct p9_dirent *dirent,
diff --git a/net/9p/protocol.h b/net/9p/protocol.h
index 2431c0f..e5083f3 100644
--- a/net/9p/protocol.h
+++ b/net/9p/protocol.h
@@ -32,3 +32,5 @@  int p9pdu_prepare(struct p9_fcall *pdu, int16_t tag, int8_t type);
 int p9pdu_finalize(struct p9_fcall *pdu);
 void p9pdu_dump(int, struct p9_fcall *);
 void p9pdu_reset(struct p9_fcall *pdu);
+size_t pdu_read(struct p9_fcall *pdu, void *data, size_t size);
+
diff --git a/net/9p/trans_common.c b/net/9p/trans_common.c
index 9a70ebd..de8df95 100644
--- a/net/9p/trans_common.c
+++ b/net/9p/trans_common.c
@@ -21,30 +21,25 @@ 
 
 /**
  *  p9_release_req_pages - Release pages after the transaction.
- *  @*private: PDU's private page of struct trans_rpage_info
  */
-void
-p9_release_req_pages(struct trans_rpage_info *rpinfo)
+void p9_release_pages(struct page **pages, int nr_pages)
 {
 	int i = 0;
-
-	while (rpinfo->rp_data[i] && rpinfo->rp_nr_pages--) {
-		put_page(rpinfo->rp_data[i]);
+	while (pages[i] && nr_pages--) {
+		put_page(pages[i]);
 		i++;
 	}
 }
-EXPORT_SYMBOL(p9_release_req_pages);
+EXPORT_SYMBOL(p9_release_pages);
 
 /**
  * p9_nr_pages - Return number of pages needed to accommodate the payload.
  */
-int
-p9_nr_pages(struct p9_req_t *req)
+int p9_nr_pages(char *data, int len)
 {
 	unsigned long start_page, end_page;
-	start_page =  (unsigned long)req->tc->pubuf >> PAGE_SHIFT;
-	end_page = ((unsigned long)req->tc->pubuf + req->tc->pbuf_size +
-			PAGE_SIZE - 1) >> PAGE_SHIFT;
+	start_page =  (unsigned long)data >> PAGE_SHIFT;
+	end_page = ((unsigned long)data + len + PAGE_SIZE - 1) >> PAGE_SHIFT;
 	return end_page - start_page;
 }
 EXPORT_SYMBOL(p9_nr_pages);
@@ -58,35 +53,17 @@  EXPORT_SYMBOL(p9_nr_pages);
  * @nr_pages: number of pages to accommodate the payload
  * @rw: Indicates if the pages are for read or write.
  */
-int
-p9_payload_gup(struct p9_req_t *req, size_t *pdata_off, int *pdata_len,
-		int nr_pages, u8 rw)
-{
-	uint32_t first_page_bytes = 0;
-	int32_t pdata_mapped_pages;
-	struct trans_rpage_info  *rpinfo;
-
-	*pdata_off = (__force size_t)req->tc->pubuf & (PAGE_SIZE-1);
 
-	if (*pdata_off)
-		first_page_bytes = min(((size_t)PAGE_SIZE - *pdata_off),
-				       req->tc->pbuf_size);
+int p9_payload_gup(char *data, int *nr_pages, struct page **pages, int write)
+{
+	int nr_mapped_pages;
 
-	rpinfo = req->tc->private;
-	pdata_mapped_pages = get_user_pages_fast((unsigned long)req->tc->pubuf,
-			nr_pages, rw, &rpinfo->rp_data[0]);
-	if (pdata_mapped_pages <= 0)
-		return pdata_mapped_pages;
+	nr_mapped_pages = get_user_pages_fast((unsigned long)data,
+					      *nr_pages, write, pages);
+	if (nr_mapped_pages <= 0)
+		return nr_mapped_pages;
 
-	rpinfo->rp_nr_pages = pdata_mapped_pages;
-	if (*pdata_off) {
-		*pdata_len = first_page_bytes;
-		*pdata_len += min((req->tc->pbuf_size - *pdata_len),
-				((size_t)pdata_mapped_pages - 1) << PAGE_SHIFT);
-	} else {
-		*pdata_len = min(req->tc->pbuf_size,
-				(size_t)pdata_mapped_pages << PAGE_SHIFT);
-	}
+	*nr_pages = nr_mapped_pages;
 	return 0;
 }
 EXPORT_SYMBOL(p9_payload_gup);
diff --git a/net/9p/trans_common.h b/net/9p/trans_common.h
index 7630922..173bb55 100644
--- a/net/9p/trans_common.h
+++ b/net/9p/trans_common.h
@@ -12,21 +12,6 @@ 
  *
  */
 
-/* TRUE if it is user context */
-#define P9_IS_USER_CONTEXT (!segment_eq(get_fs(), KERNEL_DS))
-
-/**
- * struct trans_rpage_info - To store mapped page information in PDU.
- * @rp_alloc:Set if this structure is allocd, not a reuse unused space in pdu.
- * @rp_nr_pages: Number of mapped pages
- * @rp_data: Array of page pointers
- */
-struct trans_rpage_info {
-	u8 rp_alloc;
-	int rp_nr_pages;
-	struct page *rp_data[0];
-};
-
-void p9_release_req_pages(struct trans_rpage_info *);
-int p9_payload_gup(struct p9_req_t *, size_t *, int *, int, u8);
-int p9_nr_pages(struct p9_req_t *);
+void p9_release_pages(struct page **, int);
+int p9_payload_gup(char *, int *, struct page **, int);
+int p9_nr_pages(char *, int);
diff --git a/net/9p/trans_virtio.c b/net/9p/trans_virtio.c
index 175b513..e03cde0 100644
--- a/net/9p/trans_virtio.c
+++ b/net/9p/trans_virtio.c
@@ -163,17 +163,6 @@  static void req_done(struct virtqueue *vq)
 		P9_DPRINTK(P9_DEBUG_TRANS, ": rc %p\n", rc);
 		P9_DPRINTK(P9_DEBUG_TRANS, ": lookup tag %d\n", rc->tag);
 		req = p9_tag_lookup(chan->client, rc->tag);
-		if (req->tc->private) {
-			struct trans_rpage_info *rp = req->tc->private;
-			int p = rp->rp_nr_pages;
-			/*Release pages */
-			p9_release_req_pages(rp);
-			atomic_sub(p, &vp_pinned);
-			wake_up(&vp_wq);
-			if (rp->rp_alloc)
-				kfree(rp);
-			req->tc->private = NULL;
-		}
 		req->status = REQ_STATUS_RCVD;
 		p9_client_cb(chan->client, req);
 	}
@@ -193,9 +182,8 @@  static void req_done(struct virtqueue *vq)
  *
  */
 
-static int
-pack_sg_list(struct scatterlist *sg, int start, int limit, char *data,
-								int count)
+static int pack_sg_list(struct scatterlist *sg, int start,
+			int limit, char *data, int count)
 {
 	int s;
 	int index = start;
@@ -225,30 +213,33 @@  static int p9_virtio_cancel(struct p9_client *client, struct p9_req_t *req)
  * @sg: scatter/gather list to pack into
  * @start: which segment of the sg_list to start at
  * @pdata_off: Offset into the first page
+ * @end_page_len: length of the last page
  * @**pdata: a list of pages to add into sg.
- * @count: amount of data to pack into the scatter/gather list
+ * @nr_pages: number of pages to pack into the scatter/gather list
  */
 static int
-pack_sg_list_p(struct scatterlist *sg, int start, int limit, size_t pdata_off,
-		struct page **pdata, int count)
+pack_sg_list_p(struct scatterlist *sg, int start, int limit,
+	       size_t pdata_off, int end_page_len,
+		struct page **pdata, int nr_pages)
 {
-	int s;
 	int i = 0;
+	int s = PAGE_SIZE;
 	int index = start;
 
+	BUG_ON(nr_pages > (limit - start));
 	if (pdata_off) {
-		s = min((int)(PAGE_SIZE - pdata_off), count);
+		if (nr_pages == 1)
+			s = end_page_len;
 		sg_set_page(&sg[index++], pdata[i++], s, pdata_off);
-		count -= s;
+		nr_pages--;
 	}
-
-	while (count) {
-		BUG_ON(index > limit);
-		s = min((int)PAGE_SIZE, count);
+	while (nr_pages) {
+		if (nr_pages == 1)
+			s = end_page_len;
 		sg_set_page(&sg[index++], pdata[i++], s, 0);
-		count -= s;
+		nr_pages--;
 	}
-	return index-start;
+	return index - start;
 }
 
 /**
@@ -261,143 +252,224 @@  pack_sg_list_p(struct scatterlist *sg, int start, int limit, size_t pdata_off,
 static int
 p9_virtio_request(struct p9_client *client, struct p9_req_t *req)
 {
-	int in, out, inp, outp;
+	int err;
+	int in, out;
+	unsigned long flags;
 	struct virtio_chan *chan = client->trans;
 	char *rdata = (char *)req->rc+sizeof(struct p9_fcall);
-	unsigned long flags;
-	size_t pdata_off = 0;
-	struct trans_rpage_info *rpinfo = NULL;
-	int err, pdata_len = 0;
 
 	P9_DPRINTK(P9_DEBUG_TRANS, "9p debug: virtio request\n");
 
 	req->status = REQ_STATUS_SENT;
+req_retry_pinned:
+	spin_lock_irqsave(&chan->lock, flags);
 
-	if (req->tc->pbuf_size && (req->tc->pubuf && P9_IS_USER_CONTEXT)) {
-		int nr_pages = p9_nr_pages(req);
-		int rpinfo_size = sizeof(struct trans_rpage_info) +
-			sizeof(struct page *) * nr_pages;
+	/* Handle out VirtIO ring buffers */
+	out = pack_sg_list(chan->sg, 0, VIRTQUEUE_NUM, req->tc->sdata,
+			   req->tc->size);
 
-		if (atomic_read(&vp_pinned) >= chan->p9_max_pages) {
-			err = wait_event_interruptible(vp_wq,
-				atomic_read(&vp_pinned) < chan->p9_max_pages);
+	in = pack_sg_list(chan->sg, out, VIRTQUEUE_NUM,
+			  rdata, req->rc->capacity);
+
+	err = virtqueue_add_buf(chan->vq, chan->sg, out, in, req->tc);
+	if (err < 0) {
+		if (err == -ENOSPC) {
+			chan->ring_bufs_avail = 0;
+			spin_unlock_irqrestore(&chan->lock, flags);
+			err = wait_event_interruptible(*chan->vc_wq,
+							chan->ring_bufs_avail);
 			if (err  == -ERESTARTSYS)
 				return err;
-			P9_DPRINTK(P9_DEBUG_TRANS, "9p: May gup pages now.\n");
-		}
 
-		if (rpinfo_size <= (req->tc->capacity - req->tc->size)) {
-			/* We can use sdata */
-			req->tc->private = req->tc->sdata + req->tc->size;
-			rpinfo = (struct trans_rpage_info *)req->tc->private;
-			rpinfo->rp_alloc = 0;
+			P9_DPRINTK(P9_DEBUG_TRANS, "9p:Retry virtio request\n");
+			goto req_retry_pinned;
 		} else {
-			req->tc->private = kmalloc(rpinfo_size, GFP_NOFS);
-			if (!req->tc->private) {
-				P9_DPRINTK(P9_DEBUG_TRANS, "9p debug: "
-					"private kmalloc returned NULL");
-				return -ENOMEM;
-			}
-			rpinfo = (struct trans_rpage_info *)req->tc->private;
-			rpinfo->rp_alloc = 1;
+			spin_unlock_irqrestore(&chan->lock, flags);
+			P9_DPRINTK(P9_DEBUG_TRANS,
+					"9p debug: "
+					"virtio rpc add_buf returned failure");
+			return -EIO;
 		}
+	}
+	virtqueue_kick(chan->vq);
+	spin_unlock_irqrestore(&chan->lock, flags);
+
+	P9_DPRINTK(P9_DEBUG_TRANS, "9p debug: virtio request kicked\n");
+	return 0;
+}
 
-		err = p9_payload_gup(req, &pdata_off, &pdata_len, nr_pages,
-				req->tc->id == P9_TREAD ? 1 : 0);
-		if (err < 0) {
-			if (rpinfo->rp_alloc)
-				kfree(rpinfo);
+static int p9_get_mapped_pages(struct virtio_chan *chan,
+			       struct page **pages, char *data,
+			       int nr_pages, int write, int kern_buf)
+{
+	int err;
+	if (!kern_buf) {
+		/*
+		 * We allow only p9_max_pages pinned. We wait for the
+		 * other zc requests to finish here
+		 */
+		if (atomic_read(&vp_pinned) >= chan->p9_max_pages) {
+			err = wait_event_interruptible(vp_wq,
+			      (atomic_read(&vp_pinned) < chan->p9_max_pages));
+			if (err == -ERESTARTSYS)
+				return err;
+		}
+		err = p9_payload_gup(data, &nr_pages, pages, write);
+		if (err < 0)
 			return err;
-		} else {
-			atomic_add(rpinfo->rp_nr_pages, &vp_pinned);
+		atomic_add(nr_pages, &vp_pinned);
+	} else {
+		/* kernel buffer, no need to pin pages */
+		int s, index = 0;
+		int count = nr_pages;
+		while (nr_pages) {
+			s = rest_of_page(data);
+			pages[index++] = virt_to_page(data);
+			data += s;
+			nr_pages--;
 		}
+		nr_pages = count;
 	}
+	return nr_pages;
+}
 
-req_retry_pinned:
-	spin_lock_irqsave(&chan->lock, flags);
+/**
+ * p9_virtio_zc_request - issue a zero copy request
+ * @client: client instance issuing the request
+ * @req: request to be issued
+ * @uidata: user buffer that should be used for zero copy read
+ * @uodata: user buffer that should be used for zero copy write
+ * @inlen: read buffer size
+ * @outlen: write buffer size
+ * @in_hdr_len: read header size. This is the size of response protocol data
+ *
+ */
+static int
+p9_virtio_zc_request(struct p9_client *client, struct p9_req_t *req,
+		     char *uidata, char *uodata, int inlen,
+		     int outlen, int in_hdr_len, int kern_buf)
+{
+	int in, out, err;
+	unsigned long flags;
+	int in_nr_pages = 0, out_nr_pages = 0;
+	size_t data_in_off = 0, data_out_off = 0;
+	int end_in_page_len = 0, end_out_page_len = 0;
+	struct page **in_pages = NULL, **out_pages = NULL;
+	struct virtio_chan *chan = client->trans;
+	char *rdata = (char *)req->rc->sdata;
 
-	/* Handle out VirtIO ring buffers */
-	out = pack_sg_list(chan->sg, 0, VIRTQUEUE_NUM, req->tc->sdata,
-			req->tc->size);
+	P9_DPRINTK(P9_DEBUG_TRANS, "9p debug: virtio request\n");
 
-	if (req->tc->pbuf_size && (req->tc->id == P9_TWRITE)) {
-		/* We have additional write payload buffer to take care */
-		if (req->tc->pubuf && P9_IS_USER_CONTEXT) {
-			outp = pack_sg_list_p(chan->sg, out, VIRTQUEUE_NUM,
-					pdata_off, rpinfo->rp_data, pdata_len);
-		} else {
-			char *pbuf;
-			if (req->tc->pubuf)
-				pbuf = (__force char *) req->tc->pubuf;
-			else
-				pbuf = req->tc->pkbuf;
-			outp = pack_sg_list(chan->sg, out, VIRTQUEUE_NUM, pbuf,
-					req->tc->pbuf_size);
+	if (uidata) {
+		in_nr_pages = p9_nr_pages(uidata, inlen);
+		in_pages = kmalloc(sizeof(struct page *) * in_nr_pages,
+				   GFP_NOFS);
+		if (!in_pages) {
+			err = -ENOMEM;
+			goto err_out;
 		}
-		out += outp;
+		in_nr_pages = p9_get_mapped_pages(chan, in_pages, uidata,
+						  in_nr_pages, 1, kern_buf);
+		if (in_nr_pages < 0) {
+			err = in_nr_pages;
+			kfree(in_pages);
+			in_pages = NULL;
+			goto err_out;
+		}
+		data_in_off = offset_in_page(uidata);
+		end_in_page_len = PAGE_SIZE -
+			((in_nr_pages * PAGE_SIZE) - (inlen + data_in_off));
 	}
-
-	/* Handle in VirtIO ring buffers */
-	if (req->tc->pbuf_size &&
-		((req->tc->id == P9_TREAD) || (req->tc->id == P9_TREADDIR))) {
-		/*
-		 * Take care of additional Read payload.
-		 * 11 is the read/write header = PDU Header(7) + IO Size (4).
-		 * Arrange in such a way that server places header in the
-		 * alloced memory and payload onto the user buffer.
-		 */
-		inp = pack_sg_list(chan->sg, out, VIRTQUEUE_NUM, rdata, 11);
-		/*
-		 * Running executables in the filesystem may result in
-		 * a read request with kernel buffer as opposed to user buffer.
-		 */
-		if (req->tc->pubuf && P9_IS_USER_CONTEXT) {
-			in = pack_sg_list_p(chan->sg, out+inp, VIRTQUEUE_NUM,
-					pdata_off, rpinfo->rp_data, pdata_len);
-		} else {
-			char *pbuf;
-			if (req->tc->pubuf)
-				pbuf = (__force char *) req->tc->pubuf;
-			else
-				pbuf = req->tc->pkbuf;
-
-			in = pack_sg_list(chan->sg, out+inp, VIRTQUEUE_NUM,
-					pbuf, req->tc->pbuf_size);
+	if (uodata) {
+		out_nr_pages = p9_nr_pages(uodata, outlen);
+		out_pages = kmalloc(sizeof(struct page *) * out_nr_pages,
+				    GFP_NOFS);
+		if (!out_pages) {
+			err = -ENOMEM;
+			goto err_out;
 		}
-		in += inp;
-	} else {
-		in = pack_sg_list(chan->sg, out, VIRTQUEUE_NUM, rdata,
-				req->rc->capacity);
+		out_nr_pages = p9_get_mapped_pages(chan, out_pages, uodata,
+						   out_nr_pages, 0, kern_buf);
+		if (out_nr_pages < 0) {
+			err = out_nr_pages;
+			kfree(out_pages);
+			out_pages = NULL;
+			goto err_out;
+		}
+		data_out_off = offset_in_page(uodata);
+		end_out_page_len = PAGE_SIZE -
+			((out_nr_pages * PAGE_SIZE) - (outlen + data_out_off));
 	}
 
+	req->status = REQ_STATUS_SENT;
+req_retry_pinned:
+	spin_lock_irqsave(&chan->lock, flags);
+	/* out data */
+	out = pack_sg_list(chan->sg, 0, VIRTQUEUE_NUM, req->tc->sdata,
+			   req->tc->size);
+
+	if (out_pages)
+		out += pack_sg_list_p(chan->sg, out, VIRTQUEUE_NUM,
+				      data_out_off, end_out_page_len,
+				      out_pages, out_nr_pages);
+	/*
+	 * Take care of in data
+	 * For example, TREAD has 11 bytes of header.
+	 * 11 is the read/write header = PDU Header(7) + IO Size (4).
+	 * Arrange in such a way that server places header in the
+	 * alloced memory and payload onto the user buffer.
+	 */
+	in = pack_sg_list(chan->sg, out, VIRTQUEUE_NUM, rdata, in_hdr_len);
+	if (in_pages)
+		in += pack_sg_list_p(chan->sg, out + in, VIRTQUEUE_NUM,
+				     data_in_off, end_in_page_len,
+				     in_pages, in_nr_pages);
+
 	err = virtqueue_add_buf(chan->vq, chan->sg, out, in, req->tc);
 	if (err < 0) {
 		if (err == -ENOSPC) {
 			chan->ring_bufs_avail = 0;
 			spin_unlock_irqrestore(&chan->lock, flags);
 			err = wait_event_interruptible(*chan->vc_wq,
-							chan->ring_bufs_avail);
+						       chan->ring_bufs_avail);
 			if (err  == -ERESTARTSYS)
-				return err;
+				goto err_out;
 
 			P9_DPRINTK(P9_DEBUG_TRANS, "9p:Retry virtio request\n");
 			goto req_retry_pinned;
 		} else {
 			spin_unlock_irqrestore(&chan->lock, flags);
 			P9_DPRINTK(P9_DEBUG_TRANS,
-					"9p debug: "
-					"virtio rpc add_buf returned failure");
-			if (rpinfo && rpinfo->rp_alloc)
-				kfree(rpinfo);
-			return -EIO;
+				   "9p debug: "
+				   "virtio rpc add_buf returned failure");
+			err = -EIO;
+			goto err_out;
 		}
 	}
-
 	virtqueue_kick(chan->vq);
 	spin_unlock_irqrestore(&chan->lock, flags);
-
 	P9_DPRINTK(P9_DEBUG_TRANS, "9p debug: virtio request kicked\n");
-	return 0;
+	err = wait_event_interruptible(*req->wq,
+				       req->status >= REQ_STATUS_RCVD);
+	/*
+	 * Non kernel buffers are pinned, unpin them
+	 */
+err_out:
+	if (!kern_buf) {
+		if (in_pages) {
+			p9_release_pages(in_pages, in_nr_pages);
+			atomic_sub(in_nr_pages, &vp_pinned);
+		}
+		if (out_pages) {
+			p9_release_pages(out_pages, out_nr_pages);
+			atomic_sub(out_nr_pages, &vp_pinned);
+		}
+		/* wakeup anybody waiting for slots to pin pages */
+		wake_up(&vp_wq);
+	}
+	kfree(in_pages);
+	kfree(out_pages);
+	return err;
 }
 
 static ssize_t p9_mount_tag_show(struct device *dev,
@@ -591,9 +663,9 @@  static struct p9_trans_module p9_virtio_trans = {
 	.create = p9_virtio_create,
 	.close = p9_virtio_close,
 	.request = p9_virtio_request,
+	.zc_request = p9_virtio_zc_request,
 	.cancel = p9_virtio_cancel,
 	.maxsize = PAGE_SIZE*VIRTQUEUE_NUM,
-	.pref = P9_TRANS_PREF_PAYLOAD_SEP,
 	.def = 0,
 	.owner = THIS_MODULE,
 };