[2/2] ceph: send cap releases more aggressively
diff mbox series

Message ID 20190115080305.37000-2-zyan@redhat.com
State New
Headers show
Series
  • [1/2] ceph: add mount option to limit caps count
Related show

Commit Message

Yan, Zheng Jan. 15, 2019, 8:03 a.m. UTC
When pending cap releases fill up one message, start a work to send
cap release message. (old way is sending cap releases every 5 seconds)

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
---
 fs/ceph/caps.c       | 29 +++++++++-------------
 fs/ceph/inode.c      |  2 +-
 fs/ceph/mds_client.c | 58 +++++++++++++++++++++++++++++++++++++-------
 fs/ceph/mds_client.h | 10 +++++---
 fs/ceph/super.c      |  9 ++++++-
 fs/ceph/super.h      |  6 +++--
 6 files changed, 80 insertions(+), 34 deletions(-)

Comments

Jeff Layton Jan. 15, 2019, 2:56 p.m. UTC | #1
On Tue, 2019-01-15 at 16:03 +0800, Yan, Zheng wrote:
> When pending cap releases fill up one message, start a work to send
> cap release message. (old way is sending cap releases every 5 seconds)
> 
> Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
> ---
>  fs/ceph/caps.c       | 29 +++++++++-------------
>  fs/ceph/inode.c      |  2 +-
>  fs/ceph/mds_client.c | 58 +++++++++++++++++++++++++++++++++++++-------
>  fs/ceph/mds_client.h | 10 +++++---
>  fs/ceph/super.c      |  9 ++++++-
>  fs/ceph/super.h      |  6 +++--
>  6 files changed, 80 insertions(+), 34 deletions(-)
> 
> diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
> index ef57491157fc..0f69b97205d4 100644
> --- a/fs/ceph/caps.c
> +++ b/fs/ceph/caps.c
> @@ -1113,9 +1113,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
>  	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
>  		cap->queue_release = 1;
>  		if (removed) {
> -			list_add_tail(&cap->session_caps,
> -				      &session->s_cap_releases);
> -			session->s_num_cap_releases++;
> +			__ceph_queue_cap_release(session, cap);
>  			removed = 0;
>  		}
>  	} else {
> @@ -1277,7 +1275,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
>   * Queue cap releases when an inode is dropped from our cache.  Since
>   * inode is about to be destroyed, there is no need for i_ceph_lock.
>   */
> -void ceph_queue_caps_release(struct inode *inode)
> +void __ceph_remove_caps(struct inode *inode)
>  {
>  	struct ceph_inode_info *ci = ceph_inode(inode);
>  	struct rb_node *p;
> @@ -3918,12 +3916,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  			cap->seq = seq;
>  			cap->issue_seq = seq;
>  			spin_lock(&session->s_cap_lock);
> -			list_add_tail(&cap->session_caps,
> -					&session->s_cap_releases);
> -			session->s_num_cap_releases++;
> +			__ceph_queue_cap_release(session, cap);
>  			spin_unlock(&session->s_cap_lock);
>  		}
> -		goto flush_cap_releases;
> +		goto done;
>  	}
>  
>  	/* these will work even if we don't have a cap yet */
> @@ -3993,7 +3989,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  		       ceph_cap_op_name(op));
>  	}
>  
> -	goto done;
> +done:
> +	mutex_unlock(&session->s_mutex);
> +done_unlocked:
> +	iput(inode);
> +	ceph_put_string(extra_info.pool_ns);
> +	return;
>  
>  flush_cap_releases:
>  	/*
> @@ -4001,14 +4002,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  	 * along for the mds (who clearly thinks we still have this
>  	 * cap).
>  	 */
> -	ceph_send_cap_releases(mdsc, session);
> -
> -done:
> -	mutex_unlock(&session->s_mutex);
> -done_unlocked:
> -	iput(inode);
> -	ceph_put_string(extra_info.pool_ns);
> -	return;
> +	ceph_flush_cap_releases(mdsc, session);
> +	goto done;
>  
>  bad:
>  	pr_err("ceph_handle_caps: corrupt message\n");
> diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
> index e6012de58aae..f588b2d7b598 100644
> --- a/fs/ceph/inode.c
> +++ b/fs/ceph/inode.c
> @@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode)
>  
>  	ceph_fscache_unregister_inode_cookie(ci);
>  
> -	ceph_queue_caps_release(inode);
> +	__ceph_remove_caps(inode);
>  
>  	if (__ceph_has_any_quota(ci))
>  		ceph_adjust_quota_realms_count(inode, false);
> diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
> index c9f1b1e8fa03..15df7108a68f 100644
> --- a/fs/ceph/mds_client.c
> +++ b/fs/ceph/mds_client.c
> @@ -57,6 +57,7 @@ struct ceph_reconnect_state {
>  
>  static void __wake_requests(struct ceph_mds_client *mdsc,
>  			    struct list_head *head);
> +static void ceph_cap_release_work(struct work_struct *work);
>  
>  static const struct ceph_connection_operations mds_con_ops;
>  
> @@ -634,6 +635,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
>  	s->s_cap_reconnect = 0;
>  	s->s_cap_iterator = NULL;
>  	INIT_LIST_HEAD(&s->s_cap_releases);
> +	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
> +
>  	INIT_LIST_HEAD(&s->s_cap_flushing);
>  
>  	mdsc->sessions[mds] = s;
> @@ -659,6 +662,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
>  	dout("__unregister_session mds%d %p\n", s->s_mds, s);
>  	BUG_ON(mdsc->sessions[s->s_mds] != s);
>  	mdsc->sessions[s->s_mds] = NULL;
> +	s->s_state = 0;
>  	ceph_con_close(&s->s_con);
>  	ceph_put_mds_session(s);
>  	atomic_dec(&mdsc->num_sessions);
> @@ -1321,13 +1325,10 @@ static int iterate_session_caps(struct ceph_mds_session *session,
>  			cap->session = NULL;
>  			list_del_init(&cap->session_caps);
>  			session->s_nr_caps--;
> -			if (cap->queue_release) {
> -				list_add_tail(&cap->session_caps,
> -					      &session->s_cap_releases);
> -				session->s_num_cap_releases++;
> -			} else {
> +			if (cap->queue_release)
> +				__ceph_queue_cap_release(session, cap);
> +			else
>  				old_cap = cap;  /* put_cap it w/o locks held */
> -			}
>  		}
>  		if (ret < 0)
>  			goto out;
> @@ -1762,7 +1763,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
>  		session->s_trim_caps = 0;
>  	}
>  
> -	ceph_send_cap_releases(mdsc, session);
> +	ceph_flush_cap_releases(mdsc, session);
>  	return 0;
>  }
>  
> @@ -1805,8 +1806,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
>  /*
>   * called under s_mutex
>   */
> -void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
> -			    struct ceph_mds_session *session)
> +static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
> +				   struct ceph_mds_session *session)
>  {
>  	struct ceph_msg *msg = NULL;
>  	struct ceph_mds_cap_release *head;
> @@ -1898,6 +1899,45 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
>  	spin_unlock(&session->s_cap_lock);
>  }
>  
> +static void ceph_cap_release_work(struct work_struct *work)
> +{
> +	struct ceph_mds_session *session =
> +		container_of(work, struct ceph_mds_session, s_cap_release_work);
> +
> +	mutex_lock(&session->s_mutex);
> +	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
> +	    session->s_state == CEPH_MDS_SESSION_HUNG)
> +		ceph_send_cap_releases(session->s_mdsc, session);
> +	mutex_unlock(&session->s_mutex);
> +	ceph_put_mds_session(session);
> +}
> +
> +void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
> +		             struct ceph_mds_session *session)
> +{
> +	get_session(session);
> +	if (queue_work(mdsc->fsc->cap_release_wq,
> +		       &session->s_cap_release_work)) {
> +		dout("cap release work queued\n");
> +	} else {
> +		ceph_put_mds_session(session);
> +		dout("failed to queue cap release work\n");
> +	}
> +}
> +
> +/*
> + * caller holds session->s_cap_lock
> + */
> +void __ceph_queue_cap_release(struct ceph_mds_session *session,
> +			      struct ceph_cap *cap)
> +{
> +	list_add_tail(&cap->session_caps, &session->s_cap_releases);
> +	session->s_num_cap_releases++;
> +
> +	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
> +		ceph_flush_cap_releases(session->s_mdsc, session);
> +}
> +
>  /*
>   * requests
>   */
> diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
> index 94fe2312c092..a6052fb79733 100644
> --- a/fs/ceph/mds_client.h
> +++ b/fs/ceph/mds_client.h
> @@ -172,12 +172,13 @@ struct ceph_mds_session {
>  	/* protected by s_cap_lock */
>  	spinlock_t        s_cap_lock;
>  	struct list_head  s_caps;     /* all caps issued by this session */
> +	struct ceph_cap  *s_cap_iterator;
>  	int               s_nr_caps, s_trim_caps;
>  	int               s_num_cap_releases;
>  	int		  s_cap_reconnect;
>  	int		  s_readonly;
>  	struct list_head  s_cap_releases; /* waiting cap_release messages */
> -	struct ceph_cap  *s_cap_iterator;
> +	struct work_struct s_cap_release_work;
>  
>  	/* protected by mutex */
>  	struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
> @@ -458,9 +459,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
>  	kref_put(&req->r_kref, ceph_mdsc_release_request);
>  }
>  
> -extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
> -				   struct ceph_mds_session *session);
> -
> +extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
> +				    struct ceph_cap *cap);
> +extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
> +				    struct ceph_mds_session *session);
>  extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
>  
>  extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
> diff --git a/fs/ceph/super.c b/fs/ceph/super.c
> index 93404e3c89db..0e85dbd9ac8d 100644
> --- a/fs/ceph/super.c
> +++ b/fs/ceph/super.c
> @@ -680,6 +680,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
>  	fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
>  	if (!fsc->trunc_wq)
>  		goto fail_pg_inv_wq;
> +	fsc->cap_release_wq = alloc_workqueue("ceph-cap-release", 0, 1);
> +	if (!fsc->cap_release_wq)
> +		goto fail_trunc_wq;
>  
>  	/* set up mempools */
>  	err = -ENOMEM;
> @@ -687,10 +690,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
>  	size = sizeof (struct page *) * (page_count ? page_count : 1);
>  	fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
>  	if (!fsc->wb_pagevec_pool)
> -		goto fail_trunc_wq;
> +		goto fail_cap_release_wq;
>  
>  	return fsc;
>  
> +fail_cap_release_wq:
> +	destroy_workqueue(fsc->cap_release_wq);
>  fail_trunc_wq:
>  	destroy_workqueue(fsc->trunc_wq);
>  fail_pg_inv_wq:
> @@ -712,6 +717,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc)
>  	flush_workqueue(fsc->wb_wq);
>  	flush_workqueue(fsc->pg_inv_wq);
>  	flush_workqueue(fsc->trunc_wq);
> +	flush_workqueue(fsc->cap_release_wq);
>  }
>  
>  static void destroy_fs_client(struct ceph_fs_client *fsc)
> @@ -721,6 +727,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
>  	destroy_workqueue(fsc->wb_wq);
>  	destroy_workqueue(fsc->pg_inv_wq);
>  	destroy_workqueue(fsc->trunc_wq);
> +	destroy_workqueue(fsc->cap_release_wq);
>  
>  	mempool_destroy(fsc->wb_pagevec_pool);
>  
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index 631b46e824a8..3eab95c02cec 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -107,10 +107,12 @@ struct ceph_fs_client {
>  
>  	/* writeback */
>  	mempool_t *wb_pagevec_pool;
> +	atomic_long_t writeback_count;
> +
>  	struct workqueue_struct *wb_wq;
>  	struct workqueue_struct *pg_inv_wq;
>  	struct workqueue_struct *trunc_wq;
> -	atomic_long_t writeback_count;
> +	struct workqueue_struct *cap_release_wq;
>  
>  #ifdef CONFIG_DEBUG_FS
>  	struct dentry *debugfs_dentry_lru, *debugfs_caps;
> @@ -989,11 +991,11 @@ extern void ceph_add_cap(struct inode *inode,
>  			 unsigned cap, unsigned seq, u64 realmino, int flags,
>  			 struct ceph_cap **new_cap);
>  extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
> +extern void __ceph_remove_caps(struct inode* inode);
>  extern void ceph_put_cap(struct ceph_mds_client *mdsc,
>  			 struct ceph_cap *cap);
>  extern int ceph_is_any_caps(struct inode *inode);
>  
> -extern void ceph_queue_caps_release(struct inode *inode);
>  extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
>  extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
>  		      int datasync);

