diff mbox

[v2] Inline data support

Message ID 1378819450-8318-1-git-send-email-liwang@ubuntukylin.com (mailing list archive)
State New, archived
Headers show

Commit Message

Li Wang Sept. 10, 2013, 1:24 p.m. UTC
This patch implements inline data support for Ceph.

Signed-off-by: Yunchuan Wen <yunchuanwen@ubuntukylin.com>
Signed-off-by: Li Wang <liwang@ubuntukylin.com>
---
Against v1:
With simplified process under multiple-writer case,
referred to
http://pad.ceph.com/p/mds-inline-data,
http://www.spinics.net/lists/ceph-devel/msg16018.html
---
 src/ceph_mds.cc             |    1 +
 src/client/Client.cc        |  202 +++++++++++++++++++++++++++++++++++++------
 src/client/Client.h         |    4 +
 src/client/Inode.h          |    5 ++
 src/include/ceph_features.h |    2 +
 src/include/ceph_fs.h       |    3 +
 src/include/rados.h         |    1 +
 src/mds/CInode.cc           |   22 +++++
 src/mds/Capability.h        |    2 +
 src/mds/Locker.cc           |    7 ++
 src/mds/mdstypes.cc         |   12 ++-
 src/mds/mdstypes.h          |    3 +
 src/messages/MClientCaps.h  |   18 +++-
 src/messages/MClientReply.h |    9 ++
 src/osd/ReplicatedPG.cc     |    5 +-
 src/osdc/Objecter.h         |   21 ++++-
 16 files changed, 283 insertions(+), 34 deletions(-)

Comments

Sage Weil Sept. 17, 2013, 4:59 p.m. UTC | #1
Hi Li,

I broke this up into pieces and pushed to wip-inline to ease review.  The 
commits should each get a changelog with signoff for the final pull 
request.

There were a few minor points, but the main one is the way that the 
migration to non-inline data works.  The current version blocks, but I 
think we can streamline it and avoid blocking at all in that case, at 
least in the case where teh subsequent read/write is going to be on the 
first object.

Basically, if we know we are about to send the sync read/write, we can 
precede it with the conditional uninline request but do not wait for the 
result.  It will always be ordered before the subsequent read/write.  We 
could keep a completion that just stores the error code in a variable on 
the read/write method's stack, and then check it after the second request 
completes.  That way it can be propagated back to the caller, or at least 
we can generate a message in the log.

That aside, I think this looks pretty good!  Any maybe we should just 
start with the current sync approach and optimize the first-object 
case later.

It would be good to have a strong test case for this code.  Maybe 
something that creates a lot of small files and then makes them larger.  
And something that specifically stress tests the inline -> not inline 
transition for multiple readers/writers.

Thanks!
sage



On Tue, 10 Sep 2013, Li Wang wrote:

