diff mbox series

[3/4] libceph: introduce generic journaling

Message ID 1534399172-27610-4-git-send-email-dongsheng.yang@easystack.cn (mailing list archive)
State New, archived
Headers show
Series [1/4] libceph: support op append | expand

Commit Message

Dongsheng Yang Aug. 16, 2018, 5:59 a.m. UTC
This is the generic journaling in ceph kernel client.

There are three works this module has to do:
(1) journal recording:
	generic journaling module provide an api named as ceph_journaler_append().
    this function is used to append event entries to journaling.

(2) journal replaying:
	When we are going to make sure the data and journal are consistent in opening
    journal, we can call the api of start_replay() to replay the all events recorded
    but not committed.

(3) journal trimming:
	This is transparent to upper layer, when some old journal entries is not needed,
    we need to trim the releated objects

Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
---
 include/linux/ceph/journaler.h |  131 +++++
 net/ceph/Makefile              |    3 +-
 net/ceph/journaler.c           | 1208 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 1341 insertions(+), 1 deletion(-)
 create mode 100644 include/linux/ceph/journaler.h
 create mode 100644 net/ceph/journaler.c
diff mbox series

Patch

diff --git a/include/linux/ceph/journaler.h b/include/linux/ceph/journaler.h
new file mode 100644
index 0000000..28dfda4
--- /dev/null
+++ b/include/linux/ceph/journaler.h
@@ -0,0 +1,131 @@ 
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_JOURNAL_H
+#define _FS_CEPH_JOURNAL_H
+
+#include <linux/bitrev.h>
+#include <linux/completion.h>
+#include <linux/kref.h>
+#include <linux/mempool.h>
+#include <linux/rbtree.h>
+#include <linux/refcount.h>
+
+#include <linux/ceph/types.h>
+#include <linux/ceph/osdmap.h>
+#include <linux/ceph/messenger.h>
+#include <linux/ceph/msgpool.h>
+#include <linux/ceph/auth.h>
+#include <linux/ceph/pagelist.h>
+#include <linux/ceph/cls_journaler_client.h>
+
+struct ceph_msg;
+struct ceph_snap_context;
+struct ceph_osd_request;
+struct ceph_osd_client;
+
+#define JOURNAL_HEADER_PREFIX	"journal."
+#define JOURNAL_OBJECT_PREFIX	"journal_data."
+
+#define LOCAL_MIRROR_UUID	""
+
+static const uint32_t HEADER_FIXED_SIZE = 25; /// preamble, version, entry tid, tag id
+static const uint32_t REMAINDER_FIXED_SIZE = 8; /// data size, crc
+
+static const uint64_t PREAMBLE = 0x3141592653589793;
+
+struct ceph_journaler_future {
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	uint64_t commit_tid;
+};
+
+struct ceph_journaler_entry {
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	ssize_t data_len;
+	char *data;
+
+	struct list_head node;
+};
+
+struct ceph_journaler_entry *ceph_journaler_entry_decode(void **p, void *end);
+void ceph_journaler_entry_encode(struct ceph_journaler_entry *entry, void **p, void *end);
+
+struct entry_tid {
+	struct list_head	node;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+};
+
+struct commit_entry {
+	struct rb_node	r_node;
+	uint64_t commit_tid;
+	uint64_t object_num;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	bool committed;
+};
+
+struct ceph_journaler {
+	struct ceph_osd_client *osdc;
+	struct ceph_object_locator data_oloc;
+
+	struct ceph_object_id header_oid;
+	struct ceph_object_locator header_oloc;
+
+	char *object_oid_prefix;
+
+	struct mutex mutex;
+	spinlock_t		meta_lock;
+	spinlock_t		entry_tid_lock;
+	spinlock_t		commit_lock;
+	uint8_t order;
+	uint8_t splay_width;
+	int64_t pool_id;
+
+	uint64_t commit_tid;
+
+	uint64_t minimum_set;
+	uint64_t active_set;
+
+	struct list_head	clients;
+	struct ceph_journaler_client *client;
+	struct ceph_journaler_client *clients_array;
+	struct list_head	object_positions_pending;
+
+	struct rb_root		commit_entries;
+
+	struct list_head	entry_tids;
+
+	struct workqueue_struct	*task_wq;
+	struct work_struct	notify_update_work;
+	struct work_struct	commit_work;
+
+	uint64_t active_tag_tid;
+	bool commit_pos_valid;
+	struct ceph_journaler_object_pos *commit_pos;
+	uint64_t splay_offset;
+
+	int (*handle_entry)(void *entry_handler, struct ceph_journaler_entry *entry);
+	void *entry_handler;
+
+	struct ceph_osd_linger_request *watch_handle;
+};
+
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     const char *journal_id,
+					     struct ceph_object_locator*_oloc);
+
+void ceph_journaler_destroy(struct ceph_journaler *journal);
+
+int ceph_journaler_open(struct ceph_journaler *journal);
+void ceph_journaler_close(struct ceph_journaler *journal);
+
+void start_replay(struct ceph_journaler *journaler);
+
+int ceph_journaler_append(struct ceph_journaler *journaler, uint64_t tag_tid, char *data, ssize_t data_len, struct ceph_journaler_future **future);
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid);
+
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler, uint64_t tag_class, void *buf, uint32_t buf_len, struct ceph_journaler_tag *tag);
+
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id, uint32_t id_len, struct ceph_journaler_client *client_result);
+#endif
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 12bf497..0572f20 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -14,5 +14,6 @@  libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	crypto.o armor.o \
 	auth_x.o \
 	ceph_fs.o ceph_strings.o ceph_hash.o \