Yes. Sorely needed.

Reviewed-by: Jeff Layton <jlayton@redhat.com>

Patch
diff mbox series

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index ef57491157fc..0f69b97205d4 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1113,9 +1113,7 @@  void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
 		cap->queue_release = 1;
 		if (removed) {
-			list_add_tail(&cap->session_caps,
-				      &session->s_cap_releases);
-			session->s_num_cap_releases++;
+			__ceph_queue_cap_release(session, cap);
 			removed = 0;
 		}
 	} else {
@@ -1277,7 +1275,7 @@  static int send_cap_msg(struct cap_msg_args *arg)
  * Queue cap releases when an inode is dropped from our cache.  Since
  * inode is about to be destroyed, there is no need for i_ceph_lock.
  */
-void ceph_queue_caps_release(struct inode *inode)
+void __ceph_remove_caps(struct inode *inode)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct rb_node *p;
@@ -3918,12 +3916,10 @@  void ceph_handle_caps(struct ceph_mds_session *session,
 			cap->seq = seq;
 			cap->issue_seq = seq;
 			spin_lock(&session->s_cap_lock);
-			list_add_tail(&cap->session_caps,
-					&session->s_cap_releases);
-			session->s_num_cap_releases++;
+			__ceph_queue_cap_release(session, cap);
 			spin_unlock(&session->s_cap_lock);
 		}
-		goto flush_cap_releases;
+		goto done;
 	}
 
 	/* these will work even if we don't have a cap yet */