> This patch implements inline data support for Ceph.
> 
> Signed-off-by: Yunchuan Wen <yunchuanwen@ubuntukylin.com>
> Signed-off-by: Li Wang <liwang@ubuntukylin.com>
> ---
> Against v1:
> With simplified process under multiple-writer case,
> referred to
> http://pad.ceph.com/p/mds-inline-data,
> http://www.spinics.net/lists/ceph-devel/msg16018.html
> ---
>  src/ceph_mds.cc             |    1 +
>  src/client/Client.cc        |  202 +++++++++++++++++++++++++++++++++++++------
>  src/client/Client.h         |    4 +
>  src/client/Inode.h          |    5 ++
>  src/include/ceph_features.h |    2 +
>  src/include/ceph_fs.h       |    3 +
>  src/include/rados.h         |    1 +
>  src/mds/CInode.cc           |   22 +++++
>  src/mds/Capability.h        |    2 +
>  src/mds/Locker.cc           |    7 ++
>  src/mds/mdstypes.cc         |   12 ++-
>  src/mds/mdstypes.h          |    3 +
>  src/messages/MClientCaps.h  |   18 +++-
>  src/messages/MClientReply.h |    9 ++
>  src/osd/ReplicatedPG.cc     |    5 +-
>  src/osdc/Objecter.h         |   21 ++++-
>  16 files changed, 283 insertions(+), 34 deletions(-)
> 
> diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc
> index 88b807b..dac676f 100644
> --- a/src/ceph_mds.cc
> +++ b/src/ceph_mds.cc
> @@ -243,6 +243,7 @@ int main(int argc, const char **argv)
>      CEPH_FEATURE_UID |
>      CEPH_FEATURE_NOSRCADDR |
>      CEPH_FEATURE_DIRLAYOUTHASH |
> +    CEPH_FEATURE_MDS_INLINE_DATA |
>      CEPH_FEATURE_PGID64 |
>      CEPH_FEATURE_MSG_AUTH;
>    uint64_t required =
> diff --git a/src/client/Client.cc b/src/client/Client.cc
> index 77fd208..f47579f 100644
> --- a/src/client/Client.cc
> +++ b/src/client/Client.cc
> @@ -485,6 +485,8 @@ void Client::update_inode_file_bits(Inode *in,
>  				    uint64_t time_warp_seq, utime_t ctime,
>  				    utime_t mtime,
>  				    utime_t atime,
> +				    uint64_t inline_version,
> +				    bufferlist& inline_data,
>  				    int issued)
>  {
>    bool warn = false;
> @@ -495,6 +497,11 @@ void Client::update_inode_file_bits(Inode *in,
>  	   << " local " << in->time_warp_seq << dendl;
>    uint64_t prior_size = in->size;
>  
> +  if (inline_version > in->inline_version) {
> +    in->inline_data = inline_data;
> +    in->inline_version = inline_version;
> +  }
> +
>    if (truncate_seq > in->truncate_seq ||
>        (truncate_seq == in->truncate_seq && size > in->size)) {
>      ldout(cct, 10) << "size " << in->size << " -> " << size << dendl;
> @@ -511,6 +518,13 @@ void Client::update_inode_file_bits(Inode *in,
>  	_invalidate_inode_cache(in, truncate_size, prior_size - truncate_size, true);
>        }
>      }
> +
> +    // truncate inline data
> +    if (in->inline_version < CEPH_INLINE_DISABLED) {
> +      uint32_t len = in->inline_data.length();
> +      if (size < len)
> +        in->inline_data.splice(size, len - size);
> +    }
>    }
>    if (truncate_seq >= in->truncate_seq &&
>        in->truncate_size != truncate_size) {
> @@ -645,6 +659,7 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from, MetaSession *sessi
>    
>      update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
>  			   st->time_warp_seq, st->ctime, st->mtime, st->atime,
> +			   st->inline_version, st->inline_data,
>  			   issued);
>    }
>  
> @@ -2353,6 +2368,11 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
>    in->ctime.encode_timeval(&m->head.ctime);
>    m->head.time_warp_seq = in->time_warp_seq;
>      
> +  if (flush & CEPH_CAP_FILE_WR) {
> +    m->inline_version = in->inline_version;
> +    m->inline_data = in->inline_data;
> +  }
> +
>    in->reported_size = in->size;
>    m->set_snap_follows(follows);
>    cap->wanted = want;
> @@ -3482,7 +3502,9 @@ void Client::handle_cap_trunc(MetaSession *session, Inode *in, MClientCaps *m)
>    issued |= implemented;
>    update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(),
>                           m->get_size(), m->get_time_warp_seq(), m->get_ctime(),
> -                         m->get_mtime(), m->get_atime(), issued);
> +                         m->get_mtime(), m->get_atime(),
> +                         m->inline_version, m->inline_data,
> +                         issued);
>    m->put();
>  }
>  
> @@ -3589,7 +3611,8 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
>      in->xattr_version = m->head.xattr_version;
>    }
>    update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(),
> -			 m->get_time_warp_seq(), m->get_ctime(), m->get_mtime(), m->get_atime(), issued);
> +			 m->get_time_warp_seq(), m->get_ctime(), m->get_mtime(), m->get_atime(),
> +			 m->inline_version, m->inline_data, issued);
>  
>    // max_size
>    if (cap == in->auth_cap &&
> @@ -5643,6 +5666,57 @@ void Client::unlock_fh_pos(Fh *f)
>    f->pos_locked = false;
>  }
>  
> +int Client::migration_inline_data(Inode *in)
> +{
> +  ObjectOperation ops;
> +  bufferlist inline_version_bl;
> +  ::encode(in->inline_version, inline_version_bl);
> +  ops.cmpxattr("inline_version",
> +               CEPH_OSD_CMPXATTR_OP_GT,
> +               CEPH_OSD_CMPXATTR_MODE_U64,
> +               CEPH_OSD_OP_FLAG_NOENTOK,
> +               inline_version_bl);
> +  bufferlist inline_data = in->inline_data;
> +  ops.write(0, inline_data, in->truncate_size, in->truncate_seq);
> +  ops.setxattr("inline_version", inline_version_bl);
> +
> +  char oid_buf[32];
> +  snprintf(oid_buf, sizeof(oid_buf), "%llx.00000000", (long long unsigned)in->ino);
> +  object_t oid = oid_buf;
> +
> +  Mutex flock("Client::migration_inline_data flock");
> +  Cond cond;
> +  bool done = false;
> +  int ret;
> +  Context *oncommit = new C_SafeCond(&flock, &cond, &done, &ret);
> +
> +  objecter->mutate(oid,
> +                   OSDMap::file_to_object_locator(in->layout),
> +                   ops,
> +                   in->snaprealm->get_snap_context(),
> +                   ceph_clock_now(cct),
> +                   0,
> +                   NULL,
> +                   oncommit);
> +
> +  client_lock.Unlock();
> +  flock.Lock();
> +  while (!done)
> +    cond.Wait(flock);
> +  flock.Unlock();
> +  client_lock.Lock();
> +
> +  if (ret >= 0 || ret == -ECANCELED) {
> +    in->inline_data.clear();
> +    in->inline_version = CEPH_INLINE_DISABLED;
> +    mark_caps_dirty(in, CEPH_CAP_FILE_WR);
> +    check_caps(in, false);
> +
> +    ret = 0;
> +  }
> +
> +  return ret;
> +}
>  
>  // 
>  
> @@ -5688,6 +5762,30 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
>      movepos = true;
>    }
>  
> +  if (in->inline_version < CEPH_INLINE_DISABLED) {
> +    if (!(have & CEPH_CAP_FILE_CACHE)) {
> +      r = migration_inline_data(in);
> +      if (r < 0)
> +        goto done;
> +    } else {
> +      uint32_t len = in->inline_data.length();
> +
> +      uint64_t endoff = offset + size;
> +      if (endoff > in->size)
> +        endoff = in->size;
> +
> +      if (endoff > len) {
> +        if (offset < len)
> +          bl->substr_of(in->inline_data, offset, len - offset);
> +        bl->append_zero(endoff - len);
> +      } else if (endoff > (uint64_t)offset) {
> +        bl->substr_of(in->inline_data, offset, endoff - offset);
> +      }
> +
> +      goto success;
> +    }
> +  }
> +
>    if (!conf->client_debug_force_sync_read &&
>        (cct->_conf->client_oc && (have & CEPH_CAP_FILE_CACHE))) {
>  
> @@ -5704,6 +5802,8 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
>      goto done;
>    }
>  
> +success:
> +
>    if (movepos) {
>      // adjust fd pos
>      f->pos = offset+bl->length();
> @@ -5995,6 +6095,29 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
>  
>    ldout(cct, 10) << " snaprealm " << *in->snaprealm << dendl;
>  
> +  if (in->inline_version < CEPH_INLINE_DISABLED) {
> +    if (endoff > CEPH_INLINE_SIZE || !(have & CEPH_CAP_FILE_BUFFER)) {
> +      r = migration_inline_data(in);
> +      if (r < 0)
> +        goto done;
> +    } else {
> +      uint32_t len = in->inline_data.length();
> +
> +      if (endoff < len)
> +        in->inline_data.copy(endoff, len - endoff, bl);
> +
> +      if (offset < len)
> +        in->inline_data.splice(offset, len - offset);
> +      else if (offset > len)
> +        in->inline_data.append_zero(offset - len);
> +
> +      in->inline_data.append(bl);
> +      in->inline_version++;
> +
> +      goto success;
> +    }
> +  }
> +
>    if (cct->_conf->client_oc && (have & CEPH_CAP_FILE_BUFFER)) {
>      // do buffered write
>      if (!in->oset.dirty_or_tx)
> @@ -6045,7 +6168,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
>    }
>  
>    // if we get here, write was successful, update client metadata
> -
> +success:
>    // time
>    lat = ceph_clock_now(cct);
>    lat -= start;
> @@ -7719,33 +7842,60 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
>      return r;
>  
>    if (mode & FALLOC_FL_PUNCH_HOLE) {
> -    Mutex flock("Client::_punch_hole flock");
> -    Cond cond;
> -    bool done = false;
> -    Context *onfinish = new C_SafeCond(&flock, &cond, &done);
> -    Context *onsafe = new C_Client_SyncCommit(this, in);
> +    if (in->inline_version < CEPH_INLINE_DISABLED &&
> +        (have & CEPH_CAP_FILE_BUFFER)) {
> +      bufferlist bl;
> +      int len = in->inline_data.length();
> +      if (offset < len) {
> +        if (offset > 0)
> +          in->inline_data.copy(0, offset, bl);
> +        int size = length;
> +        if (offset + size > len)
> +          size = len - offset;
> +        if (size > 0)
> +          bl.append_zero(size);
> +        if (offset + size < len)
> +          in->inline_data.copy(offset + size, len - offset - size, bl);
> +        in->inline_data = bl;
> +        in->inline_version++;
> +      }
> +      in->mtime = ceph_clock_now(cct);
> +      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
> +    } else {
> +      if (in->inline_version < CEPH_INLINE_DISABLED) {
> +        r = migration_inline_data(in);
> +        if (r < 0)
> +          goto done;
> +      }
>  
> -    unsafe_sync_write++;
> -    get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
> +      Mutex flock("Client::_punch_hole flock");
> +      Cond cond;
> +      bool done = false;
> +      Context *onfinish = new C_SafeCond(&flock, &cond, &done);
> +      Context *onsafe = new C_Client_SyncCommit(this, in);
>  
> -    _invalidate_inode_cache(in, offset, length, true);
> -    r = filer->zero(in->ino, &in->layout,
> -                    in->snaprealm->get_snap_context(),
> -                    offset, length,
> -                    ceph_clock_now(cct),
> -                    0, true, onfinish, onsafe);
> -    if (r < 0)
> -      goto done;
> +      unsafe_sync_write++;
> +      get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
>  
> -    in->mtime = ceph_clock_now(cct);
> -    mark_caps_dirty(in, CEPH_CAP_FILE_WR);
> +      _invalidate_inode_cache(in, offset, length, true);
> +      r = filer->zero(in->ino, &in->layout,
> +                      in->snaprealm->get_snap_context(),
> +                      offset, length,
> +                      ceph_clock_now(cct),
> +                      0, true, onfinish, onsafe);
> +      if (r < 0)
> +        goto done;
>  
> -    client_lock.Unlock();
> -    flock.Lock();
> -    while (!done)
> -      cond.Wait(flock);
> -    flock.Unlock();
> -    client_lock.Lock();
> +      in->mtime = ceph_clock_now(cct);
> +      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
> +
> +      client_lock.Unlock();
> +      flock.Lock();
> +      while (!done)
> +        cond.Wait(flock);
> +      flock.Unlock();
> +      client_lock.Lock();
> +    }
>    } else if (!(mode & FALLOC_FL_KEEP_SIZE)) {
>      uint64_t size = offset + length;
>      if (size > in->size) {
> diff --git a/src/client/Client.h b/src/client/Client.h
> index c7c9cef..5fc05f4 100644
> --- a/src/client/Client.h
> +++ b/src/client/Client.h
> @@ -420,6 +420,9 @@ protected:
>  
>    void handle_lease(MClientLease *m);
>  
> +  // inline data
> +  int migration_inline_data(Inode *in);
> +
>    // file caps
>    void check_cap_issue(Inode *in, Cap *cap, unsigned issued);
>    void add_update_cap(Inode *in, MetaSession *session, uint64_t cap_id,
> @@ -495,6 +498,7 @@ protected:
>    void update_inode_file_bits(Inode *in,
>  			      uint64_t truncate_seq, uint64_t truncate_size, uint64_t size,
>  			      uint64_t time_warp_seq, utime_t ctime, utime_t mtime, utime_t atime,
> +			      uint64_t inline_version, bufferlist& inline_data,
>  			      int issued);
>    Inode *add_update_inode(InodeStat *st, utime_t ttl, MetaSession *session);
>    Dentry *insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
> diff --git a/src/client/Inode.h b/src/client/Inode.h
> index cc054a6..bb17706 100644
> --- a/src/client/Inode.h
> +++ b/src/client/Inode.h
> @@ -111,6 +111,10 @@ class Inode {
>    version_t version;           // auth only
>    version_t xattr_version;
>  
> +  // inline data
> +  uint64_t   inline_version;
> +  bufferlist inline_data;
> +
>    bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; }
>    bool is_dir()     const { return (mode & S_IFMT) == S_IFDIR; }
>    bool is_file()    const { return (mode & S_IFMT) == S_IFREG; }
> @@ -207,6 +211,7 @@ class Inode {
>        rdev(0), mode(0), uid(0), gid(0), nlink(0),
>        size(0), truncate_seq(1), truncate_size(-1),
>        time_warp_seq(0), max_size(0), version(0), xattr_version(0),
> +      inline_version(0),
>        flags(0),
>        dir_hashed(false), dir_replicated(false), auth_cap(NULL),
>        dirty_caps(0), flushing_caps(0), flushing_cap_seq(0), shared_gen(0), cache_gen(0),
> diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h
> index c0f01cc..70ee921 100644
> --- a/src/include/ceph_features.h
> +++ b/src/include/ceph_features.h
> @@ -40,6 +40,7 @@
>  #define CEPH_FEATURE_MON_SCRUB      (1ULL<<33)
>  #define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34)
>  #define CEPH_FEATURE_OSD_CACHEPOOL (1ULL<<35)
> +#define CEPH_FEATURE_MDS_INLINE_DATA     (1ULL<<36)
>  
>  /*
>   * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature
> @@ -103,6 +104,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) {
>  	 CEPH_FEATURE_MON_SCRUB	|	    \
>  	 CEPH_FEATURE_OSD_PACKED_RECOVERY | \
>  	 CEPH_FEATURE_OSD_CACHEPOOL | \
> +	 CEPH_FEATURE_MDS_INLINE_DATA | \
>  	 0ULL)
>  
>  #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
> diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h
> index 6c41d14..406b51e 100644
> --- a/src/include/ceph_fs.h
> +++ b/src/include/ceph_fs.h
> @@ -522,6 +522,9 @@ struct ceph_filelock {
>  
>  int ceph_flags_to_mode(int flags);
>  
> +/* inline data state */
> +#define CEPH_INLINE_DISABLED	((__u64)-1)
> +#define CEPH_INLINE_SIZE	(1 << 12)
>  
>  /* capability bits */
>  #define CEPH_CAP_PIN         1  /* no specific capabilities beyond the pin */
> diff --git a/src/include/rados.h b/src/include/rados.h
> index 178c171..c387a2e 100644
> --- a/src/include/rados.h
> +++ b/src/include/rados.h
> @@ -342,6 +342,7 @@ enum {
>  enum {
>  	CEPH_OSD_OP_FLAG_EXCL = 1,      /* EXCL object create */
>  	CEPH_OSD_OP_FLAG_FAILOK = 2,    /* continue despite failure */
> +	CEPH_OSD_OP_FLAG_NOENTOK = 4,   /* ignore NOENT error */
>  };
>  
>  #define EOLDSNAPC    85  /* ORDERSNAP flag set; writer has old snapc*/
> diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
> index 46f8d33..729f126 100644
> --- a/src/mds/CInode.cc
> +++ b/src/mds/CInode.cc
> @@ -2825,6 +2825,16 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
>    e.files = i->dirstat.nfiles;
>    e.subdirs = i->dirstat.nsubdirs;
>  
> +  // inline data
> +  uint64_t inline_version = 0;
> +  bufferlist inline_data;
> +  if (!cap || (cap->client_inline_version < i->inline_version)) {
> +    inline_version = i->inline_version;
> +    inline_data = i->inline_data;
> +    if (cap)
> +      cap->client_inline_version = i->inline_version;
> +  }
> +
>    // nest (do same as file... :/)
>    i->rstat.rctime.encode_timeval(&e.rctime);
>    e.rbytes = i->rstat.rbytes;
> @@ -2863,6 +2873,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
>      bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
>      bytes += sizeof(__u32) + symlink.length();
>      bytes += sizeof(__u32) + xbl.length();
> +    bytes += sizeof(__u64) + sizeof(__u32) + inline_data.length();
>      if (bytes > max_bytes)
>        return -ENOSPC;
>    }
> @@ -2958,6 +2969,10 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
>      ::encode(i->dir_layout, bl);
>    }
>    ::encode(xbl, bl);
> +  if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
> +    ::encode(inline_version, bl);
> +    ::encode(inline_data, bl);
> +  }
>  
>    return valid;
>  }
> @@ -2990,6 +3005,13 @@ void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
>    i->atime.encode_timeval(&m->head.atime);
>    m->head.time_warp_seq = i->time_warp_seq;
>  
> +  if (cap->client_inline_version < i->inline_version) {
> +    m->inline_version = cap->client_inline_version = i->inline_version;
> +    m->inline_data = i->inline_data;
> +  } else {
> +    m->inline_version = 0;
> +  }
> +
>    // max_size is min of projected, actual.
>    uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
>    uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
> diff --git a/src/mds/Capability.h b/src/mds/Capability.h
> index fb6b3dc..995ea3a 100644
> --- a/src/mds/Capability.h
> +++ b/src/mds/Capability.h
> @@ -209,6 +209,7 @@ private:
>  public:
>    snapid_t client_follows;
>    version_t client_xattr_version;
> +  uint64_t client_inline_version;
>    
>    xlist<Capability*>::item item_session_caps;
>    xlist<Capability*>::item item_snaprealm_caps;
> @@ -223,6 +224,7 @@ public:
>      mseq(0),
>      suppress(0), stale(false),
>      client_follows(0), client_xattr_version(0),
> +    client_inline_version(0),
>      item_session_caps(this), item_snaprealm_caps(this) {
>      g_num_cap++;
>      g_num_capa++;
> diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc
> index 99bd761..4f1d322 100644
> --- a/src/mds/Locker.cc
> +++ b/src/mds/Locker.cc
> @@ -2686,6 +2686,7 @@ void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *
>      utime_t mtime = m->get_mtime();
>      utime_t ctime = m->get_ctime();
>      uint64_t size = m->get_size();
> +    uint64_t inline_version = m->inline_version;
>      
>      if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
>  	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
> @@ -2705,6 +2706,12 @@ void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *
>        pi->size = size;
>        pi->rstat.rbytes = size;
>      }
> +    if (in->inode.is_file() &&
> +        (dirty & CEPH_CAP_FILE_WR) &&
> +        inline_version > pi->inline_version) {
> +      pi->inline_version = inline_version;
> +      pi->inline_data = m->inline_data;
> +    }
>      if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
>        dout(7) << "  atime " << pi->atime << " -> " << atime
>  	      << " for " << *in << dendl;
> diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc
> index 6886786..8634adf 100644
> --- a/src/mds/mdstypes.cc
> +++ b/src/mds/mdstypes.cc
> @@ -204,7 +204,7 @@ ostream& operator<<(ostream& out, const client_writeable_range_t& r)
>   */
>  void inode_t::encode(bufferlist &bl) const
>  {
> -  ENCODE_START(7, 6, bl);
> +  ENCODE_START(8, 8, bl);
>  
>    ::encode(ino, bl);
>    ::encode(rdev, bl);
> @@ -227,6 +227,8 @@ void inode_t::encode(bufferlist &bl) const
>    ::encode(mtime, bl);
>    ::encode(atime, bl);
>    ::encode(time_warp_seq, bl);
> +  ::encode(inline_version, bl);
> +  ::encode(inline_data, bl);
>    ::encode(client_ranges, bl);
>  
>    ::encode(dirstat, bl);
> @@ -244,7 +246,7 @@ void inode_t::encode(bufferlist &bl) const
>  
>  void inode_t::decode(bufferlist::iterator &p)
>  {
> -  DECODE_START_LEGACY_COMPAT_LEN(7, 6, 6, p);
> +  DECODE_START_LEGACY_COMPAT_LEN(8, 6, 6, p);
>  
>    ::decode(ino, p);
>    ::decode(rdev, p);
> @@ -273,6 +275,12 @@ void inode_t::decode(bufferlist::iterator &p)
>    ::decode(mtime, p);
>    ::decode(atime, p);
>    ::decode(time_warp_seq, p);
> +  if (struct_v >= 8) {
> +    ::decode(inline_version, p);
> +    ::decode(inline_data, p);
> +  } else {
> +    inline_version = CEPH_INLINE_DISABLED;
> +  }
>    if (struct_v >= 3) {
>      ::decode(client_ranges, p);
>    } else {
> diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h
> index 902e310..928167c 100644
> --- a/src/mds/mdstypes.h
> +++ b/src/mds/mdstypes.h
> @@ -335,6 +335,8 @@ struct inode_t {
>    utime_t    mtime;   // file data modify time.
>    utime_t    atime;   // file data access time.
>    uint32_t   time_warp_seq;  // count of (potential) mtime/atime timewarps (i.e., utimes())
> +  bufferlist inline_data;
> +  uint64_t   inline_version;
>  
>    map<client_t,client_writeable_range_t> client_ranges;  // client(s) can write to these ranges
>  
> @@ -356,6 +358,7 @@ struct inode_t {
>  	      size(0), truncate_seq(0), truncate_size(0), truncate_from(0),
>  	      truncate_pending(0),
>  	      time_warp_seq(0),
> +	      inline_version(1),
>  	      version(0), file_data_version(0), xattr_version(0), backtrace_version(0) {
>      clear_layout();
>      memset(&dir_layout, 0, sizeof(dir_layout));
> diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h
> index 117f241..260d714 100644
> --- a/src/messages/MClientCaps.h
> +++ b/src/messages/MClientCaps.h
> @@ -21,7 +21,7 @@
>  
>  class MClientCaps : public Message {
>  
> -  static const int HEAD_VERSION = 2;   // added flock metadata
> +  static const int HEAD_VERSION = 3;   // added flock metadata, inline data
>    static const int COMPAT_VERSION = 1;
>  
>   public:
> @@ -29,6 +29,8 @@ class MClientCaps : public Message {
>    bufferlist snapbl;
>    bufferlist xattrbl;
>    bufferlist flockbl;
> +  uint64_t   inline_version;
> +  bufferlist inline_data;
>  
>    int      get_caps() { return head.caps; }
>    int      get_wanted() { return head.wanted; }
> @@ -148,6 +150,13 @@ public:
>      if (head.xattr_len)
>        xattrbl = middle;
>  
> +    if (header.version >= 3) {
> +      ::decode(inline_version, p);
> +      ::decode(inline_data, p);
> +    } else {
> +      inline_version = CEPH_INLINE_DISABLED;
> +    }
> +
>      // conditionally decode flock metadata
>      if (header.version >= 2)
>        ::decode(flockbl, p);
> @@ -160,6 +169,13 @@ public:
>  
>      middle = xattrbl;
>  
> +    if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
> +      ::encode(inline_version, payload);
> +      ::encode(inline_data, payload);
> +    } else {
> +      header.version = 2;
> +    }
> +
>      // conditionally include flock metadata
>      if (features & CEPH_FEATURE_FLOCK) {
>        ::encode(flockbl, payload);
> diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h
> index 896245f..a8e83c2 100644
> --- a/src/messages/MClientReply.h
> +++ b/src/messages/MClientReply.h
> @@ -108,6 +108,8 @@ struct InodeStat {
>    uint64_t truncate_size;
>    utime_t ctime, mtime, atime;
>    version_t time_warp_seq;
> +  bufferlist inline_data;
> +  uint64_t inline_version;
>  
>    frag_info_t dirstat;
>    nest_info_t rstat;
> @@ -174,6 +176,13 @@ struct InodeStat {
>  
>      xattr_version = e.xattr_version;
>      ::decode(xattrbl, p);
> +
> +    if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
> +      ::decode(inline_version, p);
> +      ::decode(inline_data, p);
> +    } else {
> +      inline_version = CEPH_INLINE_DISABLED;
> +    }
>    }
>    
>    // see CInode::encode_inodestat for encoder.
> diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
> index b391e17..30f7d01 100644
> --- a/src/osd/ReplicatedPG.cc
> +++ b/src/osd/ReplicatedPG.cc
> @@ -2398,8 +2398,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
>  	  result = osd->store->getattr(coll, soid, name.c_str(), xattr);
>  	else
>  	  result = osd->store->getattr(coll, src_obc->obs.oi.soid, name.c_str(), xattr);
> -	if (result < 0 && result != -EEXIST && result != -ENODATA)
> +       int flags = le32_to_cpu(op.flags);
> +	if (result < 0 && result != -EEXIST && result != -ENODATA &&
> +	    (!(flags & CEPH_OSD_OP_FLAG_NOENTOK) || result != -ENOENT)) {
>  	  break;
> +	}
>  	
>  	ctx->delta_stats.num_rd++;
>  	ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(xattr.length(), 10);
> diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
> index 154ee41..230745b 100644
> --- a/src/osdc/Objecter.h
> +++ b/src/osdc/Objecter.h
> @@ -112,9 +112,10 @@ struct ObjectOperation {
>        osd_op.indata.append(name);
>      osd_op.indata.append(data);
>    }
> -  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& data) {
> +  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, uint32_t flags, const bufferlist& data) {
>      OSDOp& osd_op = add_op(op);
>      osd_op.op.op = op;
> +    osd_op.op.flags = flags;
>      osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
>      osd_op.op.xattr.value_len = data.length();
>      osd_op.op.xattr.cmp_op = cmp_op;
> @@ -279,8 +280,16 @@ struct ObjectOperation {
>      out_handler[p] = h;
>      out_rval[p] = prval;
>    }
> -  void write(uint64_t off, bufferlist& bl) {
> +  void write(uint64_t off, bufferlist& bl,
> +             uint64_t truncate_size,
> +             uint32_t truncate_seq) {
>      add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
> +    OSDOp& o = *ops.rbegin();
> +    o.op.extent.truncate_size = truncate_size;
> +    o.op.extent.truncate_seq = truncate_seq;
> +  }
> +  void write(uint64_t off, bufferlist& bl) {
> +    write(off, bl, 0, 0);
>    }
>    void write_full(bufferlist& bl) {
>      add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
> @@ -453,7 +462,10 @@ struct ObjectOperation {
>      add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
>    }
>    void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& bl) {
> -    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
> +    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, 0, bl);
> +  }
> +  void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, uint32_t flags, const bufferlist& bl) {
> +    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, flags, bl);
>    }
>    void rmxattr(const char *name) {
>      bufferlist bl;
> @@ -733,11 +745,12 @@ struct ObjectOperation {
>    }
>  
>    void cmpxattr(const char *name, const bufferlist& val,
> -		int op, int mode) {
> +		int op, int mode, int flags = 0) {
>      add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
>      OSDOp& o = *ops.rbegin();
>      o.op.xattr.cmp_op = op;
>      o.op.xattr.cmp_mode = mode;
> +    o.op.flags = flags;
>    }
>    void src_cmpxattr(const object_t& srcoid, snapid_t srcsnapid,
>  		    const char *name, const bufferlist& val,
> -- 
> 1.7.9.5
> 
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
> 
> 
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc
index 88b807b..dac676f 100644
--- a/src/ceph_mds.cc
+++ b/src/ceph_mds.cc
@@ -243,6 +243,7 @@  int main(int argc, const char **argv)
     CEPH_FEATURE_UID |
     CEPH_FEATURE_NOSRCADDR |
     CEPH_FEATURE_DIRLAYOUTHASH |
+    CEPH_FEATURE_MDS_INLINE_DATA |
     CEPH_FEATURE_PGID64 |
     CEPH_FEATURE_MSG_AUTH;
   uint64_t required =
diff --git a/src/client/Client.cc b/src/client/Client.cc
index 77fd208..f47579f 100644
--- a/src/client/Client.cc
+++ b/src/client/Client.cc
@@ -485,6 +485,8 @@  void Client::update_inode_file_bits(Inode *in,
 				    uint64_t time_warp_seq, utime_t ctime,
 				    utime_t mtime,
 				    utime_t atime,
+				    uint64_t inline_version,
+				    bufferlist& inline_data,
 				    int issued)
 {
   bool warn = false;
@@ -495,6 +497,11 @@  void Client::update_inode_file_bits(Inode *in,
 	   << " local " << in->time_warp_seq << dendl;
   uint64_t prior_size = in->size;
 
+  if (inline_version > in->inline_version) {
+    in->inline_data = inline_data;
+    in->inline_version = inline_version;
+  }
+
   if (truncate_seq > in->truncate_seq ||
       (truncate_seq == in->truncate_seq && size > in->size)) {
     ldout(cct, 10) << "size " << in->size << " -> " << size << dendl;
@@ -511,6 +518,13 @@  void Client::update_inode_file_bits(Inode *in,
 	_invalidate_inode_cache(in, truncate_size, prior_size - truncate_size, true);
       }
     }
+
+    // truncate inline data
+    if (in->inline_version < CEPH_INLINE_DISABLED) {
+      uint32_t len = in->inline_data.length();
+      if (size < len)
+        in->inline_data.splice(size, len - size);
+    }
   }
   if (truncate_seq >= in->truncate_seq &&
       in->truncate_size != truncate_size) {
@@ -645,6 +659,7 @@  Inode * Client::add_update_inode(InodeStat *st, utime_t from, MetaSession *sessi
   
     update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
 			   st->time_warp_seq, st->ctime, st->mtime, st->atime,
+			   st->inline_version, st->inline_data,
 			   issued);
   }
 
@@ -2353,6 +2368,11 @@  void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
   in->ctime.encode_timeval(&m->head.ctime);
   m->head.time_warp_seq = in->time_warp_seq;
     
+  if (flush & CEPH_CAP_FILE_WR) {
+    m->inline_version = in->inline_version;
+    m->inline_data = in->inline_data;
+  }
+
   in->reported_size = in->size;
   m->set_snap_follows(follows);
   cap->wanted = want;
@@ -3482,7 +3502,9 @@  void Client::handle_cap_trunc(MetaSession *session, Inode *in, MClientCaps *m)
   issued |= implemented;
   update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(),
                          m->get_size(), m->get_time_warp_seq(), m->get_ctime(),
-                         m->get_mtime(), m->get_atime(), issued);
+                         m->get_mtime(), m->get_atime(),
+                         m->inline_version, m->inline_data,
+                         issued);
   m->put();
 }
 
@@ -3589,7 +3611,8 @@  void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
     in->xattr_version = m->head.xattr_version;
   }
   update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(),
-			 m->get_time_warp_seq(), m->get_ctime(), m->get_mtime(), m->get_atime(), issued);
+			 m->get_time_warp_seq(), m->get_ctime(), m->get_mtime(), m->get_atime(),
+			 m->inline_version, m->inline_data, issued);
 
   // max_size
   if (cap == in->auth_cap &&
@@ -5643,6 +5666,57 @@  void Client::unlock_fh_pos(Fh *f)
   f->pos_locked = false;
 }
 
+int Client::migration_inline_data(Inode *in)
+{
+  ObjectOperation ops;
+  bufferlist inline_version_bl;
+  ::encode(in->inline_version, inline_version_bl);
+  ops.cmpxattr("inline_version",
+               CEPH_OSD_CMPXATTR_OP_GT,
+               CEPH_OSD_CMPXATTR_MODE_U64,
+               CEPH_OSD_OP_FLAG_NOENTOK,
+               inline_version_bl);
+  bufferlist inline_data = in->inline_data;
+  ops.write(0, inline_data, in->truncate_size, in->truncate_seq);
+  ops.setxattr("inline_version", inline_version_bl);
+
+  char oid_buf[32];
+  snprintf(oid_buf, sizeof(oid_buf), "%llx.00000000", (long long unsigned)in->ino);
+  object_t oid = oid_buf;
+
+  Mutex flock("Client::migration_inline_data flock");
+  Cond cond;
+  bool done = false;
+  int ret;
+  Context *oncommit = new C_SafeCond(&flock, &cond, &done, &ret);
+
+  objecter->mutate(oid,
+                   OSDMap::file_to_object_locator(in->layout),
+                   ops,
+                   in->snaprealm->get_snap_context(),
+                   ceph_clock_now(cct),
+                   0,
+                   NULL,
+                   oncommit);
+
+  client_lock.Unlock();
+  flock.Lock();
+  while (!done)
+    cond.Wait(flock);
+  flock.Unlock();
+  client_lock.Lock();
+
+  if (ret >= 0 || ret == -ECANCELED) {
+    in->inline_data.clear();
+    in->inline_version = CEPH_INLINE_DISABLED;
+    mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+    check_caps(in, false);
+
+    ret = 0;
+  }
+
+  return ret;
+}
 
 // 
 
@@ -5688,6 +5762,30 @@  int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
     movepos = true;
   }
 
+  if (in->inline_version < CEPH_INLINE_DISABLED) {
+    if (!(have & CEPH_CAP_FILE_CACHE)) {
+      r = migration_inline_data(in);
+      if (r < 0)
+        goto done;
+    } else {
+      uint32_t len = in->inline_data.length();
+
+      uint64_t endoff = offset + size;
+      if (endoff > in->size)
+        endoff = in->size;
+
+      if (endoff > len) {
+        if (offset < len)
+          bl->substr_of(in->inline_data, offset, len - offset);
+        bl->append_zero(endoff - len);
+      } else if (endoff > (uint64_t)offset) {
+        bl->substr_of(in->inline_data, offset, endoff - offset);
+      }
+
+      goto success;
+    }
+  }
+
   if (!conf->client_debug_force_sync_read &&
       (cct->_conf->client_oc && (have & CEPH_CAP_FILE_CACHE))) {
 
@@ -5704,6 +5802,8 @@  int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
     goto done;
   }
 
+success:
+
   if (movepos) {
     // adjust fd pos
     f->pos = offset+bl->length();
@@ -5995,6 +6095,29 @@  int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
 
   ldout(cct, 10) << " snaprealm " << *in->snaprealm << dendl;
 
+  if (in->inline_version < CEPH_INLINE_DISABLED) {
+    if (endoff > CEPH_INLINE_SIZE || !(have & CEPH_CAP_FILE_BUFFER)) {
+      r = migration_inline_data(in);
+      if (r < 0)
+        goto done;
+    } else {
+      uint32_t len = in->inline_data.length();
+
+      if (endoff < len)
+        in->inline_data.copy(endoff, len - endoff, bl);
+
+      if (offset < len)
+        in->inline_data.splice(offset, len - offset);
+      else if (offset > len)
+        in->inline_data.append_zero(offset - len);
+
+      in->inline_data.append(bl);
+      in->inline_version++;
+
+      goto success;
+    }
+  }
+
   if (cct->_conf->client_oc && (have & CEPH_CAP_FILE_BUFFER)) {
     // do buffered write
     if (!in->oset.dirty_or_tx)
@@ -6045,7 +6168,7 @@  int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
   }
 
   // if we get here, write was successful, update client metadata
-
+success:
   // time
   lat = ceph_clock_now(cct);
   lat -= start;
@@ -7719,33 +7842,60 @@  int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
     return r;
 
   if (mode & FALLOC_FL_PUNCH_HOLE) {
-    Mutex flock("Client::_punch_hole flock");
-    Cond cond;
-    bool done = false;
-    Context *onfinish = new C_SafeCond(&flock, &cond, &done);
-    Context *onsafe = new C_Client_SyncCommit(this, in);
+    if (in->inline_version < CEPH_INLINE_DISABLED &&
+        (have & CEPH_CAP_FILE_BUFFER)) {
+      bufferlist bl;
+      int len = in->inline_data.length();
+      if (offset < len) {
+        if (offset > 0)
+          in->inline_data.copy(0, offset, bl);
+        int size = length;
+        if (offset + size > len)
+          size = len - offset;
+        if (size > 0)
+          bl.append_zero(size);
+        if (offset + size < len)
+          in->inline_data.copy(offset + size, len - offset - size, bl);
+        in->inline_data = bl;
+        in->inline_version++;
+      }
+      in->mtime = ceph_clock_now(cct);
+      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+    } else {
+      if (in->inline_version < CEPH_INLINE_DISABLED) {
+        r = migration_inline_data(in);
+        if (r < 0)
+          goto done;
+      }
 
-    unsafe_sync_write++;
-    get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
+      Mutex flock("Client::_punch_hole flock");
+      Cond cond;
+      bool done = false;
+      Context *onfinish = new C_SafeCond(&flock, &cond, &done);
+      Context *onsafe = new C_Client_SyncCommit(this, in);
 
-    _invalidate_inode_cache(in, offset, length, true);
-    r = filer->zero(in->ino, &in->layout,
-                    in->snaprealm->get_snap_context(),
-                    offset, length,
-                    ceph_clock_now(cct),
-                    0, true, onfinish, onsafe);
-    if (r < 0)
-      goto done;
+      unsafe_sync_write++;
+      get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
 
-    in->mtime = ceph_clock_now(cct);
-    mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+      _invalidate_inode_cache(in, offset, length, true);
+      r = filer->zero(in->ino, &in->layout,
+                      in->snaprealm->get_snap_context(),
+                      offset, length,
+                      ceph_clock_now(cct),
+                      0, true, onfinish, onsafe);
+      if (r < 0)
+        goto done;
 
-    client_lock.Unlock();
-    flock.Lock();
-    while (!done)
-      cond.Wait(flock);
-    flock.Unlock();
-    client_lock.Lock();
+      in->mtime = ceph_clock_now(cct);
+      mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+
+      client_lock.Unlock();
+      flock.Lock();
+      while (!done)
+        cond.Wait(flock);
+      flock.Unlock();
+      client_lock.Lock();
+    }
   } else if (!(mode & FALLOC_FL_KEEP_SIZE)) {
     uint64_t size = offset + length;
     if (size > in->size) {
diff --git a/src/client/Client.h b/src/client/Client.h
index c7c9cef..5fc05f4 100644
--- a/src/client/Client.h
+++ b/src/client/Client.h
@@ -420,6 +420,9 @@  protected:
 
   void handle_lease(MClientLease *m);
 
+  // inline data
+  int migration_inline_data(Inode *in);
+
   // file caps
   void check_cap_issue(Inode *in, Cap *cap, unsigned issued);
   void add_update_cap(Inode *in, MetaSession *session, uint64_t cap_id,
@@ -495,6 +498,7 @@  protected:
   void update_inode_file_bits(Inode *in,
 			      uint64_t truncate_seq, uint64_t truncate_size, uint64_t size,
 			      uint64_t time_warp_seq, utime_t ctime, utime_t mtime, utime_t atime,
+			      uint64_t inline_version, bufferlist& inline_data,
 			      int issued);
   Inode *add_update_inode(InodeStat *st, utime_t ttl, MetaSession *session);
   Dentry *insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
diff --git a/src/client/Inode.h b/src/client/Inode.h
index cc054a6..bb17706 100644
--- a/src/client/Inode.h
+++ b/src/client/Inode.h
@@ -111,6 +111,10 @@  class Inode {
   version_t version;           // auth only
   version_t xattr_version;
 
+  // inline data
+  uint64_t   inline_version;
+  bufferlist inline_data;
+
   bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; }
   bool is_dir()     const { return (mode & S_IFMT) == S_IFDIR; }
   bool is_file()    const { return (mode & S_IFMT) == S_IFREG; }
@@ -207,6 +211,7 @@  class Inode {
       rdev(0), mode(0), uid(0), gid(0), nlink(0),
       size(0), truncate_seq(1), truncate_size(-1),
       time_warp_seq(0), max_size(0), version(0), xattr_version(0),
+      inline_version(0),
       flags(0),
       dir_hashed(false), dir_replicated(false), auth_cap(NULL),
       dirty_caps(0), flushing_caps(0), flushing_cap_seq(0), shared_gen(0), cache_gen(0),
diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h
index c0f01cc..70ee921 100644
--- a/src/include/ceph_features.h
+++ b/src/include/ceph_features.h
@@ -40,6 +40,7 @@ 
 #define CEPH_FEATURE_MON_SCRUB      (1ULL<<33)
 #define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34)
 #define CEPH_FEATURE_OSD_CACHEPOOL (1ULL<<35)
+#define CEPH_FEATURE_MDS_INLINE_DATA     (1ULL<<36)
 
 /*
  * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature
@@ -103,6 +104,7 @@  static inline unsigned long long ceph_sanitize_features(unsigned long long f) {
 	 CEPH_FEATURE_MON_SCRUB	|	    \
 	 CEPH_FEATURE_OSD_PACKED_RECOVERY | \
 	 CEPH_FEATURE_OSD_CACHEPOOL | \
+	 CEPH_FEATURE_MDS_INLINE_DATA | \
 	 0ULL)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h
index 6c41d14..406b51e 100644
--- a/src/include/ceph_fs.h
+++ b/src/include/ceph_fs.h
@@ -522,6 +522,9 @@  struct ceph_filelock {
 
 int ceph_flags_to_mode(int flags);
 
+/* inline data state */
+#define CEPH_INLINE_DISABLED	((__u64)-1)
+#define CEPH_INLINE_SIZE	(1 << 12)
 
 /* capability bits */
 #define CEPH_CAP_PIN         1  /* no specific capabilities beyond the pin */
diff --git a/src/include/rados.h b/src/include/rados.h
index 178c171..c387a2e 100644
--- a/src/include/rados.h
+++ b/src/include/rados.h
@@ -342,6 +342,7 @@  enum {
 enum {
 	CEPH_OSD_OP_FLAG_EXCL = 1,      /* EXCL object create */
 	CEPH_OSD_OP_FLAG_FAILOK = 2,    /* continue despite failure */
+	CEPH_OSD_OP_FLAG_NOENTOK = 4,   /* ignore NOENT error */
 };
 
 #define EOLDSNAPC    85  /* ORDERSNAP flag set; writer has old snapc*/
diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
index 46f8d33..729f126 100644
--- a/src/mds/CInode.cc
+++ b/src/mds/CInode.cc
@@ -2825,6 +2825,16 @@  int CInode::encode_inodestat(bufferlist& bl, Session *session,
   e.files = i->dirstat.nfiles;
   e.subdirs = i->dirstat.nsubdirs;
 
+  // inline data
+  uint64_t inline_version = 0;
+  bufferlist inline_data;
+  if (!cap || (cap->client_inline_version < i->inline_version)) {
+    inline_version = i->inline_version;
+    inline_data = i->inline_data;
+    if (cap)
+      cap->client_inline_version = i->inline_version;
+  }
+
   // nest (do same as file... :/)
   i->rstat.rctime.encode_timeval(&e.rctime);
   e.rbytes = i->rstat.rbytes;
@@ -2863,6 +2873,7 @@  int CInode::encode_inodestat(bufferlist& bl, Session *session,
     bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size();
     bytes += sizeof(__u32) + symlink.length();
     bytes += sizeof(__u32) + xbl.length();
+    bytes += sizeof(__u64) + sizeof(__u32) + inline_data.length();
     if (bytes > max_bytes)
       return -ENOSPC;
   }
@@ -2958,6 +2969,10 @@  int CInode::encode_inodestat(bufferlist& bl, Session *session,
     ::encode(i->dir_layout, bl);
   }
   ::encode(xbl, bl);
+  if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
+    ::encode(inline_version, bl);
+    ::encode(inline_data, bl);
+  }
 
   return valid;
 }
@@ -2990,6 +3005,13 @@  void CInode::encode_cap_message(MClientCaps *m, Capability *cap)
   i->atime.encode_timeval(&m->head.atime);
   m->head.time_warp_seq = i->time_warp_seq;
 
+  if (cap->client_inline_version < i->inline_version) {
+    m->inline_version = cap->client_inline_version = i->inline_version;
+    m->inline_data = i->inline_data;
+  } else {
+    m->inline_version = 0;
+  }
+
   // max_size is min of projected, actual.
   uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0;
   uint64_t newms = pi->client_ranges.count(client) ? pi->client_ranges[client].range.last : 0;
diff --git a/src/mds/Capability.h b/src/mds/Capability.h
index fb6b3dc..995ea3a 100644
--- a/src/mds/Capability.h
+++ b/src/mds/Capability.h
@@ -209,6 +209,7 @@  private:
 public:
   snapid_t client_follows;
   version_t client_xattr_version;
+  uint64_t client_inline_version;
   
   xlist<Capability*>::item item_session_caps;
   xlist<Capability*>::item item_snaprealm_caps;
@@ -223,6 +224,7 @@  public:
     mseq(0),
     suppress(0), stale(false),
     client_follows(0), client_xattr_version(0),
+    client_inline_version(0),
     item_session_caps(this), item_snaprealm_caps(this) {
     g_num_cap++;
     g_num_capa++;
diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc
index 99bd761..4f1d322 100644
--- a/src/mds/Locker.cc
+++ b/src/mds/Locker.cc
@@ -2686,6 +2686,7 @@  void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *
     utime_t mtime = m->get_mtime();
     utime_t ctime = m->get_ctime();
     uint64_t size = m->get_size();
+    uint64_t inline_version = m->inline_version;
     
     if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
 	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
@@ -2705,6 +2706,12 @@  void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *
       pi->size = size;
       pi->rstat.rbytes = size;
     }
+    if (in->inode.is_file() &&
+        (dirty & CEPH_CAP_FILE_WR) &&
+        inline_version > pi->inline_version) {
+      pi->inline_version = inline_version;
+      pi->inline_data = m->inline_data;
+    }
     if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
       dout(7) << "  atime " << pi->atime << " -> " << atime
 	      << " for " << *in << dendl;
diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc
index 6886786..8634adf 100644
--- a/src/mds/mdstypes.cc
+++ b/src/mds/mdstypes.cc
@@ -204,7 +204,7 @@  ostream& operator<<(ostream& out, const client_writeable_range_t& r)
  */
 void inode_t::encode(bufferlist &bl) const
 {
-  ENCODE_START(7, 6, bl);
+  ENCODE_START(8, 8, bl);
 
   ::encode(ino, bl);
   ::encode(rdev, bl);
@@ -227,6 +227,8 @@  void inode_t::encode(bufferlist &bl) const
   ::encode(mtime, bl);
   ::encode(atime, bl);
   ::encode(time_warp_seq, bl);
+  ::encode(inline_version, bl);
+  ::encode(inline_data, bl);
   ::encode(client_ranges, bl);
 
   ::encode(dirstat, bl);
@@ -244,7 +246,7 @@  void inode_t::encode(bufferlist &bl) const
 
 void inode_t::decode(bufferlist::iterator &p)
 {
-  DECODE_START_LEGACY_COMPAT_LEN(7, 6, 6, p);
+  DECODE_START_LEGACY_COMPAT_LEN(8, 6, 6, p);
 
   ::decode(ino, p);
   ::decode(rdev, p);
@@ -273,6 +275,12 @@  void inode_t::decode(bufferlist::iterator &p)
   ::decode(mtime, p);
   ::decode(atime, p);
   ::decode(time_warp_seq, p);
+  if (struct_v >= 8) {
+    ::decode(inline_version, p);
+    ::decode(inline_data, p);
+  } else {
+    inline_version = CEPH_INLINE_DISABLED;
+  }
   if (struct_v >= 3) {
     ::decode(client_ranges, p);
   } else {
diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h
index 902e310..928167c 100644
--- a/src/mds/mdstypes.h
+++ b/src/mds/mdstypes.h
@@ -335,6 +335,8 @@  struct inode_t {
   utime_t    mtime;   // file data modify time.
   utime_t    atime;   // file data access time.
   uint32_t   time_warp_seq;  // count of (potential) mtime/atime timewarps (i.e., utimes())
+  bufferlist inline_data;
+  uint64_t   inline_version;
 
   map<client_t,client_writeable_range_t> client_ranges;  // client(s) can write to these ranges
 
@@ -356,6 +358,7 @@  struct inode_t {
 	      size(0), truncate_seq(0), truncate_size(0), truncate_from(0),
 	      truncate_pending(0),
 	      time_warp_seq(0),
+	      inline_version(1),
 	      version(0), file_data_version(0), xattr_version(0), backtrace_version(0) {
     clear_layout();
     memset(&dir_layout, 0, sizeof(dir_layout));
diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h
index 117f241..260d714 100644
--- a/src/messages/MClientCaps.h
+++ b/src/messages/MClientCaps.h
@@ -21,7 +21,7 @@ 
 
 class MClientCaps : public Message {
 
-  static const int HEAD_VERSION = 2;   // added flock metadata
+  static const int HEAD_VERSION = 3;   // added flock metadata, inline data
   static const int COMPAT_VERSION = 1;
 
  public:
@@ -29,6 +29,8 @@  class MClientCaps : public Message {
   bufferlist snapbl;
   bufferlist xattrbl;
   bufferlist flockbl;
+  uint64_t   inline_version;
+  bufferlist inline_data;
 
   int      get_caps() { return head.caps; }
   int      get_wanted() { return head.wanted; }
@@ -148,6 +150,13 @@  public:
     if (head.xattr_len)
       xattrbl = middle;
 
+    if (header.version >= 3) {
+      ::decode(inline_version, p);
+      ::decode(inline_data, p);
+    } else {
+      inline_version = CEPH_INLINE_DISABLED;
+    }
+
     // conditionally decode flock metadata
     if (header.version >= 2)
       ::decode(flockbl, p);
@@ -160,6 +169,13 @@  public:
 
     middle = xattrbl;
 
+    if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+      ::encode(inline_version, payload);
+      ::encode(inline_data, payload);
+    } else {
+      header.version = 2;
+    }
+
     // conditionally include flock metadata
     if (features & CEPH_FEATURE_FLOCK) {
       ::encode(flockbl, payload);
diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h
index 896245f..a8e83c2 100644
--- a/src/messages/MClientReply.h
+++ b/src/messages/MClientReply.h
@@ -108,6 +108,8 @@  struct InodeStat {
   uint64_t truncate_size;
   utime_t ctime, mtime, atime;
   version_t time_warp_seq;
+  bufferlist inline_data;
+  uint64_t inline_version;
 
   frag_info_t dirstat;
   nest_info_t rstat;
@@ -174,6 +176,13 @@  struct InodeStat {
 
     xattr_version = e.xattr_version;
     ::decode(xattrbl, p);
+
+    if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+      ::decode(inline_version, p);
+      ::decode(inline_data, p);
+    } else {
+      inline_version = CEPH_INLINE_DISABLED;
+    }
   }
   
   // see CInode::encode_inodestat for encoder.
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index b391e17..30f7d01 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -2398,8 +2398,11 @@  int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 	  result = osd->store->getattr(coll, soid, name.c_str(), xattr);
 	else
 	  result = osd->store->getattr(coll, src_obc->obs.oi.soid, name.c_str(), xattr);
-	if (result < 0 && result != -EEXIST && result != -ENODATA)
+       int flags = le32_to_cpu(op.flags);
+	if (result < 0 && result != -EEXIST && result != -ENODATA &&
+	    (!(flags & CEPH_OSD_OP_FLAG_NOENTOK) || result != -ENOENT)) {
 	  break;
+	}
 	
 	ctx->delta_stats.num_rd++;
 	ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(xattr.length(), 10);
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index 154ee41..230745b 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -112,9 +112,10 @@  struct ObjectOperation {
       osd_op.indata.append(name);
     osd_op.indata.append(data);
   }
-  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& data) {
+  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, uint32_t flags, const bufferlist& data) {
     OSDOp& osd_op = add_op(op);
     osd_op.op.op = op;
+    osd_op.op.flags = flags;
     osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
     osd_op.op.xattr.value_len = data.length();
     osd_op.op.xattr.cmp_op = cmp_op;
@@ -279,8 +280,16 @@  struct ObjectOperation {
     out_handler[p] = h;
     out_rval[p] = prval;
   }
-  void write(uint64_t off, bufferlist& bl) {
+  void write(uint64_t off, bufferlist& bl,
+             uint64_t truncate_size,
+             uint32_t truncate_seq) {
     add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
+    OSDOp& o = *ops.rbegin();
+    o.op.extent.truncate_size = truncate_size;
+    o.op.extent.truncate_seq = truncate_seq;
+  }
+  void write(uint64_t off, bufferlist& bl) {
+    write(off, bl, 0, 0);
   }
   void write_full(bufferlist& bl) {
     add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
@@ -453,7 +462,10 @@  struct ObjectOperation {
     add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
   }
   void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& bl) {
-    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
+    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, 0, bl);
+  }
+  void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, uint32_t flags, const bufferlist& bl) {
+    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, flags, bl);
   }
   void rmxattr(const char *name) {
     bufferlist bl;
@@ -733,11 +745,12 @@  struct ObjectOperation {
   }
 
   void cmpxattr(const char *name, const bufferlist& val,
-		int op, int mode) {
+		int op, int mode, int flags = 0) {
     add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
     OSDOp& o = *ops.rbegin();
     o.op.xattr.cmp_op = op;
     o.op.xattr.cmp_mode = mode;
+    o.op.flags = flags;
   }
   void src_cmpxattr(const object_t& srcoid, snapid_t srcsnapid,
 		    const char *name, const bufferlist& val,