diff mbox series

[04/11] libceph: introduce generic journaling

Message ID 1543841435-13652-5-git-send-email-dongsheng.yang@easystack.cn (mailing list archive)
State New, archived
Headers show
Series [01/11] libceph: support prefix and suffix in bio_iter | expand

Commit Message

Dongsheng Yang Dec. 3, 2018, 12:50 p.m. UTC
This commit introduce the generic journaling. This is a initial
commit, which only includes some generic functions, such as
journaler_create|destroy() and journaler_open|close().

Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
---
 include/linux/ceph/journaler.h | 162 +++++++++++++
 net/ceph/Makefile              |   3 +-
 net/ceph/journaler.c           | 513 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 677 insertions(+), 1 deletion(-)
 create mode 100644 include/linux/ceph/journaler.h
 create mode 100644 net/ceph/journaler.c
diff mbox series

Patch

diff --git a/include/linux/ceph/journaler.h b/include/linux/ceph/journaler.h
new file mode 100644
index 0000000..50e8c52
--- /dev/null
+++ b/include/linux/ceph/journaler.h
@@ -0,0 +1,162 @@ 
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_JOURNAL_H
+#define _FS_CEPH_JOURNAL_H
+
+#include <linux/rbtree.h>
+#include <linux/ceph/cls_journaler_client.h>
+
+struct ceph_osd_client;
+
+#define JOURNAL_HEADER_PREFIX	"journal."
+#define JOURNAL_OBJECT_PREFIX	"journal_data."
+
+#define LOCAL_MIRROR_UUID	""
+
+/// preamble, version, entry tid, tag id
+static const uint32_t HEADER_FIXED_SIZE = 25;
+/// data size, crc
+static const uint32_t REMAINDER_FIXED_SIZE = 8;
+static const uint64_t PREAMBLE = 0x3141592653589793;
+
+struct ceph_journaler_ctx;
+typedef void (*ceph_journalecallback_t)(struct ceph_journaler_ctx *);
+
+struct ceph_journaler_ctx {
+	struct list_head node;
+	struct ceph_bio_iter	*bio_iter;
+	int result;
+
+	void *priv;
+	ceph_journalecallback_t callback;
+};
+
+struct ceph_journaler_future {
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	uint64_t commit_tid;
+
+	bool safe;
+	bool consistent;
+
+	struct ceph_journaler_ctx *ctx;
+	struct ceph_journaler_future *wait;
+};
+
+struct ceph_journaler_entry {
+	struct list_head node;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	ssize_t data_len;
+	char *data;
+	struct ceph_bio_iter *bio_iter;
+};
+
+struct entry_tid {
+	struct list_head	node;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+};
+
+struct commit_entry {
+	struct rb_node	r_node;
+	uint64_t commit_tid;
+	uint64_t object_num;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	bool committed;
+};
+
+struct object_recorder {
+	spinlock_t lock;
+	bool overflowed;
+	uint64_t splay_offset;
+	uint64_t inflight_append;
+
+	struct list_head append_list;
+	struct list_head overflow_list;
+};
+
+struct object_replayer {
+	spinlock_t lock;
+	uint64_t object_num;
+	struct ceph_journaler_object_pos *pos;
+	struct list_head entry_list;
+};
+
+struct watch_cb_data {
+	struct list_head node;
+	u64 notify_id;
+	u64 cookie;
+};
+
+struct ceph_journaler {
+	struct ceph_osd_client *osdc;
+	struct ceph_object_locator data_oloc;
+
+	struct ceph_object_id header_oid;
+	struct ceph_object_locator header_oloc;
+
+	char *object_oid_prefix;
+	char *client_id;
+
+	struct object_recorder *obj_recorders;
+	struct object_replayer *obj_replayers;
+	uint64_t splay_offset;
+	uint64_t active_tag_tid;
+
+	spinlock_t		meta_lock;
+	bool	closing;
+	uint8_t order;
+	uint8_t splay_width;
+	int64_t pool_id;
+	struct ceph_journaler_future *prev_future;
+	struct list_head	watch_cb_list;
+
+	uint64_t commit_tid;
+	uint64_t minimum_set;
+	uint64_t active_set;
+
+	struct list_head	clients;
+	struct ceph_journaler_client *client;
+	struct ceph_journaler_client *clients_array;
+	struct list_head	object_positions_pending;
+
+	spinlock_t		commit_lock;
+	struct rb_root		commit_entries;
+	spinlock_t		entry_tid_lock;
+	struct list_head	entry_tids;
+	spinlock_t		finish_lock;
+	struct list_head	ctx_list;
+	spinlock_t		advancing_lock;
+	bool			advancing;
+
+	struct workqueue_struct	*task_wq;
+	struct workqueue_struct	*finish_wq;
+	struct work_struct	notify_update_work;
+	struct work_struct	commit_work;
+	struct work_struct	finish_work;
+	struct work_struct	overflow_work;
+	struct work_struct	flush_work;
+	struct work_struct	watch_cb_work;
+
+	int (*handle_entry)(void *entry_handler,
+			    struct ceph_journaler_entry *entry,
+			    uint64_t commit_tid);
+	void *entry_handler;
+
+	struct ceph_osd_linger_request *watch_handle;
+};
+
+// generic functions
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     struct ceph_object_locator*_oloc,
+					     const char *journal_id,
+					     const char *client_id);
+void ceph_journaler_destroy(struct ceph_journaler *journal);
+
+int ceph_journaler_open(struct ceph_journaler *journal);
+void ceph_journaler_close(struct ceph_journaler *journal);
+
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id,
+				     struct ceph_journaler_client **client_result);
+#endif
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index db09defe..a965d64 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -14,4 +14,5 @@  libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	crypto.o armor.o \
 	auth_x.o \
 	ceph_fs.o ceph_strings.o ceph_hash.o \