@@ -3993,7 +3989,12 @@  void ceph_handle_caps(struct ceph_mds_session *session,
 		       ceph_cap_op_name(op));
 	}
 
-	goto done;
+done:
+	mutex_unlock(&session->s_mutex);
+done_unlocked:
+	iput(inode);
+	ceph_put_string(extra_info.pool_ns);
+	return;
 
 flush_cap_releases:
 	/*
@@ -4001,14 +4002,8 @@  void ceph_handle_caps(struct ceph_mds_session *session,
 	 * along for the mds (who clearly thinks we still have this
 	 * cap).
 	 */
-	ceph_send_cap_releases(mdsc, session);
-
-done:
-	mutex_unlock(&session->s_mutex);
-done_unlocked:
-	iput(inode);
-	ceph_put_string(extra_info.pool_ns);
-	return;
+	ceph_flush_cap_releases(mdsc, session);
+	goto done;
 
 bad:
 	pr_err("ceph_handle_caps: corrupt message\n");
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e6012de58aae..f588b2d7b598 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -537,7 +537,7 @@  void ceph_destroy_inode(struct inode *inode)
 
 	ceph_fscache_unregister_inode_cookie(ci);
 
-	ceph_queue_caps_release(inode);
+	__ceph_remove_caps(inode);
 
 	if (__ceph_has_any_quota(ci))
 		ceph_adjust_quota_realms_count(inode, false);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c9f1b1e8fa03..15df7108a68f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -57,6 +57,7 @@  struct ceph_reconnect_state {
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head);
+static void ceph_cap_release_work(struct work_struct *work);
 
 static const struct ceph_connection_operations mds_con_ops;
 
