@@ -731,7 +731,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
{
struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
- struct ceph_mds_request *req;
+ struct ceph_mds_request *req = NULL;
int op;
int mask;
int err;
@@ -765,6 +765,10 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
spin_unlock(&ci->i_ceph_lock);
}
+ mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+ if (ceph_security_xattr_wanted(dir))
+ mask |= CEPH_CAP_XATTR_SHARED;
+
op = ceph_snap(dir) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
@@ -772,12 +776,9 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
return ERR_CAST(req);
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
-
- mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
- if (ceph_security_xattr_wanted(dir))
- mask |= CEPH_CAP_XATTR_SHARED;
+
req->r_args.getattr.mask = cpu_to_le32(mask);
-
+
req->r_parent = dir;
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -1176,6 +1177,7 @@ static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
}
}
}
+ dout("dentry_lease_is_valid ttl = %ld, ceph_dentry.time = %ld, lease_renew_after = %ld, lease_renew_from = %ld, jiffies = %ld\n", ttl, di->time, di->lease_renew_after, di->lease_renew_from, jiffies);
}
spin_unlock(&dentry->d_lock);
@@ -1184,7 +1186,7 @@ static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
CEPH_MDS_LEASE_RENEW, seq);
ceph_put_mds_session(session);
}
- dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid);
+ dout("dentry_lease_is_valid - di %p, dentry %p = %d\n", di, dentry, valid);
return valid;
}
@@ -1252,46 +1254,79 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
if (!valid) {
struct ceph_mds_client *mdsc =
ceph_sb_to_client(dir->i_sb)->mdsc;
- struct ceph_mds_request *req;
+ struct ceph_mds_request *req = NULL;
+ struct ceph_inode_info* cdir = ceph_inode(dir);
int op, err;
u32 mask;
if (flags & LOOKUP_RCU)
return -ECHILD;
+ mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+ if (ceph_security_xattr_wanted(dir))
+ mask |= CEPH_CAP_XATTR_SHARED;
op = ceph_snap(dir) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
- req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
- if (!IS_ERR(req)) {
- req->r_dentry = dget(dentry);
- req->r_num_caps = 2;
- req->r_parent = dir;
+ if (op == CEPH_MDS_OP_LOOKUP) {
+ mutex_lock(&cdir->lookups_inflight_lock);
+ dout("d_revalidate searching inode lookups inflight, %p, '%pd', inode %p offset %lld, mask: %d\n",
+ dentry, dentry, d_inode(dentry), ceph_dentry(dentry)->offset, mask);
+ req = __search_inode_getattr_or_lookup(&cdir->lookups_inflight, mask, true);
+ }
+ if (req && op == CEPH_MDS_OP_LOOKUP) {
+ dout("d_revalidate found previous lookup inflight, %p, '%pd', inode %p offset %lld, mask: %d, req jiffies: %ld\n",
+ dentry, dentry, d_inode(dentry), ceph_dentry(dentry)->offset, mask, req->r_started);
+ ceph_mdsc_get_request(req);
+ mutex_unlock(&cdir->lookups_inflight_lock);
+ err = ceph_mdsc_wait_for_request(req);
+ dout("d_revalidate waited previous lookup inflight, %p, '%pd', inode %p offset %lld, mask: %d, req jiffies: %ld, err: %d\n",
+ dentry, dentry, d_inode(dentry), ceph_dentry(dentry)->offset, mask, req->r_started, err);
+ } else {
- mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
- if (ceph_security_xattr_wanted(dir))
- mask |= CEPH_CAP_XATTR_SHARED;
- req->r_args.getattr.mask = cpu_to_le32(mask);
+ req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
+ if (op == CEPH_MDS_OP_LOOKUP) {
+ if (!IS_ERR(req)) {
+ req->r_dentry = dget(dentry);
+ req->r_num_caps = 2;
+ req->r_parent = dir;
+ req->r_args.getattr.mask = cpu_to_le32(mask);
+ __register_inode_getattr_or_lookup(cdir, req, true);
+ dout("d_revalidate no previous lookup inflight, just registered a new one, %p, '%pd', inode %p offset %lld, mask: %d, req jiffies: %ld\n",
+ dentry, dentry, d_inode(dentry), ceph_dentry(dentry)->offset, mask, req->r_started);
+ }
+ mutex_unlock(&cdir->lookups_inflight_lock);
+ }
+ if (IS_ERR(req))
+ goto out;
err = ceph_mdsc_do_request(mdsc, NULL, req);
- switch (err) {
- case 0:
- if (d_really_is_positive(dentry) &&
- d_inode(dentry) == req->r_target_inode)
- valid = 1;
- break;
- case -ENOENT:
- if (d_really_is_negative(dentry))
- valid = 1;
- /* Fallthrough */
- default:
- break;
+ if (op == CEPH_MDS_OP_LOOKUP) {
+ mutex_lock(&cdir->lookups_inflight_lock);
+ __unregister_inode_getattr_or_lookup(cdir, req, true);
+ dout("d_revalidate just unregistered one, %p, '%pd', inode %p offset %lld, mask: %d, req jiffies: %ld, err: %d\n",
+ dentry, dentry, d_inode(dentry), ceph_dentry(dentry)->offset, mask, req->r_started, err);
+ mutex_unlock(&cdir->lookups_inflight_lock);
}
- ceph_mdsc_put_request(req);
- dout("d_revalidate %p lookup result=%d\n",
- dentry, err);
}
+ switch (err) {
+ case 0:
+ if (d_really_is_positive(dentry) &&
+ d_inode(dentry) == req->r_target_inode)
+ valid = 1;
+ break;
+ case -ENOENT:
+ if (d_really_is_negative(dentry))
+ valid = 1;
+ /* Fallthrough */
+ default:
+ break;
+ }
+ ceph_mdsc_put_request(req);
+ dout("d_revalidate %p lookup result=%d\n",
+ dentry, err);
}
+out:
dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
if (valid) {
ceph_dentry_lru_touch(dentry);
@@ -430,6 +430,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
dout("alloc_inode %p\n", &ci->vfs_inode);
spin_lock_init(&ci->i_ceph_lock);
+ mutex_init(&ci->getattrs_inflight_lock);
+ mutex_init(&ci->lookups_inflight_lock);
ci->i_version = 0;
ci->i_inline_version = 0;
@@ -461,6 +463,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_xattrs.index_version = 0;
ci->i_caps = RB_ROOT;
+ ci->getattrs_inflight = RB_ROOT;
+ ci->lookups_inflight = RB_ROOT;
ci->i_auth_cap = NULL;
ci->i_dirty_caps = 0;
ci->i_flushing_caps = 0;
@@ -1047,9 +1051,10 @@ static void update_dentry_lease(struct dentry *dentry,
* Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that
* we expect a negative dentry.
*/
+ dout("update_dentry_lease, d_inode: %p\n", dentry->d_inode);
if (!tgt_vino && d_really_is_positive(dentry))
return;
-
+ dout("update_dentry_lease, d_inode: %p\n", dentry->d_inode);
if (tgt_vino && (d_really_is_negative(dentry) ||
!ceph_ino_compare(d_inode(dentry), tgt_vino)))
return;
@@ -2194,6 +2199,7 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
struct ceph_mds_request *req;
int mode;
int err;
+ struct ceph_inode_info* cinode = ceph_inode(inode);
if (ceph_snap(inode) == CEPH_SNAPDIR) {
dout("do_getattr inode %p SNAPDIR\n", inode);
@@ -2205,16 +2211,36 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
return 0;
- mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
- req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
- if (IS_ERR(req))
- return PTR_ERR(req);
- req->r_inode = inode;
- ihold(inode);
- req->r_num_caps = 1;
- req->r_args.getattr.mask = cpu_to_le32(mask);
- req->r_locked_page = locked_page;
- err = ceph_mdsc_do_request(mdsc, NULL, req);
+ mutex_lock(&cinode->getattrs_inflight_lock);
+ dout("__ceph_do_getattr searching inode getattrs inflight, inode %p, mask: %d\n", inode, mask);
+ req = __search_inode_getattr_or_lookup(&cinode->getattrs_inflight, mask, false);
+ if (req) {
+ dout("__ceph_do_getattr found previous inode getattr inflight, inode %p, mask: %d, req jiffies: %ld\n", inode, mask, req->r_started);
+ ceph_mdsc_get_request(req);
+ mutex_unlock(&cinode->getattrs_inflight_lock);
+ err = ceph_mdsc_wait_for_request(req);
+ dout("__ceph_do_getattr waited previous inode getattr inflight, inode %p, mask: %d, req jiffies: %ld, err: %d\n", inode, mask, req->r_started, err);
+ } else {
+ mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
+ req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
+ if (!IS_ERR(req)) {
+ req->r_inode = inode;
+ ihold(inode);
+ req->r_num_caps = 1;
+ req->r_args.getattr.mask = cpu_to_le32(mask);
+ req->r_locked_page = locked_page;
+ __register_inode_getattr_or_lookup(cinode, req, false);
+ dout("__ceph_do_getattr no previous getattr inflight, inode %p, mask: %d, req jiffies: %ld\n", inode, mask, req->r_started);
+ }
+ mutex_unlock(&cinode->getattrs_inflight_lock);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+ err = ceph_mdsc_do_request(mdsc, NULL, req);
+ mutex_lock(&cinode->getattrs_inflight_lock);
+ __unregister_inode_getattr_or_lookup(cinode, req, false);
+ dout("__ceph_do_getattr just unregistered inode getattr inflight, inode %p, mask: %d, req jiffies: %ld, err: %d\n", inode, mask, req->r_started, err);
+ mutex_unlock(&cinode->getattrs_inflight_lock);
+ }
if (locked_page && err == 0) {
u64 inline_version = req->r_reply_info.targeti.inline_version;
if (inline_version == 0) {
@@ -1792,7 +1792,10 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
req->r_fmode = -1;
kref_init(&req->r_kref);
RB_CLEAR_NODE(&req->r_node);
+ RB_CLEAR_NODE(&req->getattr_node);
+ RB_CLEAR_NODE(&req->lookup_node);
INIT_LIST_HEAD(&req->r_wait);
+ init_completion(&req->batch_op_completion);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
@@ -2386,6 +2389,23 @@ void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
mutex_unlock(&mdsc->mutex);
}
+/*
+ * Wait for a request issued by another caller.  Returns the issuer's result
+ * (batch_op_err), -EIO on timeout, or the wait's own error if killed.
+ */
+int ceph_mdsc_wait_for_request(struct ceph_mds_request *req)
+{
+	long timeleft = wait_for_completion_killable_timeout(
+				&req->batch_op_completion,
+				ceph_timeout_jiffies(req->r_timeout));
+
+	if (!timeleft)
+		return -EIO;		/* timed out */
+	if (timeleft < 0)
+		return timeleft;	/* killed */
+	return req->batch_op_err;	/* set before the completion fires */
+}
+
/*
* Synchrously perform an mds request. Take care of all of the
* session setup, forwarding, retry details.
@@ -2458,7 +2478,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
} else {
err = req->r_err;
}
-
+ req->batch_op_err = err;
+ complete_all(&req->batch_op_completion);
out:
mutex_unlock(&mdsc->mutex);
dout("do_request %p done, result %d\n", req, err);
@@ -199,6 +199,7 @@ typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc,
struct ceph_mds_request {
u64 r_tid; /* transaction id */
struct rb_node r_node;
+ struct rb_node getattr_node, lookup_node;
struct ceph_mds_client *r_mdsc;
int r_op; /* mds op code */
@@ -250,7 +251,7 @@ struct ceph_mds_request {
struct ceph_msg *r_reply;
struct ceph_mds_reply_info_parsed r_reply_info;
struct page *r_locked_page;
- int r_err;
+ int r_err, batch_op_err;
unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */
unsigned long r_started; /* start time to measure timeout against */
@@ -273,6 +274,7 @@ struct ceph_mds_request {
struct kref r_kref;
struct list_head r_wait;
+ struct completion batch_op_completion;
struct completion r_completion;
struct completion r_safe_completion;
ceph_mds_request_callback_t r_callback;
@@ -411,6 +413,7 @@ extern struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode);
extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req);
+extern int ceph_mdsc_wait_for_request(struct ceph_mds_request* req);
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
struct inode *dir,
struct ceph_mds_request *req);
@@ -1158,6 +1158,74 @@ static void __exit exit_ceph(void)
destroy_caches();
}
+/* Unlink @req from @ci's lookup tree if @is_lookup, else its getattr tree. */
+void __unregister_inode_getattr_or_lookup(struct ceph_inode_info *ci,
+			struct ceph_mds_request *req, bool is_lookup)
+{
+	if (is_lookup)
+		rb_erase(&req->lookup_node, &ci->lookups_inflight);
+	else
+		rb_erase(&req->getattr_node, &ci->getattrs_inflight);
+}
+
+/*
+ * Link @req into @ci's in-flight lookup (@is_lookup) or getattr tree, keyed
+ * by its getattr mask.  BUG()s if an equal mask is already in the tree.
+ */
+void __register_inode_getattr_or_lookup(struct ceph_inode_info *ci,
+			struct ceph_mds_request *req, bool is_lookup)
+{
+	struct rb_root *root = is_lookup ? &ci->lookups_inflight :
+					   &ci->getattrs_inflight;
+	struct rb_node *node = is_lookup ? &req->lookup_node :
+					   &req->getattr_node;
+	struct rb_node **p = &root->rb_node, *parent = NULL;
+	/* the mask is stored little-endian; order the tree by CPU value */
+	u32 mask = le32_to_cpu(req->r_args.getattr.mask), tmp_mask;
+
+	while (*p) {
+		struct ceph_mds_request *tmp;
+
+		parent = *p;
+		if (is_lookup)
+			tmp = rb_entry(parent, struct ceph_mds_request, lookup_node);
+		else
+			tmp = rb_entry(parent, struct ceph_mds_request, getattr_node);
+		tmp_mask = le32_to_cpu(tmp->r_args.getattr.mask);
+		if (mask < tmp_mask)
+			p = &parent->rb_left;
+		else if (mask > tmp_mask)
+			p = &parent->rb_right;
+		else
+			BUG();
+	}
+	rb_link_node(node, parent, p);
+	rb_insert_color(node, root);
+}
+
+/*
+ * Find the request in @root whose getattr mask equals @mask.  @is_lookup
+ * says whether @root links requests by lookup_node or by getattr_node.
+ * Returns NULL when there is no match; no reference is taken here.
+ */
+struct ceph_mds_request *__search_inode_getattr_or_lookup(struct rb_root *root,
+					int mask, bool is_lookup)
+{
+	struct rb_node *node = root->rb_node;
+
+	while (node) {
+		struct ceph_mds_request *tmp = is_lookup ?
+			rb_entry(node, struct ceph_mds_request, lookup_node) :
+			rb_entry(node, struct ceph_mds_request, getattr_node);
+		/* the mask is stored little-endian; compare in CPU order */
+		u32 tmp_mask = le32_to_cpu(tmp->r_args.getattr.mask);
+
+		if (tmp_mask == (u32)mask)
+			return tmp;
+		node = tmp_mask > (u32)mask ? node->rb_left : node->rb_right;
+	}
+	return NULL;
+}
module_init(init_ceph);
module_exit(exit_ceph);
@@ -292,6 +292,8 @@ struct ceph_inode_info {
struct ceph_vino i_vino; /* ceph ino + snap */
spinlock_t i_ceph_lock;
+ struct mutex getattrs_inflight_lock, lookups_inflight_lock;
+ struct rb_root getattrs_inflight, lookups_inflight;
u64 i_version;
u64 i_inline_version;
@@ -859,6 +861,17 @@ extern int ceph_fill_file_size(struct inode *inode, int issued,
extern void ceph_fill_file_time(struct inode *inode, int issued,
u64 time_warp_seq, struct timespec *ctime,
struct timespec *mtime, struct timespec *atime);
+extern void __register_inode_getattr_or_lookup(struct ceph_inode_info* ci,
+ struct ceph_mds_request* req,
+ bool is_lookup);
+
+extern void __unregister_inode_getattr_or_lookup(struct ceph_inode_info* ci,
+ struct ceph_mds_request* req,
+ bool is_lookup);
+
+extern struct ceph_mds_request* __search_inode_getattr_or_lookup(struct rb_root* root,
+ int mask,
+ bool is_lookup);
extern int ceph_fill_trace(struct super_block *sb,
struct ceph_mds_request *req);
extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,