new file mode 100644
@@ -0,0 +1,162 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_JOURNAL_H
+#define _FS_CEPH_JOURNAL_H
+
+#include <linux/rbtree.h>
+#include <linux/ceph/cls_journaler_client.h>
+
+struct ceph_osd_client;
+
+#define JOURNAL_HEADER_PREFIX "journal."
+#define JOURNAL_OBJECT_PREFIX "journal_data."
+
+#define LOCAL_MIRROR_UUID ""
+
+/// preamble, version, entry tid, tag id
+static const uint32_t HEADER_FIXED_SIZE = 25;
+/// data size, crc
+static const uint32_t REMAINDER_FIXED_SIZE = 8;
+static const uint64_t PREAMBLE = 0x3141592653589793;
+
+struct ceph_journaler_ctx;
+typedef void (*ceph_journalecallback_t)(struct ceph_journaler_ctx *);
+
+struct ceph_journaler_ctx {
+ struct list_head node;
+ struct ceph_bio_iter *bio_iter;
+ int result;
+
+ void *priv;
+ ceph_journalecallback_t callback;
+};
+
+struct ceph_journaler_future {
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+ uint64_t commit_tid;
+
+ bool safe;
+ bool consistent;
+
+ struct ceph_journaler_ctx *ctx;
+ struct ceph_journaler_future *wait;
+};
+
+struct ceph_journaler_entry {
+ struct list_head node;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+ ssize_t data_len;
+ char *data;
+ struct ceph_bio_iter *bio_iter;
+};
+
+struct entry_tid {
+ struct list_head node;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+};
+
+struct commit_entry {
+ struct rb_node r_node;
+ uint64_t commit_tid;
+ uint64_t object_num;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+ bool committed;
+};
+
+struct object_recorder {
+ spinlock_t lock;
+ bool overflowed;
+ uint64_t splay_offset;
+ uint64_t inflight_append;
+
+ struct list_head append_list;
+ struct list_head overflow_list;
+};
+
+struct object_replayer {
+ spinlock_t lock;
+ uint64_t object_num;
+ struct ceph_journaler_object_pos *pos;
+ struct list_head entry_list;
+};
+
+struct watch_cb_data {
+ struct list_head node;
+ u64 notify_id;
+ u64 cookie;
+};
+
+struct ceph_journaler {
+ struct ceph_osd_client *osdc;
+ struct ceph_object_locator data_oloc;
+
+ struct ceph_object_id header_oid;
+ struct ceph_object_locator header_oloc;
+
+ char *object_oid_prefix;
+ char *client_id;
+
+ struct object_recorder *obj_recorders;
+ struct object_replayer *obj_replayers;
+ uint64_t splay_offset;
+ uint64_t active_tag_tid;
+
+ spinlock_t meta_lock;
+ bool closing;
+ uint8_t order;
+ uint8_t splay_width;
+ int64_t pool_id;
+ struct ceph_journaler_future *prev_future;
+ struct list_head watch_cb_list;
+
+ uint64_t commit_tid;
+ uint64_t minimum_set;
+ uint64_t active_set;
+
+ struct list_head clients;
+ struct ceph_journaler_client *client;
+ struct ceph_journaler_client *clients_array;
+ struct list_head object_positions_pending;
+
+ spinlock_t commit_lock;
+ struct rb_root commit_entries;
+ spinlock_t entry_tid_lock;
+ struct list_head entry_tids;
+ spinlock_t finish_lock;
+ struct list_head ctx_list;
+ spinlock_t advancing_lock;
+ bool advancing;
+
+ struct workqueue_struct *task_wq;
+ struct workqueue_struct *finish_wq;
+ struct work_struct notify_update_work;
+ struct work_struct commit_work;
+ struct work_struct finish_work;
+ struct work_struct overflow_work;
+ struct work_struct flush_work;
+ struct work_struct watch_cb_work;
+
+ int (*handle_entry)(void *entry_handler,
+ struct ceph_journaler_entry *entry,
+ uint64_t commit_tid);
+ void *entry_handler;
+
+ struct ceph_osd_linger_request *watch_handle;
+};
+
+// generic functions
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+ struct ceph_object_locator*_oloc,
+ const char *journal_id,
+ const char *client_id);
+void ceph_journaler_destroy(struct ceph_journaler *journal);
+
+int ceph_journaler_open(struct ceph_journaler *journal);
+void ceph_journaler_close(struct ceph_journaler *journal);
+
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id,
+ struct ceph_journaler_client **client_result);
+#endif
@@ -14,4 +14,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
crypto.o armor.o \
auth_x.o \
ceph_fs.o ceph_strings.o ceph_hash.o \
- pagevec.o snapshot.o string_table.o
+ pagevec.o snapshot.o string_table.o \
+ journaler.o cls_journaler_client.o
new file mode 100644
@@ -0,0 +1,513 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/ceph_features.h>
+#include <linux/ceph/cls_journaler_client.h>
+#include <linux/ceph/journaler.h>
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/osd_client.h>
+
+#include <linux/crc32c.h>
+#include <linux/module.h>
+
+static char *object_oid_prefix(int pool_id, const char *journal_id)
+{
+ char *prefix = NULL;
+ ssize_t len = snprintf(NULL, 0, "%s%d.%s.", JOURNAL_OBJECT_PREFIX,
+ pool_id, journal_id);
+
+ prefix = kzalloc(len + 1, GFP_KERNEL);
+
+ if (!prefix)
+ return NULL;
+
+ WARN_ON(snprintf(prefix, len + 1, "%s%d.%s.", JOURNAL_OBJECT_PREFIX,
+ pool_id, journal_id) != len);
+
+ return prefix;
+}
+
+static void watch_cb_func(struct work_struct *work);
+
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+ struct ceph_object_locator *oloc,
+ const char *journal_id,
+ const char *client_id)
+{
+ struct ceph_journaler *journaler = NULL;
+ int ret = 0;
+
+ journaler = kzalloc(sizeof(struct ceph_journaler), GFP_KERNEL);
+ if (!journaler)
+ return NULL;
+
+ ceph_oid_init(&journaler->header_oid);
+ ret = ceph_oid_aprintf(&journaler->header_oid, GFP_KERNEL, "%s%s",
+ JOURNAL_HEADER_PREFIX, journal_id);
+ if (ret) {
+ pr_err("aprintf error : %d", ret);
+ goto err_free_journaler;
+ }
+
+ journaler->client_id = kstrdup(client_id, GFP_KERNEL);
+ if (!journaler->client_id) {
+ ret = -ENOMEM;
+ goto err_free_header_oid;;
+ }
+
+ journaler->task_wq = alloc_ordered_workqueue("journaler-tasks",
+ WQ_MEM_RECLAIM);
+ if (!journaler->task_wq)
+ goto err_free_client_id;
+
+
+ journaler->finish_wq = alloc_ordered_workqueue("journaler-finish",
+ WQ_MEM_RECLAIM);
+ if (!journaler->finish_wq)
+ goto err_destroy_task_wq;
+
+ journaler->notify_wq = alloc_ordered_workqueue("journaler-notify",
+ WQ_MEM_RECLAIM);
+ if (!journaler->notify_wq)
+ goto err_destroy_finish_wq;
+ ceph_oloc_init(&journaler->header_oloc);
+ ceph_oloc_copy(&journaler->header_oloc, oloc);
+ journaler->object_oid_prefix = object_oid_prefix(journaler->header_oloc.pool,
+ journal_id);
+
+ if (!journaler->object_oid_prefix)
+ goto err_destroy_header_oloc;
+
+ journaler->osdc = osdc;
+ ceph_oloc_init(&journaler->data_oloc);
+ INIT_LIST_HEAD(&journaler->clients);
+ INIT_LIST_HEAD(&journaler->entry_tids);
+ INIT_LIST_HEAD(&journaler->object_positions_pending);
+ INIT_LIST_HEAD(&journaler->ctx_list);
+ INIT_LIST_HEAD(&journaler->watch_cb_list);
+ journaler->commit_entries = RB_ROOT;
+ journaler->commit_tid = 0;
+ journaler->client = NULL;
+ journaler->clients_array = NULL;
+ journaler->prev_future = NULL;
+ journaler->closing = false;
+
+ spin_lock_init(&journaler->meta_lock);
+ spin_lock_init(&journaler->entry_tid_lock);
+ spin_lock_init(&journaler->commit_lock);
+ spin_lock_init(&journaler->finish_lock);
+
+ INIT_WORK(&journaler->watch_cb_work, watch_cb_func);
+
+ return journaler;
+
+err_destroy_header_oloc:
+ ceph_oloc_destroy(&journaler->header_oloc);
+ destroy_workqueue(journaler->notify_wq);
+err_destroy_finish_wq:
+ destroy_workqueue(journaler->finish_wq);
+err_destroy_task_wq:
+ destroy_workqueue(journaler->task_wq);
+err_free_client_id:
+ kfree(journaler->client_id);
+err_free_header_oid:
+ ceph_oid_destroy(&journaler->header_oid);
+err_free_journaler:
+ kfree(journaler);
+ return NULL;
+}
+EXPORT_SYMBOL(ceph_journaler_create);
+
+void ceph_journaler_destroy(struct ceph_journaler *journaler)
+{
+ ceph_oloc_destroy(&journaler->data_oloc);
+ kfree(journaler->object_oid_prefix);
+ ceph_oloc_destroy(&journaler->header_oloc);
+ destroy_workqueue(journaler->notify_wq);
+ destroy_workqueue(journaler->finish_wq);
+ destroy_workqueue(journaler->task_wq);
+ kfree(journaler->client_id);
+ ceph_oid_destroy(&journaler->header_oid);
+ kfree(journaler);
+}
+EXPORT_SYMBOL(ceph_journaler_destroy);
+
+static int refresh(struct ceph_journaler *journaler, bool init)
+{
+ int ret = 0;
+ int i = 0;
+ uint32_t client_num = 0;
+ struct ceph_journaler_client *clients = NULL;
+ struct ceph_journaler_client *client, *next;
+ uint64_t minimum_commit_set;
+ uint64_t minimum_set;
+ uint64_t active_set;
+ bool need_advance = false;
+
+ ret = ceph_cls_journaler_get_mutable_metas(journaler->osdc,
+ &journaler->header_oid, &journaler->header_oloc,
+ &minimum_set, &active_set);
+ if (ret)
+ return ret;
+
+ ret = ceph_cls_journaler_client_list(journaler->osdc, &journaler->header_oid,
+ &journaler->header_oloc, &clients, &client_num);
+ if (ret)
+ return ret;
+
+ spin_lock(&journaler->meta_lock);
+ // update clients.
+ list_for_each_entry_safe(client, next, &journaler->clients, node) {
+ list_del(&client->node);
+ destroy_client(client);
+ }
+ if (journaler->clients_array != NULL)
+ kfree(journaler->clients_array);
+
+ journaler->clients_array = clients;
+ for (i = 0; i < client_num; i++) {
+ struct ceph_journaler_client *client = &clients[i];
+
+ list_add_tail(&client->node, &journaler->clients);
+ if (!strcmp(client->id, journaler->client_id)) {
+ journaler->client = client;
+ }
+ }
+
+ if (init) {
+ journaler->active_set = active_set;
+ journaler->minimum_set = minimum_set;
+ } else {
+ // check for advance active_set.
+ need_advance = active_set > journaler->active_set;
+ journaler->minimum_set = minimum_set;
+ }
+
+ // calculate the minimum_commit_set.
+ minimum_commit_set = journaler->active_set;
+ list_for_each_entry(client, &journaler->clients, node) {
+ //TODO check the state of client
+ struct ceph_journaler_object_pos *pos;
+ list_for_each_entry(pos, &client->object_positions, node) {
+ uint64_t object_set = pos->object_num / journaler->splay_width;
+ if (object_set < minimum_commit_set) {
+ minimum_commit_set = object_set;
+ }
+ }
+ }
+ spin_unlock(&journaler->meta_lock);
+
+ return 0;
+
+}
+
+static void watch_cb_func(struct work_struct *work)
+{
+ struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+ watch_cb_work);
+ LIST_HEAD(tmp_cb_data_list);
+ struct watch_cb_data *data, *next_data;
+ int ret;
+
+ spin_lock(&journaler->meta_lock);
+ list_splice_tail_init(&journaler->watch_cb_list, &tmp_cb_data_list);
+ spin_unlock(&journaler->meta_lock);
+
+ list_for_each_entry_safe(data, next_data, &tmp_cb_data_list, node) {
+ list_del(&data->node);
+ ret = ceph_osdc_notify_ack(journaler->osdc, &journaler->header_oid,
+ &journaler->header_oloc, data->notify_id,
+ data->cookie, NULL, 0);
+
+ kfree(data);
+ if (ret)
+ pr_err("acknowledge_notify failed: %d", ret);
+ }
+
+ ret = refresh(journaler, false);
+ if (ret < 0) {
+ pr_err("%s: failed to refresh journaler: %d", __func__, ret);
+ }
+}
+
+static void journaler_watch_cb(void *arg, u64 notify_id, u64 cookie,
+ u64 notifier_id, void *data, size_t data_len)
+{
+ struct ceph_journaler *journaler = arg;
+ struct watch_cb_data *cb_data = NULL;
+
+ cb_data = kzalloc(sizeof(struct watch_cb_data), GFP_KERNEL);
+ if (!cb_data)
+ return;
+
+ cb_data->notify_id = notify_id;
+ cb_data->cookie = cookie;
+ INIT_LIST_HEAD(&cb_data->node);
+
+ spin_lock(&journaler->meta_lock);
+ if (journaler->closing) {
+ kfree(cb_data);
+ spin_unlock(&journaler->meta_lock);
+ return;
+ }
+ list_add_tail(&journaler->watch_cb_list, &cb_data->node);
+ queue_work(journaler->finish_wq, &journaler->watch_cb_work);
+ spin_unlock(&journaler->meta_lock);
+}
+
+static void journaler_watch_errcb(void *arg, u64 cookie, int err)
+{
+ pr_err("journaler watch error: %d", err);
+}
+
+static int journaler_watch(struct ceph_journaler *journaler)
+{
+ struct ceph_osd_client *osdc = journaler->osdc;
+ struct ceph_osd_linger_request *handle;
+
+ handle = ceph_osdc_watch(osdc, &journaler->header_oid,
+ &journaler->header_oloc, journaler_watch_cb,
+ journaler_watch_errcb, journaler);
+ if (IS_ERR(handle))
+ return PTR_ERR(handle);
+
+ journaler->watch_handle = handle;
+ return 0;
+}
+
+static void journaler_unwatch(struct ceph_journaler *journaler)
+{
+ struct ceph_osd_client *osdc = journaler->osdc;
+ int ret = 0;
+
+ ret = ceph_osdc_unwatch(osdc, journaler->watch_handle);
+ if (ret)
+ pr_err("%s: failed to unwatch: %d", __func__, ret);
+
+ journaler->watch_handle = NULL;
+}
+
+static int copy_object_pos(struct ceph_journaler_object_pos *pos,
+ struct ceph_journaler_object_pos **new_pos)
+{
+ struct ceph_journaler_object_pos *temp_pos;
+
+ temp_pos = kzalloc(sizeof(*temp_pos), GFP_KERNEL);
+ if (temp_pos == NULL) {
+ return -ENOMEM;
+ }
+ INIT_LIST_HEAD(&temp_pos->node);
+ temp_pos->object_num = pos->object_num;
+ temp_pos->tag_tid = pos->tag_tid;
+ temp_pos->entry_tid = pos->entry_tid;
+
+ *new_pos = temp_pos;
+
+ return 0;
+}
+
+int ceph_journaler_open(struct ceph_journaler *journaler)
+{
+ uint8_t order, splay_width;
+ int64_t pool_id;
+ int i = 0, ret = 0;
+ struct ceph_journaler_object_pos *pos = NULL, *next_pos = NULL;
+ struct ceph_journaler_client *client = NULL, *next_client = NULL;
+
+ ret = ceph_cls_journaler_get_immutable_metas(journaler->osdc,
+ &journaler->header_oid,
+ &journaler->header_oloc,
+ &order,
+ &splay_width,
+ &pool_id);
+ if (ret) {
+ pr_err("failed to get immutable metas.");;
+ goto out;
+ }
+
+ spin_lock(&journaler->meta_lock);
+ // set the immutable metas.
+ journaler->order = order;
+ journaler->splay_width = splay_width;
+ journaler->pool_id = pool_id;
+
+ if (journaler->pool_id == -1) {
+ ceph_oloc_copy(&journaler->data_oloc, &journaler->header_oloc);
+ journaler->pool_id = journaler->data_oloc.pool;
+ } else {
+ journaler->data_oloc.pool = journaler->pool_id;
+ }
+
+ // initialize ->obj_recorders and ->obj_replayers.
+ journaler->obj_recorders = kzalloc(sizeof(struct object_recorder) *
+ journaler->splay_width, GFP_KERNEL);
+
+ if (!journaler->obj_recorders) {
+ spin_unlock(&journaler->meta_lock);
+ goto out;
+ }
+
+ journaler->obj_replayers = kzalloc(sizeof(struct object_replayer) *
+ journaler->splay_width, GFP_KERNEL);
+
+ if (!journaler->obj_replayers) {
+ spin_unlock(&journaler->meta_lock);
+ goto free_recorders;
+ }
+
+ for (i = 0; i < journaler->splay_width; i++) {
+ struct object_recorder *obj_recorder = &journaler->obj_recorders[i];
+ struct object_replayer *obj_replayer = &journaler->obj_replayers[i];
+
+ spin_lock_init(&obj_recorder->lock);
+ obj_recorder->overflowed = false;
+ obj_recorder->splay_offset = i;
+ obj_recorder->inflight_append = 0;
+ INIT_LIST_HEAD(&obj_recorder->append_list);
+ INIT_LIST_HEAD(&obj_recorder->overflow_list);
+
+ spin_lock_init(&obj_replayer->lock);
+ obj_replayer->object_num = i;
+ obj_replayer->pos = NULL;
+ INIT_LIST_HEAD(&obj_replayer->entry_list);
+ }
+ spin_unlock(&journaler->meta_lock);
+
+ ret = refresh(journaler, true);
+ if (ret)
+ goto free_replayers;
+
+ spin_lock(&journaler->meta_lock);
+ list_for_each_entry(pos, &journaler->client->object_positions, node) {
+ struct ceph_journaler_object_pos *new_pos = NULL;
+
+ ret = copy_object_pos(pos, &new_pos);
+ if (ret) {
+ spin_unlock(&journaler->meta_lock);
+ goto destroy_clients;
+ }
+
+ list_add_tail(&new_pos->node, &journaler->object_positions_pending);
+ }
+ spin_unlock(&journaler->meta_lock);
+
+ ret = journaler_watch(journaler);
+ if (ret) {
+ pr_err("journaler_watch error: %d", ret);
+ goto free_pos_pending;
+ }
+ return 0;
+
+free_pos_pending:
+ list_for_each_entry_safe(pos, next_pos,
+ &journaler->object_positions_pending, node) {
+ list_del(&pos->node);
+ kfree(pos);
+ }
+destroy_clients:
+ list_for_each_entry_safe(client, next_client,
+ &journaler->clients, node) {
+ list_del(&client->node);
+ destroy_client(client);
+ }
+
+ if (journaler->clients_array != NULL)
+ kfree(journaler->clients_array);
+free_replayers:
+ kfree(journaler->obj_replayers);
+free_recorders:
+ kfree(journaler->obj_recorders);
+out:
+ return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_open);
+
+DEFINE_RB_INSDEL_FUNCS(commit_entry, struct commit_entry, commit_tid, r_node)
+
+void ceph_journaler_close(struct ceph_journaler *journaler)
+{
+ struct ceph_journaler_client *client = NULL, *next = NULL;
+ struct commit_entry *entry = NULL;
+ struct entry_tid *entry_tid = NULL, *entry_tid_next = NULL;
+ struct ceph_journaler_object_pos *pos = NULL, *next_pos = NULL;
+ struct rb_node *n;
+ int i = 0;
+
+ journaler_unwatch(journaler);
+
+ spin_lock(&journaler->meta_lock);
+ journaler->closing = true;
+ spin_unlock(&journaler->meta_lock);
+
+ queue_work(journaler->task_wq, &journaler->flush_work);
+ flush_workqueue(journaler->finish_wq);
+ flush_workqueue(journaler->task_wq);
+
+ list_for_each_entry_safe(pos, next_pos,
+ &journaler->object_positions_pending, node) {
+ list_del(&pos->node);
+ kfree(pos);
+ }
+
+ list_for_each_entry_safe(client, next, &journaler->clients, node) {
+ list_del(&client->node);
+ destroy_client(client);
+ }
+
+ if (journaler->clients_array != NULL)
+ kfree(journaler->clients_array);
+
+ for (n = rb_first(&journaler->commit_entries); n;) {
+ entry = rb_entry(n, struct commit_entry, r_node);
+
+ n = rb_next(n);
+ erase_commit_entry(&journaler->commit_entries, entry);
+ kfree(entry);
+ }
+
+ for (i = 0; i < journaler->splay_width; i++) {
+ struct object_recorder *obj_recorder = &journaler->obj_recorders[i];
+ struct object_replayer *obj_replayer = &journaler->obj_replayers[i];
+
+ spin_lock(&obj_recorder->lock);
+ BUG_ON(!list_empty(&obj_recorder->append_list) ||
+ !list_empty(&obj_recorder->overflow_list));
+ spin_unlock(&obj_recorder->lock);
+
+ spin_lock(&obj_replayer->lock);
+ BUG_ON(!list_empty(&obj_replayer->entry_list));
+ spin_unlock(&obj_replayer->lock);
+ }
+
+ kfree(journaler->obj_recorders);
+ kfree(journaler->obj_replayers);
+
+ list_for_each_entry_safe(entry_tid, entry_tid_next,
+ &journaler->entry_tids, node) {
+ list_del(&entry_tid->node);
+ kfree(entry_tid);
+ }
+
+ WARN_ON(!list_empty(&journaler->watch_cb_list));
+
+ return;
+}
+EXPORT_SYMBOL(ceph_journaler_close);
+
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id,
+ struct ceph_journaler_client **client_result)
+{
+ struct ceph_journaler_client *client = NULL;
+ int ret = -ENOENT;
+
+ list_for_each_entry(client, &journaler->clients, node) {
+ if (!strcmp(client->id, client_id)) {
+ *client_result = client;
+ ret = 0;
+ break;
+ }
+ }
+
+ return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_get_cached_client);
This commit introduce the generic journaling. This is a initial commit, which only includes some generic functions, such as journaler_create|destroy() and journaler_open|close(). Signed-off-by: Dongsheng Yang <dongsheng.yang@easystack.cn> --- include/linux/ceph/journaler.h | 162 +++++++++++++ net/ceph/Makefile | 3 +- net/ceph/journaler.c | 513 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 677 insertions(+), 1 deletion(-) create mode 100644 include/linux/ceph/journaler.h create mode 100644 net/ceph/journaler.c