@@ -634,6 +635,8 @@  static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
+	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+
 	INIT_LIST_HEAD(&s->s_cap_flushing);
 
 	mdsc->sessions[mds] = s;
@@ -659,6 +662,7 @@  static void __unregister_session(struct ceph_mds_client *mdsc,
 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
 	BUG_ON(mdsc->sessions[s->s_mds] != s);
 	mdsc->sessions[s->s_mds] = NULL;
+	s->s_state = 0;
 	ceph_con_close(&s->s_con);
 	ceph_put_mds_session(s);
 	atomic_dec(&mdsc->num_sessions);
@@ -1321,13 +1325,10 @@  static int iterate_session_caps(struct ceph_mds_session *session,
 			cap->session = NULL;
 			list_del_init(&cap->session_caps);
 			session->s_nr_caps--;
-			if (cap->queue_release) {
-				list_add_tail(&cap->session_caps,
-					      &session->s_cap_releases);
-				session->s_num_cap_releases++;
-			} else {
+			if (cap->queue_release)
+				__ceph_queue_cap_release(session, cap);
+			else
 				old_cap = cap;  /* put_cap it w/o locks held */
-			}
 		}
 		if (ret < 0)
 			goto out;
@@ -1762,7 +1763,7 @@  int ceph_trim_caps(struct ceph_mds_client *mdsc,
 		session->s_trim_caps = 0;
 	}
 
-	ceph_send_cap_releases(mdsc, session);
+	ceph_flush_cap_releases(mdsc, session);
 	return 0;
 }
 
@@ -1805,8 +1806,8 @@  static void wait_caps_flush(struct ceph_mds_client *mdsc,
 /*
  * called under s_mutex
  */
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-			    struct ceph_mds_session *session)
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *session)
 {
 	struct ceph_msg *msg = NULL;
 	struct ceph_mds_cap_release *head;
@@ -1898,6 +1899,45 @@  void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
 	spin_unlock(&session->s_cap_lock);
 }
 