-	pagevec.o snapshot.o string_table.o
+	pagevec.o snapshot.o string_table.o \
+	journaler.o cls_journaler_client.o
 
diff --git a/net/ceph/journaler.c b/net/ceph/journaler.c
new file mode 100644
index 0000000..3876ed2
--- /dev/null
+++ b/net/ceph/journaler.c
@@ -0,0 +1,1208 @@ 
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/crc32c.h>
+#include <linux/module.h>
+#include <linux/err.h>
+#include <linux/highmem.h>
+#include <linux/mm.h>
+#include <linux/pagemap.h>
+#include <linux/slab.h>
+#include <linux/uaccess.h>
+#ifdef CONFIG_BLOCK
+#include <linux/bio.h>
+#endif
+
+#include <linux/ceph/ceph_features.h>
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/osd_client.h>
+#include <linux/ceph/journaler.h>
+#include <linux/ceph/cls_journaler_client.h>
+#include <linux/ceph/messenger.h>
+#include <linux/ceph/decode.h>
+#include <linux/ceph/auth.h>
+#include <linux/ceph/pagelist.h>
+#include <linux/ceph/striper.h>
+
+#define LOCAL_MIRROR_UUID	""
+
+static char *object_oid_prefix(int pool_id, const char *journal_id)
+{
+	char *prefix = NULL;
+	ssize_t len = snprintf(NULL, 0, "%s%d.%s.", JOURNAL_OBJECT_PREFIX, pool_id, journal_id);
+
+	prefix = kzalloc(len + 1, GFP_KERNEL);
+
+	if (!prefix)
+		return prefix;
+
+	WARN_ON(snprintf(prefix, len + 1, "%s%d.%s.", JOURNAL_OBJECT_PREFIX, pool_id, journal_id) != len);
+
+	return prefix;
+}
+
+static void notify_update(struct ceph_journaler* journaler);
+static void journaler_notify_update(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+						  notify_update_work);
+
+	notify_update(journaler);
+}
+
+static int copy_object_pos(struct ceph_journaler_object_pos *pos, struct ceph_journaler_object_pos **new_pos)
+{
+	struct ceph_journaler_object_pos *temp_pos;
+
+	temp_pos = kzalloc(sizeof(*temp_pos), GFP_KERNEL);
+	if (temp_pos == NULL) {
+		return -ENOMEM;
+	}
+	INIT_LIST_HEAD(&temp_pos->node);
+	temp_pos->object_num = pos->object_num;
+	temp_pos->tag_tid = pos->tag_tid;
+	temp_pos->entry_tid = pos->entry_tid;
+
+	*new_pos = temp_pos;
+
+	return 0;
+}
+
+static void journaler_client_commit(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+							commit_work);
+
+	struct list_head object_positions;
+	struct ceph_journaler_object_pos *pos = NULL, *next = NULL;
+	int ret = 0;
+
+	INIT_LIST_HEAD(&object_positions);
+	spin_lock(&journaler->commit_lock);
+	list_for_each_entry_safe(pos, next, &journaler->object_positions_pending, node) {
+		struct ceph_journaler_object_pos *new_pos = NULL;
+
+		ret = copy_object_pos(pos, &new_pos);
+		list_add_tail(&new_pos->node, &object_positions);
+	}
+	spin_unlock(&journaler->commit_lock);
+
+	ret = ceph_cls_journaler_client_committed(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, journaler->client, &object_positions);
+
+	list_for_each_entry_safe(pos, next, &object_positions, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+
+	queue_work(journaler->task_wq, &journaler->notify_update_work);
+	return;
+}
+
+
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     const char *journal_id,
+					     struct ceph_object_locator *oloc)
+{
+	struct ceph_journaler *journaler = NULL;
+	int ret = 0;
+
+	journaler = kzalloc(sizeof(struct ceph_journaler), GFP_KERNEL);
+	if (!journaler)
+		return NULL;
+
+	ceph_oid_init(&journaler->header_oid);
+	ret = ceph_oid_aprintf(&journaler->header_oid, GFP_KERNEL, "%s%s",
+				JOURNAL_HEADER_PREFIX, journal_id);
+	if (ret) {
+		pr_err("aprintf error : %d", ret);
+		goto err_free_journaler;
+	}
+
+	journaler->osdc = osdc;
+	ceph_oloc_init(&journaler->header_oloc);
+	ceph_oloc_copy(&journaler->header_oloc, oloc);
+	ceph_oloc_init(&journaler->data_oloc);
+	INIT_LIST_HEAD(&journaler->clients);
+	INIT_LIST_HEAD(&journaler->entry_tids);
+	INIT_LIST_HEAD(&journaler->object_positions_pending);
+	journaler->commit_entries = RB_ROOT;
+	journaler->object_oid_prefix = object_oid_prefix(journaler->header_oloc.pool, journal_id);
+	journaler->commit_tid = 0;
+	journaler->client = NULL;
+	journaler->clients_array = NULL;
+
+	spin_lock_init(&journaler->meta_lock);
+	spin_lock_init(&journaler->entry_tid_lock);
+	spin_lock_init(&journaler->commit_lock);
+	mutex_init(&journaler->mutex);
+	INIT_WORK(&journaler->notify_update_work, journaler_notify_update);
+	INIT_WORK(&journaler->commit_work, journaler_client_commit);
+
+	journaler->task_wq = alloc_ordered_workqueue("journaler-tasks", WQ_MEM_RECLAIM);
+	if (!journaler->task_wq)
+		goto err_free_oid_prefix;
+
+	return journaler;
+
+err_free_oid_prefix:
+	kfree(journaler->object_oid_prefix);
+	ceph_oid_destroy(&journaler->header_oid);
+	ceph_oloc_destroy(&journaler->header_oloc);
+	ceph_oloc_destroy(&journaler->data_oloc);
+err_free_journaler:
+	kfree(journaler);
+	return NULL;
+}
+EXPORT_SYMBOL(ceph_journaler_create);
+
+void ceph_journaler_destroy(struct ceph_journaler *journaler)
+{
+	destroy_workqueue(journaler->task_wq);
+	kfree(journaler->object_oid_prefix);
+	ceph_oid_destroy(&journaler->header_oid);
+	ceph_oloc_destroy(&journaler->header_oloc);
+	ceph_oloc_destroy(&journaler->data_oloc);
+	kfree(journaler);
+}
+EXPORT_SYMBOL(ceph_journaler_destroy);
+
+static void notify_update(struct ceph_journaler* journaler)
+{
+	int ret;
+
+	ret = ceph_osdc_notify(journaler->osdc, &journaler->header_oid,
+				&journaler->header_oloc, NULL, 0,
+				5000, NULL, NULL);
+
+	if (ret)
+		pr_err("notify_update failed: %d", ret);
+}
+
+static int set_minimum_set(struct ceph_journaler* journaler, uint64_t minimum_set)
+{
+	int ret = 0;
+
+	ret = ceph_cls_journaler_set_minimum_set(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, minimum_set);
+	if (ret < 0) {
+		pr_err("%s: failed to set_minimum_set: %d", __func__, ret);
+		return ret;
+	}
+
+	queue_work(journaler->task_wq, &journaler->notify_update_work);
+
+	return ret;
+}
+
+static int ceph_journaler_obj_remove_sync(struct ceph_journaler *journaler,
+			     struct ceph_object_id *oid,
+			     struct ceph_object_locator *oloc);
+static int remove_set(struct ceph_journaler *journaler, uint64_t object_set)
+{
+	uint64_t object_num = 0;
+	int splay_offset = 0;
+	struct ceph_object_id object_oid;
+	int ret = 0;
+
+	ceph_oid_init(&object_oid);
+	for (splay_offset = 0; splay_offset < journaler->splay_width; splay_offset++) {
+		object_num = splay_offset + (object_set * journaler->splay_width);
+		if (!ceph_oid_empty(&object_oid)) {
+			ceph_oid_destroy(&object_oid);
+			ceph_oid_init(&object_oid);
+		}
+		ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+					journaler->object_oid_prefix, object_num);
+		if (ret) {
+			pr_err("aprintf error : %d", ret);
+			return ret;
+		}
+		ret = ceph_journaler_obj_remove_sync(journaler, &object_oid, &journaler->data_oloc);
+		if (ret < 0) {
+			pr_err("%s: failed to remove object: %llu", __func__, object_num);
+			return ret;
+		}
+	}
+
+	return ret;
+}
+
+static void destroy_client(struct ceph_journaler_client *client);
+static int refresh(struct ceph_journaler *journaler)
+{
+	int ret = 0;
+	int i = 0;
+	uint32_t client_num = 0;
+	struct ceph_journaler_client *clients = NULL;
+	struct ceph_journaler_client *client, *next;
+	uint64_t minimum_commit_set;
+	uint64_t minimum_set;
+
+	ret = ceph_cls_journaler_get_mutable_metas(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, &journaler->minimum_set, &journaler->active_set);
+	if (ret)
+		return ret;
+
+	ret = ceph_cls_journaler_client_list(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, &clients, &client_num);
+	if (ret)
+		return ret;
+
+	list_for_each_entry_safe(client, next, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+
+	if (journaler->clients_array != NULL)
+		kfree(journaler->clients_array);
+
+	journaler->clients_array = clients;
+
+	for (i = 0; i < client_num; i++) {
+		struct ceph_journaler_client *client = &clients[i];
+
+		list_add_tail(&client->node, &journaler->clients);
+		if (!memcmp(client->id, LOCAL_MIRROR_UUID, sizeof(LOCAL_MIRROR_UUID))) {
+			journaler->client = client;
+		}
+	}
+
+	minimum_commit_set = journaler->active_set;
+	list_for_each_entry(client, &journaler->clients, node) {
+		//TODO check the state of client
+		struct ceph_journaler_object_pos *pos;
+		list_for_each_entry(pos, &client->object_positions, node) {
+			uint64_t object_set = pos->object_num / journaler->splay_width;
+			if (object_set < journaler->minimum_set) {
+				minimum_commit_set = object_set;
+			}
+		}
+	}
+
+	minimum_set = journaler->minimum_set;
+	pr_debug("minimum_commit_set: %llu, minimum_set: %llu", minimum_commit_set, minimum_set);
+	if (minimum_commit_set > minimum_set) {
+		uint64_t trim_set = minimum_set;
+		while (trim_set < minimum_commit_set) {
+			ret = remove_set(journaler, trim_set);
+			if (ret < 0) {
+				pr_err("failed to trim object_set: %llu", trim_set);
+				return ret;
+			}
+			trim_set++;
+		}
+		ret = set_minimum_set(journaler, minimum_commit_set);
+		if (ret < 0) {
+			pr_err("failed to set minimum set to %llu", minimum_commit_set);
+			return ret;
+		}
+	}
+
+	return 0;
+
+}
+
+static void journaler_watch_cb(void *arg, u64 notify_id, u64 cookie,
+			 u64 notifier_id, void *data, size_t data_len)
+{
+	struct ceph_journaler *journaler = arg;
+	int ret;
+
+	ret = ceph_osdc_notify_ack(journaler->osdc, &journaler->header_oid,
+				   &journaler->header_oloc, notify_id, cookie,
+				   NULL, 0);
+
+	if (ret)
+		pr_err("acknowledge_notify failed: %d", ret);
+
+	mutex_lock(&journaler->mutex);
+	ret = refresh(journaler);
+	mutex_unlock(&journaler->mutex);
+
+	if (ret < 0) {
+		pr_err("%s: failed to refresh journaler: %d", __func__, ret);
+	}
+}
+
+static void journaler_watch_errcb(void *arg, u64 cookie, int err)
+{
+	pr_err("journaler watch error: %d", err);
+}
+
+static int journaler_watch(struct ceph_journaler *journaler)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_linger_request *handle;
+
+	handle = ceph_osdc_watch(osdc, &journaler->header_oid,
+				 &journaler->header_oloc, journaler_watch_cb,
+				 journaler_watch_errcb, journaler);
+	if (IS_ERR(handle))
+		return PTR_ERR(handle);
+
+	journaler->watch_handle = handle;
+	return 0;
+}
+
+static void journaler_unwatch(struct ceph_journaler *journaler)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	int ret = 0;
+
+	ret = ceph_osdc_unwatch(osdc, journaler->watch_handle);
+	if (ret)
+		pr_err("%s: failed to unwatch: %d", __func__, ret);
+
+	journaler->watch_handle = NULL;
+}
+
+static int ceph_journaler_obj_remove_sync(struct ceph_journaler *journaler,
+			     struct ceph_object_id *oid,
+			     struct ceph_object_locator *oloc)
+
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
+
+	osd_req_op_init(req, 0, CEPH_OSD_OP_DELETE, 0);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+
+out_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+
+//TODO make it async
+static int ceph_journaler_obj_write_sync(struct ceph_journaler *journaler,
+			     struct ceph_object_id *oid,
+			     struct ceph_object_locator *oloc,
+			     void *buf, int buf_len)
+
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int num_pages = calc_pages_for(0, buf_len);
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
+
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_req;
+	}
+
+	ceph_copy_to_page_vector(pages, buf, 0, buf_len);
+
+	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_APPEND, 0, buf_len, 0, 0);
+	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
+					 true);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+
+out_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+
+static int ceph_journaler_obj_read_sync(struct ceph_journaler *journaler,
+			     struct ceph_object_id *oid,
+			     struct ceph_object_locator *oloc,
+			     void *buf, uint32_t read_off, uint64_t buf_len)
+
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int num_pages = calc_pages_for(0, buf_len);
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_READ;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
+
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_req;
+	}
+
+	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, read_off, buf_len, 0, 0);
+	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
+					 true);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0)
+		ceph_copy_from_page_vector(pages, buf, 0, ret);
+
+out_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+
+static bool entry_is_readable(struct ceph_journaler *journaler, void *buf, void *end, uint32_t *bytes_needed)
+{
+	uint32_t remaining = end - buf;
+	uint64_t preamble;
+	uint32_t data_size;
+	void *origin_buf = buf;
+	uint32_t crc = 0, crc_encoded = 0;
+
+	if (remaining < HEADER_FIXED_SIZE) {
+		*bytes_needed = HEADER_FIXED_SIZE - remaining;
+		return false;
+	}
+
+	preamble = ceph_decode_64(&buf);
+	if (PREAMBLE != preamble) {
+		pr_err("preamble is not correct");
+		*bytes_needed = 0;
+		return false;
+	}
+
+	buf += (HEADER_FIXED_SIZE - sizeof(preamble));
+	remaining = end - buf;
+	if (remaining < sizeof(uint32_t)) {
+		*bytes_needed = sizeof(uint32_t) - remaining;
+		return false;
+	}
+
+	data_size = ceph_decode_32(&buf);
+	remaining = end - buf;
+	if (remaining < data_size) {
+		*bytes_needed = data_size - remaining;
+		return false;
+	}
+
+	buf += data_size;
+	
+	remaining = end - buf;
+	if (remaining < sizeof(uint32_t)) {
+		*bytes_needed = sizeof(uint32_t) - remaining;
+		return false;
+	}
+
+	*bytes_needed = 0;
+	crc = crc32c(0, origin_buf, buf - origin_buf);
+
+	crc_encoded = ceph_decode_32(&buf);
+
+	if (crc != crc_encoded) {
+		return false;
+	}
+	return true;
+}
+
+static void playback_entry(struct ceph_journaler *journaler, struct ceph_journaler_entry *entry)
+{
+	//TODO verify the entry, skip stale tag_tid or others.
+	if (journaler->handle_entry != NULL) {
+		journaler->handle_entry(journaler->entry_handler, entry);
+	}
+}
+
+static void reserve_entry_tid(struct ceph_journaler *journaler, uint64_t tag_tid, uint64_t entry_tid)
+{
+	struct entry_tid *pos;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid == tag_tid) {
+			if (pos->entry_tid < entry_tid) {
+				pos->entry_tid = entry_tid;
+			}
+
+			spin_unlock(&journaler->entry_tid_lock);
+			return;
+		}
+	}
+
+	pos = kzalloc(sizeof(struct entry_tid), GFP_KERNEL);
+	WARN_ON(pos == NULL);
+
+	pos->tag_tid = tag_tid;
+	pos->entry_tid = entry_tid;
+	INIT_LIST_HEAD(&pos->node);
+
+	list_add_tail(&pos->node, &journaler->entry_tids);
+	spin_unlock(&journaler->entry_tid_lock);
+
+	return;
+}
+
+DEFINE_RB_INSDEL_FUNCS(commit_entry, struct commit_entry, commit_tid, r_node)
+
+static int add_commit_entry(struct ceph_journaler *journaler, uint64_t commit_tid, uint64_t object_num, uint64_t tag_tid, uint64_t entry_tid)
+{
+	struct commit_entry	*entry = NULL;
+	int ret = 0;
+
+	entry = kzalloc(sizeof(*entry), GFP_KERNEL);
+	if (entry == NULL) {
+		ret = -ENOMEM;
+		goto out;
+	}
+	RB_CLEAR_NODE(&entry->r_node);
+
+	entry->commit_tid = commit_tid;
+	entry->object_num = object_num;
+	entry->tag_tid = tag_tid;
+	entry->entry_tid = entry_tid;
+
+	spin_lock(&journaler->commit_lock);
+	insert_commit_entry(&journaler->commit_entries, entry);
+	spin_unlock(&journaler->commit_lock);
+
+out:
+	return ret;
+}
+
+static uint64_t allocate_commit_tid(struct ceph_journaler *journaler,
+				    uint64_t object_num, uint64_t tag_tid,
+				    uint64_t entry_tid)
+{
+	return ++journaler->commit_tid;
+}
+
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid);
+static void process_entries(struct ceph_journaler *journaler, struct list_head *entry_list, struct ceph_journaler_object_pos *position)
+{
+	struct ceph_journaler_entry *entry, *next;
+	bool found_commit = false;
+	uint64_t commit_tid;
+
+	list_for_each_entry_safe(entry, next, entry_list, node) {
+		if (entry->tag_tid == position->tag_tid &&
+			entry->entry_tid == position->entry_tid) {
+			found_commit = true;
+			continue;
+		} else if (found_commit) {
+			reserve_entry_tid(journaler, entry->tag_tid, entry->entry_tid);
+			commit_tid = allocate_commit_tid(journaler, position->object_num, entry->tag_tid, entry->entry_tid);
+			playback_entry(journaler, entry);
+			add_commit_entry(journaler, commit_tid, position->object_num, entry->tag_tid, entry->entry_tid);
+			ceph_journaler_client_committed(journaler, commit_tid);
+		} else {
+			reserve_entry_tid(journaler, entry->tag_tid, entry->entry_tid);
+		}
+		list_del(&entry->node);
+		kfree(entry);
+	}
+	return;
+}
+
+static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end);
+static void fetch(struct ceph_journaler *journaler, uint64_t object_num)
+{
+	struct ceph_object_id object_oid;
+	int ret = 0;
+	void *buf = NULL, *read_buf = NULL, *buf_p = NULL;
+	void *end = NULL;
+	uint64_t read_len = 2 << journaler->order;
+	uint32_t read_off = 0;
+	uint64_t buf_len = read_len;
+	struct list_head entry_list;
+	bool position_found = false;
+
+	struct ceph_journaler_object_pos *pos;
+
+	list_for_each_entry(pos, &journaler->client->object_positions, node) {
+		if (pos->object_num == object_num) {
+			position_found = true;
+			break;
+		}
+	}
+
+	if (!position_found) {
+		return;
+	}
+
+	INIT_LIST_HEAD(&entry_list);
+	ceph_oid_init(&object_oid);
+	ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+				journaler->object_oid_prefix, object_num);
+	if (ret) {
+		pr_err("aprintf error : %d", ret);
+		return;
+	}
+
+	buf = vmalloc(buf_len);
+	if (!buf) {
+		pr_err("failed to vmalloc buf: %llu", buf_len);
+		goto err_free_object_oid;
+	}
+	read_buf = buf;
+	buf_p = buf;
+
+refetch:
+	ret = ceph_journaler_obj_read_sync(journaler, &object_oid, &journaler->data_oloc, read_buf, read_off, read_len);
+	if (ret == -ENOENT) {
+		pr_err("no such object: %d", ret);
+		goto err_free_buf;
+	} else if (ret < 0) {
+		pr_err("failed to read: %d", ret);
+		goto err_free_buf;
+	} else if (ret == 0) {
+		pr_err("no data: %d", ret);
+		goto err_free_buf;
+	}
+	read_off = read_off + ret;
+
+	end = read_buf + ret;
+	while (buf < end) {
+		uint32_t bytes_needed = 0;
+		struct ceph_journaler_entry *entry = NULL;
+
+		if (!entry_is_readable(journaler, buf, end, &bytes_needed)) {
+			uint64_t remain = end - buf;
+			if (bytes_needed != 0) {
+				void *new_buf = vmalloc(read_len + remain);
+				if (!new_buf) {
+					pr_err("failed to alloc new buf");
+					goto err_free_buf;
+				}
+				memcpy(new_buf, buf, remain);
+				vfree(buf_p);
+				buf_p = new_buf;
+				buf = new_buf;
+				read_buf = buf + remain;
+				goto refetch;
+			} else {
+				pr_err("entry corruption");
+				goto err_free_buf;
+			}
+		}
+		entry = journaler_entry_decode(&buf, end);
+		if (!entry)
+			goto err_free_buf;
+
+		list_add_tail(&entry->node, &entry_list);
+	}
+
+	process_entries(journaler, &entry_list, pos);
+
+err_free_buf:
+	vfree(buf_p);
+err_free_object_oid:
+	ceph_oid_destroy(&object_oid);
+	return;
+}
+
+void start_replay(struct ceph_journaler *journaler)
+{	
+	struct ceph_journaler_object_pos *active_pos = NULL;
+	struct ceph_journaler_client *client = NULL;
+	uint64_t *fetch_objects = kzalloc(sizeof(uint64_t) * journaler->splay_width, GFP_KERNEL);
+	int index = 0;
+	int i = 0;
+
+	mutex_lock(&journaler->mutex);
+	client = journaler->client;
+	active_pos = list_first_entry(&journaler->client->object_positions, struct ceph_journaler_object_pos, node);
+
+	journaler->active_tag_tid = active_pos->tag_tid;
+	journaler->commit_pos_valid = true;
+	journaler->commit_pos = active_pos;
+	journaler->splay_offset = active_pos->object_num % journaler->splay_width;
+
+	list_for_each_entry(active_pos, &client->object_positions, node) {
+		fetch_objects[index++] = active_pos->object_num;
+	}
+
+	for (i = 0; i < index; i++) {
+		fetch(journaler, fetch_objects[i]);
+	}
+	mutex_unlock(&journaler->mutex);
+}
+EXPORT_SYMBOL(start_replay);
+
+int ceph_journaler_open(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_object_pos *pos = NULL;
+	int ret = 0;
+
+	ret = journaler_watch(journaler);
+	if (ret) {
+		pr_err("journaler_watch error: %d", ret);
+		return ret;
+	}
+
+	ret = ceph_cls_journaler_get_immutable_metas(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					&journaler->order,
+					&journaler->splay_width,
+					&journaler->pool_id);
+	if (ret) {
+		pr_err("failed to get immutable metas.");;
+		goto err_unwatch;
+	}
+
+	if (journaler->pool_id == -1) {
+		ceph_oloc_copy(&journaler->data_oloc, &journaler->header_oloc);
+		journaler->pool_id = journaler->data_oloc.pool;
+	} else {
+		journaler->data_oloc.pool = journaler->pool_id;
+	}
+
+	mutex_lock(&journaler->mutex);
+	ret = refresh(journaler);
+	mutex_unlock(&journaler->mutex);
+	if (ret)
+		goto err_unwatch;
+
+	list_for_each_entry(pos, &journaler->client->object_positions, node) {
+		struct ceph_journaler_object_pos *new_pos = NULL;
+
+		ret = copy_object_pos(pos, &new_pos);
+		if (ret) {
+			goto err_unwatch;
+		}
+
+		list_add_tail(&new_pos->node, &journaler->object_positions_pending);
+	}
+
+	return 0;
+
+err_unwatch:
+	journaler_unwatch(journaler);
+
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_open);
+
+static void destroy_client(struct ceph_journaler_client *client)
+{
+	struct ceph_journaler_object_pos *pos, *next;
+
+	list_for_each_entry_safe(pos, next, &client->object_positions, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+	kfree(client->id);
+	kfree(client->data);
+
+}
+
+void ceph_journaler_close(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_client *client = NULL, *next = NULL;
+	struct commit_entry *entry = NULL;
+	struct entry_tid *entry_tid = NULL, *entry_tid_next = NULL;
+	struct rb_node *n;
+
+	journaler_unwatch(journaler);
+
+	list_for_each_entry_safe(client, next, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+
+	if (journaler->clients_array != NULL)
+		kfree(journaler->clients_array);
+
+	for (n = rb_first(&journaler->commit_entries); n;) {
+		entry = rb_entry(n, struct commit_entry, r_node);
+
+		n = rb_next(n);
+		erase_commit_entry(&journaler->commit_entries, entry);
+		kfree(entry);
+	}
+
+
+	list_for_each_entry_safe(entry_tid, entry_tid_next, &journaler->entry_tids, node) {
+		list_del(&entry_tid->node);
+		kfree(entry_tid);
+	}
+
+	return;
+}
+EXPORT_SYMBOL(ceph_journaler_close);
+
+static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end)
+{
+	struct ceph_journaler_entry *entry = NULL;
+	uint64_t preamble = 0;
+	uint8_t version = 0;
+	uint32_t crc = 0, crc_encoded = 0;
+	void *start = *p;
+
+	preamble = ceph_decode_64(p);
+	if (PREAMBLE != preamble) {
+		return NULL;
+	}
+
+	version = ceph_decode_8(p);
+
+	if (version != 1)
+		return NULL;
+
+	entry = kzalloc(sizeof(struct ceph_journaler_entry), GFP_KERNEL);
+
+	INIT_LIST_HEAD(&entry->node);
+	entry->entry_tid = ceph_decode_64(p);
+	entry->tag_tid = ceph_decode_64(p);
+	entry->data = ceph_extract_encoded_string(p, end, &entry->data_len, GFP_NOIO); 
+	if (!entry->data)
+		goto error;
+
+	crc = crc32c(0, start, *p - start);
+
+	crc_encoded = ceph_decode_32(p);
+
+	if (crc != crc_encoded)
+		goto free_data;
+
+	return entry;
+free_data:
+	kfree(entry->data);
+error:
+	kfree(entry);
+	return NULL;
+}
+
+static void journaler_entry_encode(struct ceph_journaler_entry *entry, void **p, void *end)
+{
+	void *start = *p;
+	uint32_t crc = 0;
+
+	ceph_encode_64(p, PREAMBLE);
+	ceph_encode_8(p, (uint8_t)1);
+	ceph_encode_64(p, entry->entry_tid);
+	ceph_encode_64(p, entry->tag_tid);
+	ceph_encode_string(p, end, entry->data, entry->data_len);
+
+	crc = crc32c(0, start, *p - start);
+
+	ceph_encode_32(p, crc);
+
+	return;
+}
+
+// record
+static ssize_t ceph_entry_buf_size(struct ceph_journaler_entry *entry)
+{
+	// PEAMBLE(8) + version(1) + entry_tid(8) + tag_tid(8) + string_len(4) + crc(4) = 33
+	return entry->data_len + 33;
+}
+
+static uint64_t get_object(struct ceph_journaler *journaler, uint64_t splay_offset)
+{
+	return splay_offset + (journaler->splay_width * journaler->active_set);
+}
+
+static void advance_object_set(struct ceph_journaler *journaler)
+{
+	int ret = 0;
+
+	journaler->active_set++;
+
+	ret = ceph_cls_journaler_set_active_set(journaler->osdc, &journaler->header_oid, &journaler->header_oloc, journaler->active_set);
+}
+
+static int ceph_journaler_object_append(struct ceph_journaler *journaler, uint64_t object_num, 
+				  struct ceph_journaler_future *future,
+				  struct ceph_journaler_entry *entry)
+{
+	void *buf = NULL;
+	void *start_buf = NULL;
+	void *end = NULL;
+	ssize_t buf_len;
+	struct ceph_object_id object_oid;
+	int ret = 0;
+
+
+	buf_len = ceph_entry_buf_size(entry);
+	buf = vmalloc(buf_len);
+	end = buf + buf_len;
+	start_buf = buf;
+
+	journaler_entry_encode(entry, &buf, end);
+
+	ceph_oid_init(&object_oid);
+
+retry:
+	if (!ceph_oid_empty(&object_oid)) {
+		ceph_oid_destroy(&object_oid);
+		ceph_oid_init(&object_oid);
+	}
+	ret = ceph_oid_aprintf(&object_oid, GFP_KERNEL, "%s%llu",
+				journaler->object_oid_prefix, object_num);
+
+	//TODO send "guard append" and "write" in a single request
+	ret = ceph_cls_journaler_guard_append(journaler->osdc, &object_oid, &journaler->header_oloc, 1 << journaler->order);
+
+	if (ret == -EOVERFLOW) {
+		pr_debug("overflow: %llu", journaler->active_set);
+		advance_object_set(journaler);
+		object_num = get_object(journaler, entry->entry_tid % journaler->splay_width);
+		goto retry;
+	}
+
+	ret = ceph_journaler_obj_write_sync(journaler, &object_oid, &journaler->data_oloc, start_buf, buf_len);
+
+	if (ret) {
+		pr_err("error in write entry: %d", ret);
+	}
+
+	pr_debug("write event: %d, tagtid: %llu", ret, entry->tag_tid);
+
+	return ret;
+}
+
+static uint64_t allocate_entry_tid(struct ceph_journaler *journaler, uint64_t tag_tid)
+{
+	struct entry_tid *pos = NULL;
+	uint64_t entry_tid = 0;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid == tag_tid) {
+			entry_tid = pos->entry_tid++;
+			spin_unlock(&journaler->entry_tid_lock);
+			return entry_tid;
+		}
+	}
+	pos = kzalloc(sizeof(struct entry_tid), GFP_KERNEL);
+	WARN_ON(pos == NULL);
+
+	pos->tag_tid = tag_tid;
+	pos->entry_tid = 0;
+	INIT_LIST_HEAD(&pos->node);
+
+	list_add_tail(&pos->node, &journaler->entry_tids);
+	entry_tid = pos->entry_tid++;
+	spin_unlock(&journaler->entry_tid_lock);
+
+	return entry_tid;
+}
+
+static struct ceph_journaler_future *create_future(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid)
+{
+	struct ceph_journaler_future *future = NULL;
+
+	future = kzalloc(sizeof(struct ceph_journaler_future), GFP_KERNEL);
+	if (!future)
+		return NULL;
+	future->tag_tid = tag_tid;
+	future->entry_tid = entry_tid;
+	future->commit_tid = commit_tid;
+
+	return future;
+}
+
+static struct ceph_journaler_entry *create_entry(uint64_t tag_tid, uint64_t entry_tid, char* data, ssize_t data_len)
+{
+	struct ceph_journaler_entry *entry = NULL;
+
+	entry = kzalloc(sizeof(struct ceph_journaler_entry), GFP_KERNEL);
+	if (!entry)
+		return NULL;
+	entry->tag_tid = tag_tid;
+	entry->entry_tid = entry_tid;
+	entry->data = data;
+	entry->data_len = data_len;
+
+	return entry;
+}
+
+int ceph_journaler_append(struct ceph_journaler *journaler, uint64_t tag_tid, char *data, ssize_t data_len, struct ceph_journaler_future **journal_future)
+{
+	uint64_t entry_tid;
+	uint8_t splay_width;
+	uint8_t splay_offset;
+
+	uint64_t object_num;
+	uint64_t commit_tid;
+
+	struct ceph_journaler_future *future;
+
+	struct ceph_journaler_entry *entry;
+
+	int ret = 0;
+
+	spin_lock(&journaler->meta_lock);
+	entry_tid = allocate_entry_tid(journaler, tag_tid);
+	splay_width = journaler->splay_width;
+	splay_offset = entry_tid % splay_width;
+
+	object_num = get_object(journaler, splay_offset);
+	commit_tid = allocate_commit_tid(journaler, object_num, tag_tid, entry_tid);
+
+	future = create_future(tag_tid, entry_tid, commit_tid);
+
+	entry = create_entry(tag_tid, entry_tid, data, data_len);
+	spin_unlock(&journaler->meta_lock);
+
+	ret = ceph_journaler_object_append(journaler, object_num, future, entry);
+	if (ret) 
+		goto out;
+
+	ret = add_commit_entry(journaler, commit_tid, object_num, tag_tid, entry_tid);
+	if (ret) 
+		goto out;
+
+	*journal_future = future;
+out:
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_append);
+
+static int add_object_position(struct commit_entry *entry, struct list_head *object_positions, uint64_t splay_width)
+{
+	struct ceph_journaler_object_pos *position = NULL;
+	uint8_t splay_offset = entry->object_num % splay_width;
+	bool found = false;
+	int ret = 0;
+	
+	list_for_each_entry(position, object_positions, node) {
+		if (splay_offset == position->object_num % splay_width) {
+			found = true;
+			break;
+		}
+	}
+
+	if (!found) {
+		position = kzalloc(sizeof(*position), GFP_KERNEL);
+
+		if (!position) {
+			pr_err("failed to allocate position");
+			return -ENOMEM;
+		}
+		list_add(&position->node, object_positions);
+	} else {
+		list_move(&position->node, object_positions);
+	}
+
+	position->object_num = entry->object_num;
+	position->tag_tid = entry->tag_tid;
+	position->entry_tid = entry->entry_tid;
+
+	return ret;
+}
+
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid)
+{
+	struct commit_entry *entry = NULL;
+	bool update_client_commit = true;
+	struct list_head object_positions;
+	struct rb_node *n;
+
+	INIT_LIST_HEAD(&object_positions);
+	spin_lock(&journaler->commit_lock);
+	for (n = rb_first(&journaler->commit_entries); n; n = rb_next(n)) {
+		entry = rb_entry(n, struct commit_entry, r_node);
+
+		if (entry->commit_tid == commit_tid) {
+			entry->committed = true;
+			break;
+		}
+
+		if (entry->committed == false) {
+			update_client_commit = false;
+		}
+	}
+
+	if (update_client_commit) {
+		for (n = rb_first(&journaler->commit_entries); n;) {
+			entry = rb_entry(n, struct commit_entry, r_node);
+			n = rb_next(n);
+
+			if (entry->commit_tid > commit_tid)
+				break;
+			add_object_position(entry, &journaler->object_positions_pending, journaler->splay_width);
+			erase_commit_entry(&journaler->commit_entries, entry);
+			kfree(entry);
+		}
+	}
+	spin_unlock(&journaler->commit_lock);
+
+	if (update_client_commit) {
+		queue_work(journaler->task_wq, &journaler->commit_work);
+	}
+}
+EXPORT_SYMBOL(ceph_journaler_client_committed);
+
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler, uint64_t tag_class, void *buf, uint32_t buf_len, struct ceph_journaler_tag *tag)
+{
+	uint64_t tag_tid = 0;
+	int ret = 0;
+
+	ret = ceph_cls_journaler_get_next_tag_tid(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					&tag_tid);
+	if (ret)
+		goto out;
+
+	ret = ceph_cls_journaler_tag_create(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					tag_tid, tag_class,
+					buf, buf_len);
+	if (ret)
+		goto out;
+
+	ret = ceph_cls_journaler_get_tag(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					tag_tid, tag);
+	if (ret)
+		goto out;
+
+out:
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_allocate_tag);
+
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id, uint32_t id_len, struct ceph_journaler_client *client_result)
+{
+	struct ceph_journaler_client *client = list_first_entry(&journaler->clients, struct ceph_journaler_client, node);
+
+	int ret = -ENOENT;
+
+	list_for_each_entry(client, &journaler->clients, node) {
+		if (!memcmp(client->id, client_id, sizeof(*client_id))) {
+			client_result = client;
+			ret = 0;
+			break;
+		}
+	}
+
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_get_cached_client);