-	pagevec.o snapshot.o string_table.o
+	pagevec.o snapshot.o string_table.o \
+	journaler.o cls_journaler_client.o
diff --git a/net/ceph/journaler.c b/net/ceph/journaler.c
new file mode 100644
index 0000000..8f2ed41
--- /dev/null
+++ b/net/ceph/journaler.c
@@ -0,0 +1,513 @@ 
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/ceph_features.h>
+#include <linux/ceph/cls_journaler_client.h>
+#include <linux/ceph/journaler.h>
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/osd_client.h>
+
+#include <linux/crc32c.h>
+#include <linux/module.h>
+
+static char *object_oid_prefix(int pool_id, const char *journal_id)
+{
+	char *prefix = NULL;
+	ssize_t len = snprintf(NULL, 0, "%s%d.%s.", JOURNAL_OBJECT_PREFIX,
+			       pool_id, journal_id);
+
+	prefix = kzalloc(len + 1, GFP_KERNEL);
+
+	if (!prefix)
+		return NULL;
+
+	WARN_ON(snprintf(prefix, len + 1, "%s%d.%s.", JOURNAL_OBJECT_PREFIX,
+			 pool_id, journal_id) != len);
+
+	return prefix;
+}
+
+static void watch_cb_func(struct work_struct *work);
+
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     struct ceph_object_locator *oloc,
+					     const char *journal_id,
+					     const char *client_id)
+{
+	struct ceph_journaler *journaler = NULL;
+	int ret = 0;
+
+	journaler = kzalloc(sizeof(struct ceph_journaler), GFP_KERNEL);
+	if (!journaler)
+		return NULL;
+
+	ceph_oid_init(&journaler->header_oid);
+	ret = ceph_oid_aprintf(&journaler->header_oid, GFP_KERNEL, "%s%s",
+				JOURNAL_HEADER_PREFIX, journal_id);
+	if (ret) {
+		pr_err("aprintf error : %d", ret);
+		goto err_free_journaler;
+	}
+
+	journaler->client_id = kstrdup(client_id, GFP_KERNEL);
+	if (!journaler->client_id) {
+		ret = -ENOMEM;
+		goto err_free_header_oid;;
+	}
+
+	journaler->task_wq = alloc_ordered_workqueue("journaler-tasks",
+						     WQ_MEM_RECLAIM);
+	if (!journaler->task_wq)
+		goto err_free_client_id;
+
+
+	journaler->finish_wq = alloc_ordered_workqueue("journaler-finish",
+						     WQ_MEM_RECLAIM);
+	if (!journaler->finish_wq)
+		goto err_destroy_task_wq;
+
+	journaler->notify_wq = alloc_ordered_workqueue("journaler-notify",
+						     WQ_MEM_RECLAIM);
+	if (!journaler->notify_wq)
+		goto err_destroy_finish_wq;
+	ceph_oloc_init(&journaler->header_oloc);
+	ceph_oloc_copy(&journaler->header_oloc, oloc);
+	journaler->object_oid_prefix = object_oid_prefix(journaler->header_oloc.pool,
+							 journal_id);
+
+	if (!journaler->object_oid_prefix)
+		goto err_destroy_header_oloc;
+
+	journaler->osdc = osdc;
+	ceph_oloc_init(&journaler->data_oloc);
+	INIT_LIST_HEAD(&journaler->clients);
+	INIT_LIST_HEAD(&journaler->entry_tids);
+	INIT_LIST_HEAD(&journaler->object_positions_pending);
+	INIT_LIST_HEAD(&journaler->ctx_list);
+	INIT_LIST_HEAD(&journaler->watch_cb_list);
+	journaler->commit_entries = RB_ROOT;
+	journaler->commit_tid = 0;
+	journaler->client = NULL;
+	journaler->clients_array = NULL;
+	journaler->prev_future = NULL;
+	journaler->closing = false;
+
+	spin_lock_init(&journaler->meta_lock);
+	spin_lock_init(&journaler->entry_tid_lock);
+	spin_lock_init(&journaler->commit_lock);
+	spin_lock_init(&journaler->finish_lock);
+
+	INIT_WORK(&journaler->watch_cb_work, watch_cb_func);
+
+	return journaler;
+
+err_destroy_header_oloc:
+	ceph_oloc_destroy(&journaler->header_oloc);
+	destroy_workqueue(journaler->notify_wq);
+err_destroy_finish_wq:
+	destroy_workqueue(journaler->finish_wq);
+err_destroy_task_wq:
+	destroy_workqueue(journaler->task_wq);
+err_free_client_id:
+	kfree(journaler->client_id);
+err_free_header_oid:
+	ceph_oid_destroy(&journaler->header_oid);
+err_free_journaler:
+	kfree(journaler);
+	return NULL;
+}
+EXPORT_SYMBOL(ceph_journaler_create);
+
+void ceph_journaler_destroy(struct ceph_journaler *journaler)
+{
+	ceph_oloc_destroy(&journaler->data_oloc);
+	kfree(journaler->object_oid_prefix);
+	ceph_oloc_destroy(&journaler->header_oloc);
+	destroy_workqueue(journaler->notify_wq);
+	destroy_workqueue(journaler->finish_wq);
+	destroy_workqueue(journaler->task_wq);
+	kfree(journaler->client_id);
+	ceph_oid_destroy(&journaler->header_oid);
+	kfree(journaler);
+}
+EXPORT_SYMBOL(ceph_journaler_destroy);
+
+static int refresh(struct ceph_journaler *journaler, bool init)
+{
+	int ret = 0;
+	int i = 0;
+	uint32_t client_num = 0;
+	struct ceph_journaler_client *clients = NULL;
+	struct ceph_journaler_client *client, *next;
+	uint64_t minimum_commit_set;
+	uint64_t minimum_set;
+	uint64_t active_set;
+	bool need_advance = false;
+
+	ret = ceph_cls_journaler_get_mutable_metas(journaler->osdc,
+			&journaler->header_oid, &journaler->header_oloc,
+			&minimum_set, &active_set);
+	if (ret)
+		return ret;
+
+	ret = ceph_cls_journaler_client_list(journaler->osdc, &journaler->header_oid,
+		&journaler->header_oloc, &clients, &client_num);
+	if (ret)
+		return ret;
+
+	spin_lock(&journaler->meta_lock);
+	// update clients.
+	list_for_each_entry_safe(client, next, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+	if (journaler->clients_array != NULL)
+		kfree(journaler->clients_array);
+
+	journaler->clients_array = clients;
+	for (i = 0; i < client_num; i++) {
+		struct ceph_journaler_client *client = &clients[i];
+
+		list_add_tail(&client->node, &journaler->clients);
+		if (!strcmp(client->id, journaler->client_id)) {
+			journaler->client = client;
+		}
+	}
+
+	if (init) {
+		journaler->active_set = active_set;
+		journaler->minimum_set = minimum_set;
+	} else {
+		// check for advance active_set.
+		need_advance = active_set > journaler->active_set;
+		journaler->minimum_set = minimum_set;
+	}
+
+	// calculate the minimum_commit_set.
+	minimum_commit_set = journaler->active_set;
+	list_for_each_entry(client, &journaler->clients, node) {
+		//TODO check the state of client
+		struct ceph_journaler_object_pos *pos;
+		list_for_each_entry(pos, &client->object_positions, node) {
+			uint64_t object_set = pos->object_num / journaler->splay_width;
+			if (object_set < minimum_commit_set) {
+				minimum_commit_set = object_set;
+			}
+		}
+	}
+	spin_unlock(&journaler->meta_lock);
+
+	return 0;
+
+}
+
+static void watch_cb_func(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+							watch_cb_work);
+	LIST_HEAD(tmp_cb_data_list);
+	struct watch_cb_data *data, *next_data;
+	int ret;
+
+	spin_lock(&journaler->meta_lock);
+	list_splice_tail_init(&journaler->watch_cb_list, &tmp_cb_data_list);
+	spin_unlock(&journaler->meta_lock);
+	
+	list_for_each_entry_safe(data, next_data, &tmp_cb_data_list, node) {
+		list_del(&data->node);
+		ret = ceph_osdc_notify_ack(journaler->osdc, &journaler->header_oid,
+					   &journaler->header_oloc, data->notify_id,
+					   data->cookie, NULL, 0);
+
+		kfree(data);
+		if (ret)
+			pr_err("acknowledge_notify failed: %d", ret);
+	}
+
+	ret = refresh(journaler, false);
+	if (ret < 0) {
+		pr_err("%s: failed to refresh journaler: %d", __func__, ret);
+	}
+}
+
+static void journaler_watch_cb(void *arg, u64 notify_id, u64 cookie,
+			 u64 notifier_id, void *data, size_t data_len)
+{
+	struct ceph_journaler *journaler = arg;
+	struct watch_cb_data *cb_data = NULL;
+
+	cb_data = kzalloc(sizeof(struct watch_cb_data), GFP_KERNEL);
+	if (!cb_data)
+		return;
+
+	cb_data->notify_id = notify_id;
+	cb_data->cookie = cookie;
+	INIT_LIST_HEAD(&cb_data->node);
+
+	spin_lock(&journaler->meta_lock);
+	if (journaler->closing) {
+		kfree(cb_data);
+		spin_unlock(&journaler->meta_lock);
+		return;
+	}
+	list_add_tail(&journaler->watch_cb_list, &cb_data->node);
+	queue_work(journaler->finish_wq, &journaler->watch_cb_work);
+	spin_unlock(&journaler->meta_lock);
+}
+
+static void journaler_watch_errcb(void *arg, u64 cookie, int err)
+{
+	pr_err("journaler watch error: %d", err);
+}
+
+static int journaler_watch(struct ceph_journaler *journaler)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_linger_request *handle;
+
+	handle = ceph_osdc_watch(osdc, &journaler->header_oid,
+				 &journaler->header_oloc, journaler_watch_cb,
+				 journaler_watch_errcb, journaler);
+	if (IS_ERR(handle))
+		return PTR_ERR(handle);
+
+	journaler->watch_handle = handle;
+	return 0;
+}
+
+static void journaler_unwatch(struct ceph_journaler *journaler)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	int ret = 0;
+
+	ret = ceph_osdc_unwatch(osdc, journaler->watch_handle);
+	if (ret)
+		pr_err("%s: failed to unwatch: %d", __func__, ret);
+
+	journaler->watch_handle = NULL;
+}
+
+static int copy_object_pos(struct ceph_journaler_object_pos *pos,
+			   struct ceph_journaler_object_pos **new_pos)
+{
+	struct ceph_journaler_object_pos *temp_pos;
+
+	temp_pos = kzalloc(sizeof(*temp_pos), GFP_KERNEL);
+	if (temp_pos == NULL) {
+		return -ENOMEM;
+	}
+	INIT_LIST_HEAD(&temp_pos->node);
+	temp_pos->object_num = pos->object_num;
+	temp_pos->tag_tid = pos->tag_tid;
+	temp_pos->entry_tid = pos->entry_tid;
+
+	*new_pos = temp_pos;
+
+	return 0;
+}
+
+int ceph_journaler_open(struct ceph_journaler *journaler)
+{
+	uint8_t order, splay_width;
+	int64_t pool_id;
+	int i = 0, ret = 0;
+	struct ceph_journaler_object_pos *pos = NULL, *next_pos = NULL;
+	struct ceph_journaler_client *client = NULL, *next_client = NULL;
+
+	ret = ceph_cls_journaler_get_immutable_metas(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					&order,
+					&splay_width,
+					&pool_id);
+	if (ret) {
+		pr_err("failed to get immutable metas.");;
+		goto out;
+	}
+
+	spin_lock(&journaler->meta_lock);
+	// set the immutable metas.
+	journaler->order = order;
+	journaler->splay_width = splay_width;
+	journaler->pool_id = pool_id;
+
+	if (journaler->pool_id == -1) {
+		ceph_oloc_copy(&journaler->data_oloc, &journaler->header_oloc);
+		journaler->pool_id = journaler->data_oloc.pool;
+	} else {
+		journaler->data_oloc.pool = journaler->pool_id;
+	}
+
+	// initialize ->obj_recorders and ->obj_replayers.
+	journaler->obj_recorders = kzalloc(sizeof(struct object_recorder) *
+					   journaler->splay_width, GFP_KERNEL);
+
+	if (!journaler->obj_recorders) {
+		spin_unlock(&journaler->meta_lock);
+		goto out;
+	}
+
+	journaler->obj_replayers = kzalloc(sizeof(struct object_replayer) *
+					   journaler->splay_width, GFP_KERNEL);
+
+	if (!journaler->obj_replayers) {
+		spin_unlock(&journaler->meta_lock);
+		goto free_recorders;
+	}
+
+	for (i = 0; i < journaler->splay_width; i++) {
+		struct object_recorder *obj_recorder = &journaler->obj_recorders[i];
+		struct object_replayer *obj_replayer = &journaler->obj_replayers[i];
+
+		spin_lock_init(&obj_recorder->lock);
+		obj_recorder->overflowed = false;
+		obj_recorder->splay_offset = i;
+		obj_recorder->inflight_append = 0;
+		INIT_LIST_HEAD(&obj_recorder->append_list);
+		INIT_LIST_HEAD(&obj_recorder->overflow_list);
+
+		spin_lock_init(&obj_replayer->lock);
+		obj_replayer->object_num = i;
+		obj_replayer->pos = NULL;
+		INIT_LIST_HEAD(&obj_replayer->entry_list);
+	}
+	spin_unlock(&journaler->meta_lock);
+
+	ret = refresh(journaler, true);
+	if (ret)
+		goto free_replayers;
+
+	spin_lock(&journaler->meta_lock);
+	list_for_each_entry(pos, &journaler->client->object_positions, node) {
+		struct ceph_journaler_object_pos *new_pos = NULL;
+
+		ret = copy_object_pos(pos, &new_pos);
+		if (ret) {
+			spin_unlock(&journaler->meta_lock);
+			goto destroy_clients;
+		}
+
+		list_add_tail(&new_pos->node, &journaler->object_positions_pending);
+	}
+	spin_unlock(&journaler->meta_lock);
+
+	ret = journaler_watch(journaler);
+	if (ret) {
+		pr_err("journaler_watch error: %d", ret);
+		goto free_pos_pending;
+	}
+	return 0;
+
+free_pos_pending:
+	list_for_each_entry_safe(pos, next_pos,
+				 &journaler->object_positions_pending, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+destroy_clients:
+	list_for_each_entry_safe(client, next_client,
+				 &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+
+	if (journaler->clients_array != NULL)
+		kfree(journaler->clients_array);
+free_replayers:
+	kfree(journaler->obj_replayers);
+free_recorders:
+	kfree(journaler->obj_recorders);
+out:
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_open);
+
+DEFINE_RB_INSDEL_FUNCS(commit_entry, struct commit_entry, commit_tid, r_node)
+
+void ceph_journaler_close(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_client *client = NULL, *next = NULL;
+	struct commit_entry *entry = NULL;
+	struct entry_tid *entry_tid = NULL, *entry_tid_next = NULL;
+	struct ceph_journaler_object_pos *pos = NULL, *next_pos = NULL;
+	struct rb_node *n;
+	int i = 0;
+
+	journaler_unwatch(journaler);
+
+	spin_lock(&journaler->meta_lock);
+	journaler->closing = true;
+	spin_unlock(&journaler->meta_lock);
+
+	queue_work(journaler->task_wq, &journaler->flush_work);
+	flush_workqueue(journaler->finish_wq);
+	flush_workqueue(journaler->task_wq);
+
+	list_for_each_entry_safe(pos, next_pos,
+				 &journaler->object_positions_pending, node) {
+		list_del(&pos->node);
+		kfree(pos);
+	}
+
+	list_for_each_entry_safe(client, next, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+
+	if (journaler->clients_array != NULL)
+		kfree(journaler->clients_array);
+
+	for (n = rb_first(&journaler->commit_entries); n;) {
+		entry = rb_entry(n, struct commit_entry, r_node);
+
+		n = rb_next(n);
+		erase_commit_entry(&journaler->commit_entries, entry);
+		kfree(entry);
+	}
+
+	for (i = 0; i < journaler->splay_width; i++) {
+		struct object_recorder *obj_recorder = &journaler->obj_recorders[i];
+		struct object_replayer *obj_replayer = &journaler->obj_replayers[i];
+
+		spin_lock(&obj_recorder->lock);
+		BUG_ON(!list_empty(&obj_recorder->append_list) ||
+			!list_empty(&obj_recorder->overflow_list));
+		spin_unlock(&obj_recorder->lock);
+
+		spin_lock(&obj_replayer->lock);
+		BUG_ON(!list_empty(&obj_replayer->entry_list));
+		spin_unlock(&obj_replayer->lock);
+	}
+
+	kfree(journaler->obj_recorders);
+	kfree(journaler->obj_replayers);
+
+	list_for_each_entry_safe(entry_tid, entry_tid_next,
+				 &journaler->entry_tids, node) {
+		list_del(&entry_tid->node);
+		kfree(entry_tid);
+	}
+
+	WARN_ON(!list_empty(&journaler->watch_cb_list));
+
+	return;
+}
+EXPORT_SYMBOL(ceph_journaler_close);
+
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id,
+				     struct ceph_journaler_client **client_result)
+{
+	struct ceph_journaler_client *client = NULL;
+	int ret = -ENOENT;
+
+	list_for_each_entry(client, &journaler->clients, node) {
+		if (!strcmp(client->id, client_id)) {
+			*client_result = client;
+			ret = 0;
+			break;
+		}
+	}
+
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_get_cached_client);