+static void ceph_cap_release_work(struct work_struct *work)
+{
+	struct ceph_mds_session *session =
+		container_of(work, struct ceph_mds_session, s_cap_release_work);
+
+	mutex_lock(&session->s_mutex);
+	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
+	    session->s_state == CEPH_MDS_SESSION_HUNG)
+		ceph_send_cap_releases(session->s_mdsc, session);
+	mutex_unlock(&session->s_mutex);
+	ceph_put_mds_session(session);
+}
+
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+		             struct ceph_mds_session *session)
+{
+	get_session(session);
+	if (queue_work(mdsc->fsc->cap_release_wq,
+		       &session->s_cap_release_work)) {
+		dout("cap release work queued\n");
+	} else {
+		ceph_put_mds_session(session);
+		dout("failed to queue cap release work\n");
+	}
+}
+
+/*
+ * caller holds session->s_cap_lock
+ */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
+			      struct ceph_cap *cap)
+{
+	list_add_tail(&cap->session_caps, &session->s_cap_releases);
+	session->s_num_cap_releases++;
+
+	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
+		ceph_flush_cap_releases(session->s_mdsc, session);
+}
+
 /*
  * requests
  */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 94fe2312c092..a6052fb79733 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -172,12 +172,13 @@  struct ceph_mds_session {
 	/* protected by s_cap_lock */
 	spinlock_t        s_cap_lock;
 	struct list_head  s_caps;     /* all caps issued by this session */
+	struct ceph_cap  *s_cap_iterator;
 	int               s_nr_caps, s_trim_caps;
 	int               s_num_cap_releases;
 	int		  s_cap_reconnect;
 	int		  s_readonly;
 	struct list_head  s_cap_releases; /* waiting cap_release messages */
-	struct ceph_cap  *s_cap_iterator;
+	struct work_struct s_cap_release_work;
 
 	/* protected by mutex */
 	struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
@@ -458,9 +459,10 @@  static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
 	kref_put(&req->r_kref, ceph_mdsc_release_request);
 }
 
-extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-				   struct ceph_mds_session *session);
-
+extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
+				    struct ceph_cap *cap);
+extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+				    struct ceph_mds_session *session);
 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
 
 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 93404e3c89db..0e85dbd9ac8d 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -680,6 +680,9 @@  static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
 	if (!fsc->trunc_wq)
 		goto fail_pg_inv_wq;
+	fsc->cap_release_wq = alloc_workqueue("ceph-cap-release", 0, 1);
+	if (!fsc->cap_release_wq)
+		goto fail_trunc_wq;
 
 	/* set up mempools */
 	err = -ENOMEM;
@@ -687,10 +690,12 @@  static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	size = sizeof (struct page *) * (page_count ? page_count : 1);
 	fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
 	if (!fsc->wb_pagevec_pool)
-		goto fail_trunc_wq;
+		goto fail_cap_release_wq;
 
 	return fsc;
 
+fail_cap_release_wq:
+	destroy_workqueue(fsc->cap_release_wq);
 fail_trunc_wq:
 	destroy_workqueue(fsc->trunc_wq);
 fail_pg_inv_wq:
@@ -712,6 +717,7 @@  static void flush_fs_workqueues(struct ceph_fs_client *fsc)
 	flush_workqueue(fsc->wb_wq);
 	flush_workqueue(fsc->pg_inv_wq);
 	flush_workqueue(fsc->trunc_wq);
+	flush_workqueue(fsc->cap_release_wq);
 }
 
 static void destroy_fs_client(struct ceph_fs_client *fsc)
@@ -721,6 +727,7 @@  static void destroy_fs_client(struct ceph_fs_client *fsc)
 	destroy_workqueue(fsc->wb_wq);
 	destroy_workqueue(fsc->pg_inv_wq);
 	destroy_workqueue(fsc->trunc_wq);
+	destroy_workqueue(fsc->cap_release_wq);
 
 	mempool_destroy(fsc->wb_pagevec_pool);
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 631b46e824a8..3eab95c02cec 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -107,10 +107,12 @@  struct ceph_fs_client {
 
 	/* writeback */
 	mempool_t *wb_pagevec_pool;
+	atomic_long_t writeback_count;
+
 	struct workqueue_struct *wb_wq;
 	struct workqueue_struct *pg_inv_wq;
 	struct workqueue_struct *trunc_wq;
-	atomic_long_t writeback_count;
+	struct workqueue_struct *cap_release_wq;
 
 #ifdef CONFIG_DEBUG_FS
 	struct dentry *debugfs_dentry_lru, *debugfs_caps;
@@ -989,11 +991,11 @@  extern void ceph_add_cap(struct inode *inode,
 			 unsigned cap, unsigned seq, u64 realmino, int flags,
 			 struct ceph_cap **new_cap);
 extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
+extern void __ceph_remove_caps(struct inode* inode);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
 			 struct ceph_cap *cap);
 extern int ceph_is_any_caps(struct inode *inode);
 
-extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
 		      int datasync);