From patchwork Thu Feb 27 21:16:47 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: James Simmons X-Patchwork-Id: 11410859 Return-Path: Received: from mail.kernel.org (pdx-korg-mail-1.web.codeaurora.org [172.30.200.123]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id C518D1580 for ; Thu, 27 Feb 2020 21:48:41 +0000 (UTC) Received: from pdx1-mailman02.dreamhost.com (pdx1-mailman02.dreamhost.com [64.90.62.194]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by mail.kernel.org (Postfix) with ESMTPS id AD78924690 for ; Thu, 27 Feb 2020 21:48:41 +0000 (UTC) DMARC-Filter: OpenDMARC Filter v1.3.2 mail.kernel.org AD78924690 Authentication-Results: mail.kernel.org; dmarc=none (p=none dis=none) header.from=infradead.org Authentication-Results: mail.kernel.org; spf=none smtp.mailfrom=lustre-devel-bounces@lists.lustre.org Received: from pdx1-mailman02.dreamhost.com (localhost [IPv6:::1]) by pdx1-mailman02.dreamhost.com (Postfix) with ESMTP id 5C46034B86F; Thu, 27 Feb 2020 13:39:04 -0800 (PST) X-Original-To: lustre-devel@lists.lustre.org Delivered-To: lustre-devel-lustre.org@pdx1-mailman02.dreamhost.com Received: from smtp3.ccs.ornl.gov (smtp3.ccs.ornl.gov [160.91.203.39]) by pdx1-mailman02.dreamhost.com (Postfix) with ESMTP id 599A121FC51 for ; Thu, 27 Feb 2020 13:21:06 -0800 (PST) Received: from star.ccs.ornl.gov (star.ccs.ornl.gov [160.91.202.134]) by smtp3.ccs.ornl.gov (Postfix) with ESMTP id 9640991B5; Thu, 27 Feb 2020 16:18:19 -0500 (EST) Received: by star.ccs.ornl.gov (Postfix, from userid 2004) id 93AF546D; Thu, 27 Feb 2020 16:18:19 -0500 (EST) From: James Simmons To: Andreas Dilger , Oleg Drokin , NeilBrown Date: Thu, 27 Feb 2020 16:16:47 -0500 Message-Id: <1582838290-17243-540-git-send-email-jsimmons@infradead.org> X-Mailer: git-send-email 1.8.3.1 In-Reply-To: <1582838290-17243-1-git-send-email-jsimmons@infradead.org> References: <1582838290-17243-1-git-send-email-jsimmons@infradead.org> Subject: [lustre-devel] [PATCH 539/622] lustre: ldlm: FLOCK request can be processed twice X-BeenThere: lustre-devel@lists.lustre.org X-Mailman-Version: 2.1.23 Precedence: list List-Id: "For discussing Lustre software development." List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Cc: Vitaly Fertman , Andriy Skulysh , Lustre Development List MIME-Version: 1.0 Errors-To: lustre-devel-bounces@lists.lustre.org Sender: "lustre-devel" From: Andriy Skulysh Original request can be processed after resend request, so it can create a lock on MDT without client lock or unlock other lock. Make flock enqueue to use modify RPC slot. Cray-bug-id: LUS-5739 WC-bug-id: https://jira.whamcloud.com/browse/LU-12828 Lustre-commit: 85a12c6c8d7a ("LU-12828 ldlm: FLOCK request can be processed twice") Signed-off-by: Andriy Skulysh Signed-off-by: Vitaly Fertman Reviewed-by: Alexander Boyko Reviewed-by: Andrew Perepechko Reviewed-on: https://review.whamcloud.com/36340 Reviewed-by: Alexandr Boyko Reviewed-by: Oleg Drokin Signed-off-by: James Simmons --- fs/lustre/include/lustre_dlm.h | 3 +++ fs/lustre/include/lustre_mdc.h | 25 ----------------------- fs/lustre/include/lustre_net.h | 3 ++- fs/lustre/include/obd_class.h | 6 ++---- fs/lustre/ldlm/ldlm_request.c | 34 ++++++++++++++++++++++++++++--- fs/lustre/mdc/mdc_locks.c | 45 +++++++++++++----------------------------- fs/lustre/mdc/mdc_reint.c | 4 ++-- fs/lustre/mdc/mdc_request.c | 20 +++++++++---------- fs/lustre/obdclass/genops.c | 30 +++++----------------------- fs/lustre/ptlrpc/client.c | 29 +++++++++++++++++++++++++-- 10 files changed, 96 insertions(+), 103 deletions(-) diff --git a/fs/lustre/include/lustre_dlm.h b/fs/lustre/include/lustre_dlm.h index 7621d1e..31d360e 100644 --- a/fs/lustre/include/lustre_dlm.h +++ b/fs/lustre/include/lustre_dlm.h @@ -959,6 +959,8 @@ struct ldlm_enqueue_info { void *ei_cbdata; /* whether enqueue slave stripes */ unsigned int ei_enq_slave:1; + /* whether acquire rpc slot */ + unsigned int ei_enq_slot:1; }; extern struct obd_ops ldlm_obd_ops; @@ -1279,6 +1281,7 @@ int ldlm_prep_elc_req(struct obd_export *exp, int version, int opc, int canceloff, struct list_head *cancels, int count); +struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len); int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, enum ldlm_type type, u8 with_policy, enum ldlm_mode mode, diff --git a/fs/lustre/include/lustre_mdc.h b/fs/lustre/include/lustre_mdc.h index f57783d..d7b6e4a 100644 --- a/fs/lustre/include/lustre_mdc.h +++ b/fs/lustre/include/lustre_mdc.h @@ -60,31 +60,6 @@ struct ptlrpc_request; struct obd_device; -static inline void mdc_get_mod_rpc_slot(struct ptlrpc_request *req, - struct lookup_intent *it) -{ - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - u32 opc; - u16 tag; - - opc = lustre_msg_get_opc(req->rq_reqmsg); - tag = obd_get_mod_rpc_slot(cli, opc, it); - lustre_msg_set_tag(req->rq_reqmsg, tag); - ptlrpc_reassign_next_xid(req); -} - -static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req, - struct lookup_intent *it) -{ - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - u32 opc; - u16 tag; - - opc = lustre_msg_get_opc(req->rq_reqmsg); - tag = lustre_msg_get_tag(req->rq_reqmsg); - obd_put_mod_rpc_slot(cli, opc, it, tag); -} - /** * Update the maximum possible easize. * diff --git a/fs/lustre/include/lustre_net.h b/fs/lustre/include/lustre_net.h index 87e1d60..90a0b01 100644 --- a/fs/lustre/include/lustre_net.h +++ b/fs/lustre/include/lustre_net.h @@ -1919,7 +1919,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, u64 ptlrpc_next_xid(void); u64 ptlrpc_sample_next_xid(void); u64 ptlrpc_req_xid(struct ptlrpc_request *request); -void ptlrpc_reassign_next_xid(struct ptlrpc_request *req); +void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req); +void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req); /* Set of routines to run a function in ptlrpcd context */ void *ptlrpcd_alloc_work(struct obd_import *imp, diff --git a/fs/lustre/include/obd_class.h b/fs/lustre/include/obd_class.h index bc01eca..a099768 100644 --- a/fs/lustre/include/obd_class.h +++ b/fs/lustre/include/obd_class.h @@ -115,10 +115,8 @@ static inline char *obd_import_nid2str(struct obd_import *imp) int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, u16 max); int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq); -u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc, - struct lookup_intent *it); -void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc, - struct lookup_intent *it, u16 tag); +u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc); +void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc, u16 tag); struct llog_handle; struct llog_rec_hdr; diff --git a/fs/lustre/ldlm/ldlm_request.c b/fs/lustre/ldlm/ldlm_request.c index 20bdba4..6df057d 100644 --- a/fs/lustre/ldlm/ldlm_request.c +++ b/fs/lustre/ldlm/ldlm_request.c @@ -347,6 +347,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, } } +static bool ldlm_request_slot_needed(enum ldlm_type type) +{ + return type == LDLM_FLOCK || type == LDLM_IBITS; +} + /** * Finishing portion of client lock enqueue code. * @@ -365,6 +370,11 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, struct ldlm_reply *reply; int cleanup_phase = 1; + if (ldlm_request_slot_needed(type)) + obd_put_request_slot(&req->rq_import->imp_obd->u.cli); + + ptlrpc_put_mod_rpc_slot(req); + lock = ldlm_handle2lock(lockh); /* ldlm_cli_enqueue is holding a reference on this lock. */ if (!lock) { @@ -662,8 +672,7 @@ int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req, } EXPORT_SYMBOL(ldlm_prep_enqueue_req); -static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, - int lvb_len) +struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len) { struct ptlrpc_request *req; int rc; @@ -682,6 +691,7 @@ static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, ptlrpc_request_set_replen(req); return req; } +EXPORT_SYMBOL(ldlm_enqueue_pack); /** * Client-side lock enqueue. @@ -814,6 +824,24 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_GLIMPSE_ENQUEUE); } + /* It is important to obtain modify RPC slot first (if applicable), so + * that threads that are waiting for a modify RPC slot are not polluting + * our rpcs in flight counter. + */ + if (einfo->ei_enq_slot) + ptlrpc_get_mod_rpc_slot(req); + + if (ldlm_request_slot_needed(einfo->ei_type)) { + rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli); + if (rc) { + if (einfo->ei_enq_slot) + ptlrpc_put_mod_rpc_slot(req); + failed_lock_cleanup(ns, lock, einfo->ei_mode); + LDLM_LOCK_RELEASE(lock); + goto out; + } + } + if (async) { LASSERT(reqp); return 0; @@ -835,7 +863,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_LOCK_RELEASE(lock); else rc = err; - +out: if (!req_passed_in && req) { ptlrpc_req_finished(req); if (reqp) diff --git a/fs/lustre/mdc/mdc_locks.c b/fs/lustre/mdc/mdc_locks.c index 4d40087..60bbae1 100644 --- a/fs/lustre/mdc/mdc_locks.c +++ b/fs/lustre/mdc/mdc_locks.c @@ -856,6 +856,16 @@ static int mdc_finish_enqueue(struct obd_export *exp, return rc; } +static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it) +{ + if (it && + (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || + it->it_op == IT_READDIR || + (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE)))) + return true; + return false; +} + /* We always reserve enough space in the reply packet for a stripe MD, because * we don't know in advance the file type. */ @@ -877,7 +887,7 @@ int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo, .l_inodebits = { MDS_INODELOCK_XATTR } }; struct obd_device *obddev = class_exp2obd(exp); - struct ptlrpc_request *req = NULL; + struct ptlrpc_request *req; u64 flags, saved_flags = extra_lock_flags; struct ldlm_res_id res_id; int generation, resends = 0; @@ -920,6 +930,7 @@ int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo, LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", einfo->ei_type); res_id.name[3] = LDLM_FLOCK; + req = ldlm_enqueue_pack(exp, 0); } else if (it->it_op & IT_OPEN) { req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { @@ -947,21 +958,7 @@ int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo, req->rq_sent = ktime_get_real_seconds() + resends; } - /* It is important to obtain modify RPC slot first (if applicable), so - * that threads that are waiting for a modify RPC slot are not polluting - * our rpcs in flight counter. - * We do not do flock request limiting, though - */ - if (it) { - mdc_get_mod_rpc_slot(req, it); - rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - mdc_put_mod_rpc_slot(req, it); - mdc_clear_replay_flag(req, 0); - ptlrpc_req_finished(req); - return rc; - } - } + einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it); /* With Data-on-MDT the glimpse callback is needed too. * It is set here in advance but not in mdc_finish_enqueue() @@ -987,12 +984,10 @@ int mdc_enqueue_base(struct obd_export *exp, struct ldlm_enqueue_info *einfo, (einfo->ei_type == LDLM_FLOCK) && (einfo->ei_mode == LCK_NL)) goto resend; + ptlrpc_req_finished(req); return rc; } - obd_put_request_slot(&obddev->u.cli); - mdc_put_mod_rpc_slot(req, it); - if (rc < 0) { CDEBUG(D_INFO, "%s: ldlm_cli_enqueue " DFID ":" DFID "=%s failed: rc = %d\n", @@ -1343,16 +1338,12 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, struct ldlm_enqueue_info *einfo = &minfo->mi_einfo; struct lookup_intent *it; struct lustre_handle *lockh; - struct obd_device *obddev; struct ldlm_reply *lockrep; u64 flags = LDLM_FL_HAS_INTENT; it = &minfo->mi_it; lockh = &minfo->mi_lockh; - obddev = class_exp2obd(exp); - - obd_put_request_slot(&obddev->u.cli); if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) rc = -ETIMEDOUT; @@ -1387,7 +1378,6 @@ int mdc_intent_getattr_async(struct obd_export *exp, struct lookup_intent *it = &minfo->mi_it; struct ptlrpc_request *req; struct mdc_getattr_args *ga; - struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id; union ldlm_policy_data policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE } @@ -1409,12 +1399,6 @@ int mdc_intent_getattr_async(struct obd_export *exp, if (IS_ERR(req)) return PTR_ERR(req); - rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - ptlrpc_req_finished(req); - return rc; - } - /* With Data-on-MDT the glimpse callback is needed too. * It is set here in advance but not in mdc_finish_enqueue() * to avoid possible races. It is safe to have glimpse handler @@ -1426,7 +1410,6 @@ int mdc_intent_getattr_async(struct obd_export *exp, rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy, &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1); if (rc < 0) { - obd_put_request_slot(&obddev->u.cli); ptlrpc_req_finished(req); return rc; } diff --git a/fs/lustre/mdc/mdc_reint.c b/fs/lustre/mdc/mdc_reint.c index 0dc0de4..dade5686 100644 --- a/fs/lustre/mdc/mdc_reint.c +++ b/fs/lustre/mdc/mdc_reint.c @@ -47,9 +47,9 @@ static int mdc_reint(struct ptlrpc_request *request, int level) request->rq_send_state = level; - mdc_get_mod_rpc_slot(request, NULL); + ptlrpc_get_mod_rpc_slot(request); rc = ptlrpc_queue_wait(request); - mdc_put_mod_rpc_slot(request, NULL); + ptlrpc_put_mod_rpc_slot(request); if (rc) CDEBUG(D_INFO, "error in handling %d\n", rc); else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) diff --git a/fs/lustre/mdc/mdc_request.c b/fs/lustre/mdc/mdc_request.c index 54f6d15..8569858 100644 --- a/fs/lustre/mdc/mdc_request.c +++ b/fs/lustre/mdc/mdc_request.c @@ -412,12 +412,12 @@ static int mdc_xattr_common(struct obd_export *exp, /* make rpc */ if (opcode == MDS_REINT) - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); if (opcode == MDS_REINT) - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); if (rc) ptlrpc_req_finished(req); @@ -990,9 +990,9 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); if (!req->rq_repmsg) { CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req, @@ -1779,9 +1779,9 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); out: ptlrpc_req_finished(req); return rc; @@ -1984,9 +1984,9 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); out: ptlrpc_req_finished(req); return rc; @@ -2049,9 +2049,9 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); out: ptlrpc_req_finished(req); return rc; diff --git a/fs/lustre/obdclass/genops.c b/fs/lustre/obdclass/genops.c index 7f841d5..bceb055 100644 --- a/fs/lustre/obdclass/genops.c +++ b/fs/lustre/obdclass/genops.c @@ -1495,36 +1495,18 @@ static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli, return avail; } -static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it) -{ - if (it && - (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || - it->it_op == IT_READDIR || - (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE)))) - return true; - return false; -} - /* Get a modify RPC slot from the obd client @cli according - * to the kind of operation @opc that is going to be sent - * and the intent @it of the operation if it applies. + * to the kind of operation @opc that is going to be sent. * If the maximum number of modify RPCs in flight is reached * the thread is put to sleep. * Returns the tag to be set in the request message. Tag 0 * is reserved for non-modifying requests. */ -u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc, - struct lookup_intent *it) +u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc) { bool close_req = false; u16 i, max; - /* read-only metadata RPCs don't consume a slot on MDT - * for reply reconstruction - */ - if (obd_skip_mod_rpc_slot(it)) - return 0; - if (opc == MDS_CLOSE) close_req = true; @@ -1567,15 +1549,13 @@ u16 obd_get_mod_rpc_slot(struct client_obd *cli, u32 opc, /* * Put a modify RPC slot from the obd client @cli according - * to the kind of operation @opc that has been sent and the - * intent @it of the operation if it applies. + * to the kind of operation @opc that has been sent. */ -void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc, - struct lookup_intent *it, u16 tag) +void obd_put_mod_rpc_slot(struct client_obd *cli, u32 opc, u16 tag) { bool close_req = false; - if (obd_skip_mod_rpc_slot(it)) + if (tag == 0) return; if (opc == MDS_CLOSE) diff --git a/fs/lustre/ptlrpc/client.c b/fs/lustre/ptlrpc/client.c index 8d874f2..632ddf1 100644 --- a/fs/lustre/ptlrpc/client.c +++ b/fs/lustre/ptlrpc/client.c @@ -717,7 +717,7 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req) static atomic64_t ptlrpc_last_xid; -void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) +static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) { spin_lock(&req->rq_import->imp_lock); list_del_init(&req->rq_unreplied_list); @@ -725,7 +725,32 @@ void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) spin_unlock(&req->rq_import->imp_lock); DEBUG_REQ(D_RPCTRACE, req, "reassign xid"); } -EXPORT_SYMBOL(ptlrpc_reassign_next_xid); + +void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + u32 opc; + u16 tag; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + tag = obd_get_mod_rpc_slot(cli, opc); + lustre_msg_set_tag(req->rq_reqmsg, tag); + ptlrpc_reassign_next_xid(req); +} +EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot); + +void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req) +{ + u16 tag = lustre_msg_get_tag(req->rq_reqmsg); + + if (tag != 0) { + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + + obd_put_mod_rpc_slot(cli, opc, tag); + } +} +EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot); int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, u32 version, int opcode, char **bufs,