@@ -593,7 +593,6 @@ enum ldlm_cancel_flags {
LCF_BL_AST = 0x4, /* Cancel locks marked as LDLM_FL_BL_AST
* in the same RPC
*/
- LCF_ONE_LOCK = 0x8, /* Cancel locks pack only one lock. */
};
struct ldlm_flock {
@@ -700,7 +700,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
* we can tell the server we have no lock. Otherwise, we
* should send cancel after dropping the cache.
*/
- if (ldlm_is_ast_sent(lock) || ldlm_is_failed(lock)) {
+ if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
+ ldlm_is_failed(lock)) {
LDLM_DEBUG(lock,
"callback on lock %#llx - lock disappeared",
dlm_req->lock_handle[0].cookie);
@@ -1055,8 +1055,9 @@ static int _ldlm_cancel_pack(struct ptlrpc_request *req, struct ldlm_lock *lock,
* Prepare and send a batched cancel RPC. It will include @count lock
* handles of locks given in @cancels list.
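+ * Either a single @lock or a list of locks via @head may be given, not both.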
*/
-static int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
- int count, enum ldlm_cancel_flags flags)
+static int ldlm_cli_cancel_req(struct obd_export *exp, struct ldlm_lock *lock,
+ struct list_head *head, int count,
+ enum ldlm_cancel_flags flags)
{
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
@@ -1065,6 +1066,7 @@ static int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
LASSERT(exp);
LASSERT(count > 0);
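+ /* Pack either one @lock or the @head list, never both */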
+ LASSERT(!head || !lock);
CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
@@ -1104,10 +1106,7 @@ static int ldlm_cli_cancel_req(struct obd_export *exp, void *ptr,
req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
ptlrpc_at_set_req_timeout(req);
- if (flags & LCF_ONE_LOCK)
- rc = _ldlm_cancel_pack(req, ptr, NULL, count);
- else
- rc = _ldlm_cancel_pack(req, NULL, ptr, count);
+ rc = _ldlm_cancel_pack(req, lock, head, count);
if (rc == 0) {
ptlrpc_req_finished(req);
sent = count;
@@ -1265,7 +1264,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
enum ldlm_cancel_flags flags)
{
struct obd_export *exp;
- int avail, count = 1, bl_ast = 0;
+ int avail, count = 1, separate = 0;
+ enum ldlm_lru_flags lru_flags = 0;
u64 rc = 0;
struct ldlm_namespace *ns;
struct ldlm_lock *lock;
@@ -1286,7 +1286,8 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
LDLM_LOCK_RELEASE(lock);
return 0;
}
- bl_ast = 1;
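+ /* If already being canceled, send its CANCEL RPC separately below */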
+ if (ldlm_is_canceling(lock))
+ separate = 1;
} else if (ldlm_is_canceling(lock)) {
/* Lock is being canceled and the caller doesn't want to wait */
unlock_res_and_lock(lock);
@@ -1308,11 +1309,18 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
if (rc == LDLM_FL_LOCAL_ONLY || flags & LCF_LOCAL) {
LDLM_LOCK_RELEASE(lock);
return 0;
+ } else if (rc == LDLM_FL_BL_AST) {
+ /* BL_AST lock must not wait. */
+ lru_flags |= LDLM_LRU_FLAG_NO_WAIT;
}
exp = lock->l_conn_export;
- if (bl_ast) { /* Send RPC immedaitly for LDLM_FL_BL_AST */
- ldlm_cli_cancel_req(exp, lock, count, flags | LCF_ONE_LOCK);
+ /* If a lock has been taken from the LRU for a batched cancel and a
+ * BL_AST came later, send a CANCEL RPC individually for it right away,
+ * not waiting for the batch to be handled.
+ */
+ if (separate) {
+ ldlm_cli_cancel_req(exp, lock, NULL, 1, flags);
LDLM_LOCK_RELEASE(lock);
return 0;
}
@@ -1332,7 +1340,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
ns = ldlm_lock_to_ns(lock);
count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
- LCF_BL_AST, 0);
+ LCF_BL_AST, lru_flags);
}
ldlm_cli_cancel_list(&cancels, count, NULL, flags);
@@ -1345,7 +1353,7 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
* Return the number of cancelled locks.
*/
int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
- enum ldlm_cancel_flags flags)
+ enum ldlm_cancel_flags cancel_flags)
{
LIST_HEAD(head);
struct ldlm_lock *lock, *next;
@@ -1357,7 +1365,7 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
if (left-- == 0)
break;
- if (flags & LCF_LOCAL) {
+ if (cancel_flags & LCF_LOCAL) {
rc = LDLM_FL_LOCAL_ONLY;
ldlm_lock_cancel(lock);
} else {
@@ -1369,7 +1377,7 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
* with the LDLM_FL_BL_AST flag in a separate RPC from
* the one being generated now.
*/
- if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
+ if (!(cancel_flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
LDLM_DEBUG(lock, "Cancel lock separately");
list_move(&lock->l_bl_ast, &head);
bl_ast++;
@@ -1384,7 +1392,7 @@ int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
}
if (bl_ast > 0) {
count -= bl_ast;
- ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
+ ldlm_cli_cancel_list(&head, bl_ast, NULL, cancel_flags);
}
return count;
@@ -1887,11 +1895,11 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
ldlm_cancel_pack(req, cancels, count);
else
res = ldlm_cli_cancel_req(lock->l_conn_export,
- cancels, count,
+ NULL, cancels, count,
flags);
} else {
res = ldlm_cli_cancel_req(lock->l_conn_export,
- cancels, 1, flags);
+ NULL, cancels, 1, flags);
}
if (res < 0) {