@@ -991,7 +991,11 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
int __ceph_caps_wanted(struct ceph_inode_info *ci)
{
int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
- if (!S_ISDIR(ci->vfs_inode.i_mode)) {
+ if (S_ISDIR(ci->vfs_inode.i_mode)) {
+ /* we want EXCL if holding caps of dir ops */
+ if (w & CEPH_CAP_ANY_DIR_OPS)
+ w |= CEPH_CAP_FILE_EXCL;
+ } else {
/* we want EXCL if dirty data */
if (w & CEPH_CAP_FILE_BUFFER)
w |= CEPH_CAP_FILE_EXCL;
@@ -1886,10 +1890,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
* revoking the shared cap on every create/unlink
* operation.
*/
- if (IS_RDONLY(inode))
+ if (IS_RDONLY(inode)) {
want = CEPH_CAP_ANY_SHARED;
- else
- want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+ } else {
+ want = CEPH_CAP_ANY_SHARED |
+ CEPH_CAP_FILE_EXCL |
+ CEPH_CAP_ANY_DIR_OPS;
+ }
retain |= want;
} else {
@@ -2652,7 +2659,10 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
}
snap_rwsem_locked = true;
}
- *got = need | (have & want);
+ if ((have & want) == want)
+ *got = need | want;
+ else
+ *got = need;
if (S_ISREG(inode->i_mode) &&
(need & CEPH_CAP_FILE_RD) &&
!(*got & CEPH_CAP_FILE_CACHE))
@@ -2742,13 +2752,16 @@ int ceph_try_get_caps(struct inode *inode, int need, int want,
int ret;
BUG_ON(need & ~CEPH_CAP_FILE_RD);
- BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
- ret = ceph_pool_perm_check(inode, need);
- if (ret < 0)
- return ret;
+ if (need) {
+ ret = ceph_pool_perm_check(inode, need);
+ if (ret < 0)
+ return ret;
+ }
- ret = try_get_cap_refs(inode, need, want, 0,
- (nonblock ? NON_BLOCKING : 0), got);
+ BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO |
+ CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+ CEPH_CAP_ANY_DIR_OPS));
+ ret = try_get_cap_refs(inode, need, want, 0, nonblock, got);
return ret == -EAGAIN ? 0 : ret;
}
@@ -1068,6 +1068,47 @@ int ceph_async_dirop_request_wait(struct inode *inode)
return ret;
}
+/* Async unlink completion: record any error on the parent, drop our refs. */
+static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_request *req)
+{
+	mapping_set_error(req->r_parent->i_mapping, req->r_err);
+	if (req->r_err)
+		pr_warn("%s: req->r_err = %d\n", __func__, req->r_err);
+	ceph_put_cap_refs(ceph_inode(req->r_parent),
+			  CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK);
+	iput(req->r_old_inode);
+}
+
+/* Take Fx + unlink cap refs on @dir; false means fall back to sync unlink. */
+static bool get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry)
+{
+	struct ceph_inode_info *ci = ceph_inode(dir);
+	int want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK;
+	struct ceph_dentry_info *di;
+	int ret, got = 0;	/* logged below even if never set by callee */
+
+	ret = ceph_try_get_caps(dir, 0, want, true, &got);
+	dout("Fx on %p ret=%d got=%d\n", dir, ret, got);
+	if (ret != 1 || got != want)
+		return false;
+
+	spin_lock(&dentry->d_lock);
+	di = ceph_dentry(dentry);
+	/* We hold CEPH_CAP_FILE_EXCL, which implies CEPH_CAP_FILE_SHARED.
+	 * Only support async unlink for primary linkage. */
+	if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen ||
+	    !(di->flags & CEPH_DENTRY_PRIMARY_LINK))
+		ret = 0;
+	spin_unlock(&dentry->d_lock);
+
+	if (!ret) {
+		ceph_put_cap_refs(ci, got);
+		return false;
+	}
+	return true;
+}
+
/*
* rmdir and unlink are differ only by the metadata op code
*/
@@ -1105,13 +1146,33 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
req->r_parent = dir;
- set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
- err = ceph_mdsc_do_request(mdsc, dir, req);
- if (!err && !req->r_reply_info.head->is_dentry)
- d_delete(dentry);
+
+ if (op == CEPH_MDS_OP_UNLINK &&
+ get_caps_for_async_unlink(dir, dentry)) {
+ dout("ceph: Async unlink on %lu/%.*s", dir->i_ino,
+ dentry->d_name.len, dentry->d_name.name);
+ req->r_callback = ceph_async_unlink_cb;
+ req->r_old_inode = d_inode(dentry);
+ ihold(req->r_old_inode);
+ err = ceph_mdsc_submit_request(mdsc, dir, req);
+ if (!err) {
+ /*
+ * We have enough caps, so we assume that the unlink
+ * will succeed. Fix up the target inode and dcache.
+ */
+ drop_nlink(inode);
+ d_delete(dentry);
+ }
+ } else {
+ set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
+ err = ceph_mdsc_do_request(mdsc, dir, req);
+ if (!err && !req->r_reply_info.head->is_dentry)
+ d_delete(dentry);
+ }
+
ceph_mdsc_put_request(req);
out:
return err;
@@ -1455,6 +1516,7 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry)
spin_lock(&dentry->d_lock);
di->time = jiffies;
di->lease_shared_gen = 0;
+ di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
__dentry_lease_unlist(di);
spin_unlock(&dentry->d_lock);
}
@@ -1047,6 +1047,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
struct ceph_mds_session **old_lease_session)
{
struct ceph_dentry_info *di = ceph_dentry(dentry);
+ unsigned mask = le16_to_cpu(lease->mask);
long unsigned duration = le32_to_cpu(lease->duration_ms);
long unsigned ttl = from_time + (duration * HZ) / 1000;
long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;
@@ -1058,8 +1059,13 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
if (ceph_snap(dir) != CEPH_NOSNAP)
return;
+ if (mask & CEPH_LEASE_PRIMARY_LINK)
+ di->flags |= CEPH_DENTRY_PRIMARY_LINK;
+ else
+ di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
+
di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
- if (duration == 0) {
+ if (!(mask & CEPH_LEASE_VALID)) {
__ceph_dentry_dir_lease_touch(di);
return;
}
@@ -282,6 +282,7 @@ struct ceph_dentry_info {
#define CEPH_DENTRY_REFERENCED 1
#define CEPH_DENTRY_LEASE_LIST 2
#define CEPH_DENTRY_SHRINK_LIST 4
+#define CEPH_DENTRY_PRIMARY_LINK 8
struct ceph_inode_xattrs_info {
/*
@@ -530,6 +530,9 @@ struct ceph_mds_reply_lease {
__le32 seq;
} __attribute__ ((packed));
+#define CEPH_LEASE_VALID (1 | 2) /* old and new bit values */
+#define CEPH_LEASE_PRIMARY_LINK 4 /* primary linkage */
+
struct ceph_mds_reply_dirfrag {
__le32 frag; /* fragment */
__le32 auth; /* auth mds, if this is a delegation point */
@@ -659,6 +662,12 @@ int ceph_flags_to_mode(int flags);
#define CEPH_CAP_LOCKS (CEPH_LOCK_IFILE | CEPH_LOCK_IAUTH | CEPH_LOCK_ILINK | \
CEPH_LOCK_IXATTR)
+/* cap masks for async dir operations */
+#define CEPH_CAP_DIR_CREATE CEPH_CAP_FILE_CACHE
+#define CEPH_CAP_DIR_UNLINK CEPH_CAP_FILE_RD
+#define CEPH_CAP_ANY_DIR_OPS (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD | \
+ CEPH_CAP_FILE_WREXTEND | CEPH_CAP_FILE_LAZYIO)
+
int ceph_caps_for_mode(int mode);
enum {