@@ -4088,6 +4088,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
rbd_dev->layout.stripe_count = 1;
rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
rbd_dev->layout.pool_id = spec->pool_id;
+ rbd_dev->layout.pool_ns = NULL;
/*
* If this is a mapping rbd_dev (as opposed to a parent one),
@@ -396,6 +396,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_symlink = NULL;
memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
+ memset(&ci->i_layout, 0, sizeof(ci->i_layout));
ci->i_fragtree = RB_ROOT;
mutex_init(&ci->i_fragtree_mutex);
@@ -518,6 +519,8 @@ void ceph_destroy_inode(struct inode *inode)
if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+ ceph_put_pool_ns(ci->i_layout.pool_ns);
+
call_rcu(&inode->i_rcu, ceph_i_callback);
}
@@ -53,6 +53,7 @@ struct ceph_file_layout_legacy {
__le32 fl_pg_pool; /* namespace, crush ruleset, rep level */
} __attribute__ ((packed));
+struct ceph_pool_ns;
/*
* ceph_file_layout - describe data layout for a file/inode
*/
@@ -62,6 +63,7 @@ struct ceph_file_layout {
u32 stripe_count; /* over this many objects */
u32 object_size; /* until objects are this big */
s64 pool_id; /* rados pool id */
+ struct ceph_pool_ns *pool_ns; /* rados pool namespace */
};
extern int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
@@ -55,6 +55,7 @@ static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
struct ceph_object_locator {
s64 pool;
+ struct ceph_pool_ns *pool_ns;
};
/*
@@ -63,6 +64,7 @@ struct ceph_object_locator {
* (probably outdated: must be >= RBD_MAX_MD_NAME_LEN -- currently 100)
*/
#define CEPH_MAX_OID_NAME_LEN 100
+#define CEPH_MAX_NAMESPACE_LEN 100
struct ceph_object_id {
char name[CEPH_MAX_OID_NAME_LEN];
@@ -36,6 +36,7 @@ void ceph_file_layout_from_legacy(struct ceph_file_layout *fl,
fl->pool_id = le64_to_cpu(legacy->fl_pg_pool);
if (fl->pool_id == 0)
fl->pool_id = -1;
+ fl->pool_ns = NULL;
}
EXPORT_SYMBOL(ceph_file_layout_from_legacy);
@@ -339,6 +339,9 @@ static void ceph_osdc_release_request(struct kref *kref)
kfree(req->r_ops);
ceph_put_snap_context(req->r_snapc);
+ ceph_put_pool_ns(req->r_base_oloc.pool_ns);
+ ceph_put_pool_ns(req->r_target_oloc.pool_ns);
+
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
else
@@ -388,6 +391,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
req->r_num_ops = 0;
req->r_max_ops = num_ops;
+ req->r_base_oloc.pool = -1;
+ req->r_target_oloc.pool = -1;
+
if (num_ops <= CEPH_OSD_INITIAL_OP) {
req->r_ops = req->r_inline_ops;
} else {
@@ -409,9 +415,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_req_lru_item);
INIT_LIST_HEAD(&req->r_osd_item);
- req->r_base_oloc.pool = -1;
- req->r_target_oloc.pool = -1;
-
/* create reply message */
msg_size = OSD_OPREPLY_FRONT_LEN;
if (num_ops > CEPH_OSD_INITIAL_OP) {
@@ -433,7 +436,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
/* create request message; allow space for oid */
msg_size = 4 + 4 + 8 + 8 + 4 + 8;
- msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+ msg_size += 2 + 4 + 8 + 4 + 4 + 4 + CEPH_MAX_NAMESPACE_LEN; /* oloc */
msg_size += 1 + 8 + 4 + 4; /* pg_t */
msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
@@ -864,6 +867,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
}
req->r_base_oloc.pool = layout->pool_id;
+ req->r_base_oloc.pool_ns = ceph_try_get_pool_ns(&layout->pool_ns);
snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
"%llx.%08llx", vino.ino, objnum);
@@ -1407,6 +1411,8 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
need_check_tiering = false;
if (req->r_target_oloc.pool == -1) {
req->r_target_oloc = req->r_base_oloc; /* struct */
+ if (req->r_target_oloc.pool_ns)
+ ceph_get_pool_ns(req->r_target_oloc.pool_ns);
need_check_tiering = true;
}
if (req->r_target_oid.name_len == 0) {
@@ -1719,10 +1725,10 @@ static int ceph_oloc_decode(void **p, void *end,
}
if (struct_v >= 5) {
- len = ceph_decode_32(p);
- if (len > 0) {
- pr_warn("ceph_object_locator::nspace is set\n");
- goto e_inval;
+ u32 ns_len = ceph_decode_32(p);
+ if (ns_len > 0) {
+ ceph_decode_need(p, end, ns_len, e_inval);
+ *p += ns_len;
}
}
@@ -1907,7 +1913,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
__unregister_request(osdc, req);
- req->r_target_oloc = redir.oloc; /* struct */
+ req->r_target_oloc.pool = redir.oloc.pool;
/*
* Start redirect requests with nofail=true. If
@@ -2459,6 +2465,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
struct timespec *mtime)
{
struct ceph_msg *msg = req->r_request;
+ struct ceph_pool_ns *pool_ns;
void *p;
size_t msg_size;
int flags = req->r_flags;
@@ -2483,14 +2490,27 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
req->r_request_reassert_version = p;
p += sizeof(struct ceph_eversion); /* will get filled in */
+ if (req->r_target_oloc.pool_ns)
+ pool_ns = req->r_target_oloc.pool_ns;
+ else if (req->r_base_oloc.pool_ns)
+ pool_ns = req->r_base_oloc.pool_ns;
+ else
+ pool_ns = NULL;
+
/* oloc */
+ ceph_encode_8(&p, 5);
ceph_encode_8(&p, 4);
- ceph_encode_8(&p, 4);
- ceph_encode_32(&p, 8 + 4 + 4);
+ ceph_encode_32(&p, 8 + 4 + 4 + 4 + (pool_ns ? pool_ns->name_len : 0));
req->r_request_pool = p;
p += 8;
ceph_encode_32(&p, -1); /* preferred */
ceph_encode_32(&p, 0); /* key len */
+ if (pool_ns) {
+ ceph_encode_32(&p, pool_ns->name_len);
+ ceph_encode_copy(&p, pool_ns->name, pool_ns->name_len);
+ } else {
+ ceph_encode_32(&p, 0);
+ }
ceph_encode_8(&p, 1);
req->r_request_pgid = p;
@@ -1470,12 +1470,33 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap,
if (!pi)
return -EIO;
- pg_out->pool = oloc->pool;
- pg_out->seed = ceph_str_hash(pi->object_hash, oid->name,
- oid->name_len);
-
- dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name,
- pg_out->pool, pg_out->seed);
+ if (!oloc->pool_ns) {
+ pg_out->pool = oloc->pool;
+ pg_out->seed = ceph_str_hash(pi->object_hash, oid->name,
+ oid->name_len);
+ dout("%s '%.*s' pgid %llu.%x\n", __func__,
+ oid->name_len, oid->name, pg_out->pool, pg_out->seed);
+ } else {
+ char stack_buf[256];
+ char *buf = stack_buf;
+ int nsl = oloc->pool_ns->name_len;
+ size_t total = nsl + 1 + oid->name_len;
+ if (total > sizeof(stack_buf)) {
+ buf = kmalloc(total, GFP_NOFS);
+ if (!buf)
+ return -ENOMEM;
+ }
+ memcpy(buf, oloc->pool_ns->name, nsl);
+ buf[nsl] = '\037';
+ memcpy(buf + nsl + 1, oid->name, oid->name_len);
+ pg_out->pool = oloc->pool;
+ pg_out->seed = ceph_str_hash(pi->object_hash, buf, total);
+ if (buf != stack_buf)
+ kfree(buf);
+ dout("%s '%.*s' ns '%.*s' pgid %llu.%x\n", __func__,
+ oid->name_len, oid->name, nsl, oloc->pool_ns->name,
+ pg_out->pool, pg_out->seed);
+ }
return 0;
}
EXPORT_SYMBOL(ceph_oloc_oid_to_pg);
Signed-off-by: Yan, Zheng <zyan@redhat.com>
---
 drivers/block/rbd.c          |  1 +
 fs/ceph/inode.c              |  3 +++
 include/linux/ceph/ceph_fs.h |  2 ++
 include/linux/ceph/osdmap.h  |  2 ++
 net/ceph/ceph_fs.c           |  1 +
 net/ceph/osd_client.c        | 42 +++++++++++++++++++++++++++++++-----------
 net/ceph/osdmap.c            | 33 +++++++++++++++++++++++++++------
 7 files changed, 67 insertions(+), 17 deletions(-)