@@ -979,6 +979,8 @@ static int send_cap_msg(struct ceph_mds_session *session,
{
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
+ int msg_len;
+ __le32 *epoch_barrier;
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,15 +990,31 @@ static int send_cap_msg(struct ceph_mds_session *session,
seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
+	/*
+	 * MSG_CLIENT_CAPS version 5 size calculation:
+	 *   sizeof(*fc)  for the caps fields
+	 *   0 bytes for the snapbl field (headerless)
+	 *   4 bytes for the flockbl field (len = 0)
+	 *   0 bytes for the peer field (op is not import/export)
+	 *   8 bytes for inline_version
+	 *   4 bytes for inline_data (len = 0)
+	 *   4 bytes for epoch_barrier
+	 */
+ msg_len = sizeof(*fc) + 4 + 8 + 4 + 4;
+
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, msg_len, GFP_NOFS, false);
if (!msg)
return -ENOMEM;
+ memset(msg->front.iov_base, 0, msg_len);
+
+ epoch_barrier = msg->front.iov_base + sizeof(*fc) + 4 + 8 + 4;
+ *epoch_barrier = cpu_to_le32(session->s_mdsc->cap_epoch_barrier);
+ msg->hdr.version = cpu_to_le16(5);
+ msg->hdr.compat_version = cpu_to_le16(1);
msg->hdr.tid = cpu_to_le64(flush_tid);
fc = msg->front.iov_base;
- memset(fc, 0, sizeof(*fc));
-
fc->cap_id = cpu_to_le64(cid);
fc->op = cpu_to_le32(op);
fc->seq = cpu_to_le32(seq);
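
[Note: assuming no snap blob, flock records, peer record or inline data,
the v5 tail accounted for above can be pictured as the following packed
layout. This is an illustrative sketch only; no such struct exists in
the ceph headers:

	struct cap_msg_v5_tail {
		__le32 flock_len;	/* 0: no flock records */
		__le64 inline_version;
		__le32 inline_len;	/* 0: no inline data */
		__le32 epoch_barrier;	/* new in v5 */
	} __packed;

sizeof(struct cap_msg_v5_tail) is 20, matching the "+ 4 + 8 + 4 + 4" in
msg_len, and epoch_barrier sits at offset sizeof(*fc) + 16, matching the
epoch_barrier pointer arithmetic above.]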
@@ -2973,14 +2991,39 @@ retry:
*target_cap = cap;
}
+
+/*
+ * Delay handling a cap message until a given OSD map epoch
+ *
+ * Caller must hold the session mutex, and the OSD client's map_sem
+ * for read.
+ */
+static void delay_message(struct ceph_mds_session *session,
+			  struct ceph_msg *msg, u32 epoch)
+{
+ struct ceph_delayed_message *dm;
+
+	dm = kzalloc(sizeof(*dm), GFP_NOFS);
+	if (!dm)
+		return;	/* ENOMEM: the message is dropped */
+
+	ceph_msg_get(msg);	/* ref is dropped in discard_delayed() */
+	dm->dm_epoch = epoch;
+	dm->dm_msg = msg;
+
+ list_add(&dm->dm_item, &session->s_delayed_msgs);
+}
+
/*
* Handle a caps message from the MDS.
*
* Identify the appropriate session, inode, and call the right handler
* based on the cap op.
+ *
+ * skip_epoch_check: if true, skip the epoch_barrier check (avoids
+ * retaking mdsc and osdc locks when replaying a delayed message)
*/
void ceph_handle_caps(struct ceph_mds_session *session,
- struct ceph_msg *msg)
+ struct ceph_msg *msg,
+ bool skip_epoch_check)
{
struct ceph_mds_client *mdsc = session->s_mdsc;
struct super_block *sb = mdsc->fsc->sb;
@@ -3001,6 +3044,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
void *flock;
void *end;
u32 flock_len;
+ u64 inline_version;
+ u32 inline_len;
+ u32 epoch_barrier = 0;
dout("handle_caps from mds%d\n", mds);
@@ -3045,6 +3091,62 @@ void ceph_handle_caps(struct ceph_mds_session *session,
}
}
+	if (le16_to_cpu(msg->hdr.version) >= 5) {
+		void *p = flock + flock_len;
+
+		/* Skip the peer field if present */
+		if (op == CEPH_CAP_OP_IMPORT)
+			p += sizeof(struct ceph_mds_cap_peer);
+
+		/* We don't use inline_version, but decode it to advance p */
+		ceph_decode_64_safe(&p, end, inline_version, bad);
+
+		/* Length of inline_data */
+		ceph_decode_32_safe(&p, end, inline_len, bad);
+
+		/* Skip over the inline_data payload */
+		if (inline_len != 0)
+			p += inline_len;
+
+		/* Read the epoch_barrier field */
+		ceph_decode_32_safe(&p, end, epoch_barrier, bad);
+	}
+
+ dout("handle_caps v=%d barrier=%d skip=%d\n",
+ le16_to_cpu(msg->hdr.version),
+ epoch_barrier,
+ skip_epoch_check);
+
+	if (epoch_barrier && !skip_epoch_check) {
+		struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
+
+		/*
+		 * We are required to wait until we have this OSD map epoch
+		 * before using the capability.
+		 */
+		mutex_lock(&mdsc->mutex);
+		if (epoch_barrier > mdsc->cap_epoch_barrier)
+			mdsc->cap_epoch_barrier = epoch_barrier;
+		mutex_unlock(&mdsc->mutex);
+
+		down_read(&osdc->map_sem);
+		if (osdc->osdmap->epoch < epoch_barrier) {
+			dout("handle_caps delaying message until OSD epoch %d\n",
+			     epoch_barrier);
+			mutex_lock(&session->s_mutex);
+			delay_message(session, msg, epoch_barrier);
+			mutex_unlock(&session->s_mutex);
+
+			/* Kick the OSD client to get the latest map */
+			ceph_monc_request_next_osdmap(&osdc->client->monc);
+
+			up_read(&osdc->map_sem);
+			return;
+		} else {
+			dout("handle_caps barrier %d already satisfied (%d)\n",
+			     epoch_barrier, osdc->osdmap->epoch);
+			up_read(&osdc->map_sem);
+		}
+	}
+
/* lookup ino */
inode = ceph_find_inode(sb, vino);
ci = ceph_inode(inode);
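
[Note: the open-coded skips above (p += ...) rely on the subsequent
ceph_decode_*_safe() calls to catch any overrun of the message buffer.
A more defensive variant would bounds-check each skip itself, e.g. a
sketch assuming the usual helper from include/linux/ceph/decode.h:

	/* fail out to "bad" if fewer than sizeof(peer) bytes remain */
	ceph_decode_need(&p, end, sizeof(struct ceph_mds_cap_peer), bad);
	p += sizeof(struct ceph_mds_cap_peer);
]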
@@ -332,6 +332,94 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
}
+/*
+ * Unlink and free a ceph_delayed_message
+ */
+static void discard_delayed(struct ceph_mds_session *session,
+			    struct ceph_delayed_message *dm)
+{
+ dout("discard_delayed: putting msg %p\n", dm->dm_msg);
+ ceph_msg_put(dm->dm_msg);
+ list_del(&dm->dm_item);
+ kfree(dm);
+}
+
+/*
+ * Re-dispatch a delayed cap message, then unlink and free it
+ */
+static void replay_delayed(struct ceph_mds_session *session,
+			   struct ceph_delayed_message *dm)
+{
+ dout("replay_delayed: releasing delayed msg %p\n", dm->dm_msg);
+ ceph_handle_caps(session, dm->dm_msg, true);
+ discard_delayed(session, dm);
+}
+
+/*
+ * If a delayed message's epoch requirement is met, move it to
+ * replay_list
+ */
+static void find_ready_delayed(struct ceph_mds_session *session,
+			       struct ceph_delayed_message *dm,
+			       struct list_head *replay_list, u32 epoch)
+{
+	if (dm->dm_epoch <= epoch) {
+		dout("find_ready_delayed: delayed msg %p ready (%d vs %d)\n",
+		     dm->dm_msg, dm->dm_epoch, epoch);
+		list_move(&dm->dm_item, replay_list);
+	}
+}
+
+/*
+ * Call this with map_sem held for read
+ */
+static void handle_osd_map(struct ceph_osd_client *osdc, void *p)
+{
+	struct ceph_mds_client *mdsc = p;
+ u32 cancelled_epoch = 0;
+ int mds_id;
+
+ if (osdc->osdmap->flags & CEPH_OSDMAP_FULL) {
+ cancelled_epoch = ceph_osdc_cancel_writes(osdc, -ENOSPC);
+ if (cancelled_epoch) {
+ mdsc->cap_epoch_barrier = max(cancelled_epoch + 1,
+ mdsc->cap_epoch_barrier);
+ }
+ }
+
+ dout("handle_osd_map: epoch=%d\n", osdc->osdmap->epoch);
+
+	/* Replay any cap messages that were waiting for this epoch */
+ for (mds_id = 0; mds_id < mdsc->max_sessions; ++mds_id) {
+ struct ceph_mds_session *session = mdsc->sessions[mds_id];
+		if (session) {
+ struct ceph_delayed_message *dm = NULL;
+ struct ceph_delayed_message *dm_next = NULL;
+ struct list_head replay_msgs;
+ INIT_LIST_HEAD(&replay_msgs);
+
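+			/*
+			 * Collect ready messages onto a private list under
+			 * s_mutex, then replay them after dropping it:
+			 * ceph_handle_caps() will take s_mutex itself.
+			 */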
+ dout("find_ready_delayed... (s=%p)\n", session);
+ mutex_lock(&session->s_mutex);
+			list_for_each_entry_safe(dm, dm_next,
+						 &session->s_delayed_msgs,
+						 dm_item)
+				find_ready_delayed(session, dm, &replay_msgs,
+						   osdc->osdmap->epoch);
+ mutex_unlock(&session->s_mutex);
+
+ dout("replay_delayed... (s=%p)\n", session);
+ list_for_each_entry_safe(dm, dm_next, &replay_msgs, dm_item)
+ replay_delayed(session, dm);
+ }
+ }
+}
+
/*
* sessions
*/
@@ -451,6 +539,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_cap_releases_done);
INIT_LIST_HEAD(&s->s_cap_flushing);
INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
+ INIT_LIST_HEAD(&s->s_delayed_msgs);
dout("register_session mds%d\n", mds);
if (mds >= mdsc->max_sessions) {
@@ -488,10 +577,17 @@ fail_realloc:
static void __unregister_session(struct ceph_mds_client *mdsc,
struct ceph_mds_session *s)
{
+ struct ceph_delayed_message *dm;
+ struct ceph_delayed_message *dm_next;
+
dout("__unregister_session mds%d %p\n", s->s_mds, s);
BUG_ON(mdsc->sessions[s->s_mds] != s);
mdsc->sessions[s->s_mds] = NULL;
ceph_con_close(&s->s_con);
+
+ list_for_each_entry_safe(dm, dm_next, &s->s_delayed_msgs, dm_item)
+ discard_delayed(s, dm);
+
ceph_put_mds_session(s);
}
@@ -3278,22 +3374,6 @@ static void delayed_work(struct work_struct *work)
schedule_delayed(mdsc);
}
-/**
- * Call this with map_sem held for read
- */
-static void handle_osd_map(struct ceph_osd_client *osdc, void *p)
-{
- struct ceph_mds_client *mdsc = (struct ceph_mds_client*)p;
- u32 cancelled_epoch = 0;
-
- if (osdc->osdmap->flags & CEPH_OSDMAP_FULL) {
- cancelled_epoch = ceph_osdc_cancel_writes(osdc, -ENOSPC);
- if (cancelled_epoch) {
- mdsc->cap_epoch_barrier = max(cancelled_epoch + 1,
- mdsc->cap_epoch_barrier);
- }
- }
-}
int ceph_mdsc_init(struct ceph_fs_client *fsc)
@@ -3683,7 +3763,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_forward(mdsc, s, msg);
break;
case CEPH_MSG_CLIENT_CAPS:
- ceph_handle_caps(s, msg);
+ ceph_handle_caps(s, msg, false);
break;
case CEPH_MSG_CLIENT_SNAP:
ceph_handle_snap(mdsc, s, msg);
@@ -151,6 +151,15 @@ struct ceph_mds_session {
atomic_t s_ref;
struct list_head s_waiting; /* waiting requests */
struct list_head s_unsafe; /* unsafe requests */
+
+ struct list_head s_delayed_msgs; /* OSD epoch waiters */
+};
+
+struct ceph_delayed_message {
+	struct ceph_msg *dm_msg;	/* cap message; ref held */
+	u32 dm_epoch;			/* OSD map epoch we are waiting for */
+	struct list_head dm_item;	/* entry on session->s_delayed_msgs */
+};
/*
@@ -814,7 +814,8 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
/* caps.c */
extern const char *ceph_cap_string(int c);
extern void ceph_handle_caps(struct ceph_mds_session *session,
- struct ceph_msg *msg);
+ struct ceph_msg *msg,
+ bool skip_epoch_check);
extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx);
extern void ceph_add_cap(struct inode *inode,
In the latest version of this message, the barrier field is added as an
instruction to clients that they may not use the attached capabilities
until they have a particular OSD map epoch.

Signed-off-by: John Spray <john.spray@redhat.com>
---
 fs/ceph/caps.c       | 110 +++++++++++++++++++++++++++++++++++++++++++++++--
 fs/ceph/mds_client.c | 114 +++++++++++++++++++++++++++++++++++++++++++--------
 fs/ceph/mds_client.h |   9 ++++
 fs/ceph/super.h      |   3 +-
 4 files changed, 214 insertions(+), 22 deletions(-)
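
In short, the rule the caps.c hunks implement on receipt of a cap
message is, as a condensed sketch with locking and error paths omitted:

	if (epoch_barrier && osdc->osdmap->epoch < epoch_barrier) {
		delay_message(session, msg, epoch_barrier);
		/* ask for a newer map; handle_osd_map() replays the
		 * message once the required epoch arrives */
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}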