@@ -35,6 +35,12 @@ config TCM_PSCSI
Say Y here to enable the TCM/pSCSI subsystem plugin for non-buffered
passthrough access to Linux/SCSI device
+config DLM_CKV
+ tristate "Cluster key value storage over DLM"
+ depends on DLM
+ help
+ Say Y here to enable the cluster key value storage over DLM
+
config TCM_USER2
tristate "TCM/USER Subsystem Plugin for Linux"
depends on UIO && NET
@@ -30,3 +30,5 @@ obj-$(CONFIG_LOOPBACK_TARGET) += loopback/
obj-$(CONFIG_TCM_FC) += tcm_fc/
obj-$(CONFIG_ISCSI_TARGET) += iscsi/
obj-$(CONFIG_SBP_TARGET) += sbp/
+
+obj-$(CONFIG_DLM_CKV) += dlm_ckv.o
new file mode 100644
@@ -0,0 +1,323 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+
+#include <asm-generic/errno-base.h>
+#include <linux/kthread.h>
+#include <linux/dlmconstants.h>
+#include <linux/mutex.h>
+#include <linux/dlm.h>
+#include <linux/module.h>
+#include <linux/kthread.h>
+#include <linux/sched.h>
+#include <target/target_core_base.h>
+#include "dlm_ckv.h"
+
+struct dlm_ckv_lksb {
+ struct dlm_lksb lksb;
+ struct completion compl;
+};
+
+struct dlm_ckv_lock {
+ struct dlm_ckv_bucket *bucket;
+ struct dlm_ckv_lksb lksb;
+ char name[DLM_RESNAME_MAXLEN];
+};
+
+struct dlm_ckv_bucket {
+ dlm_lockspace_t *ls;
+ struct kref refcount;
+ u32 local_nodeid;
+ u32 local_slotid;
+ size_t num_nodes;
+ int nodeid[64];
+ void *userarg;
+ struct completion sync_compl;
+};
+
+
+#define DLM_CKV_LVB_SIZE 256
+
+static void bucket_release(struct kref *ref);
+
+/* dlm calls before it does lock recovery */
+
+static void dlm_ckv_recover_prep(void *arg)
+{
+
+}
+
+/* dlm calls after recover_prep has been completed on all lockspace members;
+ * identifies slot/jid of failed member
+ */
+
+static void dlm_ckv_recover_slot(void *arg, struct dlm_slot *slot)
+{
+ pr_info("nodeid %d left the cluster\n", slot->nodeid);
+}
+
+/* dlm calls after recover_slot and after it completes lock recovery */
+
+static void dlm_ckv_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
+ int our_slot, uint32_t generation)
+{
+ struct dlm_ckv_bucket *bucket = arg;
+ int i;
+
+ for (i = 0; i < num_slots; i++) {
+ bucket->nodeid[i] = slots[i].nodeid;
+ if (slots[i].slot == our_slot)
+ bucket->local_nodeid = slots[i].nodeid;
+ }
+ bucket->local_slotid = our_slot;
+ bucket->num_nodes = num_slots;
+ complete(&bucket->sync_compl);
+}
+
+static const struct dlm_lockspace_ops dlm_ckv_lockspace_ops = {
+ .recover_prep = dlm_ckv_recover_prep,
+ .recover_slot = dlm_ckv_recover_slot,
+ .recover_done = dlm_ckv_recover_done,
+};
+
+static void dlm_ast(void *astarg)
+{
+ struct dlm_ckv_lksb *dlm_ckv_lksb = astarg;
+
+ complete(&dlm_ckv_lksb->compl);
+}
+
+/*
+ * dlm_ckv_cancel - Synchronously cancel a pending dlm_lock() operation
+ */
+static int dlm_ckv_cancel(dlm_lockspace_t *ls, struct dlm_ckv_lksb *lksb,
+ int flags, const char *name)
+{
+ int res;
+
+ res = dlm_unlock(ls, lksb->lksb.sb_lkid,
+ DLM_LKF_CANCEL | (flags & DLM_LKF_VALBLK),
+ &lksb->lksb, lksb);
+ if (res < 0)
+ goto out;
+ res = wait_for_completion_timeout(&lksb->compl, 10 * HZ);
+
+out:
+ return res;
+}
+
+/**
+ * dlm_ckv_lock_wait - Wait until a DLM lock has been granted
+ * @ls: DLM lock space.
+ * @mode: DLM lock mode.
+ * @lksb: DLM lock status block.
+ * @flags: DLM flags.
+ * @name: DLM lock name. Only required for non-conversion requests.
+ * @bast: AST to be invoked in case this lock blocks another one.
+ */
+static int dlm_ckv_lock_wait(dlm_lockspace_t *ls, int mode,
+ struct dlm_ckv_lksb *lksb, int flags,
+ const char *name, void (*bast)(void *, int))
+{
+ int res;
+
+ res = dlm_lock(ls, mode, &lksb->lksb, flags,
+ (void *)name, name ? strlen(name) : 0, 0,
+ dlm_ast, lksb, bast);
+ if (res < 0)
+ goto out;
+ res = wait_for_completion_timeout(&lksb->compl, 60 * HZ);
+ if (res > 0)
+ res = lksb->lksb.sb_status;
+ else if (res == 0)
+ res = -ETIMEDOUT;
+ if (res < 0) {
+ int res2 = dlm_ckv_cancel(ls, lksb, flags, name);
+
+ if (res2 < 0)
+ pr_warn("canceling lock %s / %08x failed: %d\n",
+ name ? : "?", lksb->lksb.sb_lkid, res2);
+ }
+
+out:
+ return res;
+}
+
+/*
+ * dlm_ckv_unlock_wait - Release a DLM lock
+ */
+static int dlm_ckv_unlock_wait(dlm_lockspace_t *ls, struct dlm_ckv_lksb *lksb)
+{
+ int res;
+
+ res = dlm_unlock(ls, lksb->lksb.sb_lkid, 0, &lksb->lksb, lksb);
+ if (res < 0)
+ goto out;
+ res = wait_for_completion_timeout(&lksb->compl, 60 * HZ);
+ if (res > 0) {
+ res = lksb->lksb.sb_status;
+ if (res == -DLM_EUNLOCK || res == -DLM_ECANCEL)
+ res = 0;
+ } else if (res == 0) {
+ res = -ETIMEDOUT;
+ }
+
+out:
+ return res;
+}
+
+static void
+dlm_ckv_lock_init(struct dlm_ckv_lock *ckv_lock,
+ struct dlm_ckv_bucket *bucket,
+ const char *name)
+{
+ init_completion(&ckv_lock->lksb.compl);
+ strscpy(ckv_lock->name, name, DLM_RESNAME_MAXLEN);
+ ckv_lock->bucket = bucket;
+}
+
+struct dlm_ckv_lock *
+dlm_ckv_create_lock(struct dlm_ckv_bucket *bucket, const char *name)
+{
+ struct dlm_ckv_lock *ckv_lock;
+
+ ckv_lock = kzalloc(sizeof(struct dlm_ckv_lock), GFP_KERNEL);
+ if (!ckv_lock)
+ return NULL;
+
+ kref_get(&bucket->refcount);
+ dlm_ckv_lock_init(ckv_lock, bucket, name);
+
+ return ckv_lock;
+}
+EXPORT_SYMBOL(dlm_ckv_create_lock);
+
+void
+dlm_ckv_free_lock(struct dlm_ckv_lock *ckv_lock)
+{
+ struct dlm_ckv_bucket *bucket = ckv_lock->bucket;
+
+ kfree(ckv_lock);
+
+ kref_put(&bucket->refcount, bucket_release);
+}
+EXPORT_SYMBOL(dlm_ckv_free_lock);
+
+int
+dlm_ckv_lock_get(struct dlm_ckv_lock *ckv_lock)
+{
+ int res;
+
+ BUG_ON(!ckv_lock);
+
+ res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_EX,
+ &ckv_lock->lksb, 0, ckv_lock->name, NULL);
+
+ return res;
+}
+EXPORT_SYMBOL(dlm_ckv_lock_get);
+
+int
+dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock)
+{
+ int res;
+
+ BUG_ON(!ckv_lock);
+
+ res = dlm_ckv_unlock_wait(ckv_lock->bucket->ls, &ckv_lock->lksb);
+
+ return res;
+}
+EXPORT_SYMBOL(dlm_ckv_lock_release);
+
+
+static void bucket_release(struct kref *ref)
+{
+ struct dlm_ckv_bucket *bucket = container_of(ref, struct dlm_ckv_bucket,
+ refcount);
+ int res;
+
+ res = dlm_release_lockspace(bucket->ls, 2);
+ if (res)
+ pr_err("forcibly releasing lockspace failed: %d\n",
+ res);
+
+ kfree(bucket);
+}
+
+struct dlm_ckv_bucket *
+dlm_ckv_open_bucket(const char *name, const char *cluster_name, void *userarg)
+{
+ struct dlm_ckv_bucket *bucket;
+ int name_len = strlen(name);
+ int ops_result;
+ int err;
+
+ if (!name)
+ return ERR_PTR(-EINVAL);
+
+ if (name_len > DLM_LOCKSPACE_LEN)
+ return ERR_PTR(-EINVAL);
+
+ bucket = kzalloc(sizeof(struct dlm_ckv_bucket), GFP_KERNEL);
+ kref_init(&bucket->refcount);
+
+ bucket->userarg = userarg;
+ init_completion(&bucket->sync_compl);
+
+ err = dlm_new_lockspace(name, cluster_name,
+ DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_CKV_LVB_SIZE,
+ &dlm_ckv_lockspace_ops, bucket, &ops_result,
+ &bucket->ls);
+ if (err) {
+ pr_err("dlm_new_lockspace error %d\n", err);
+ goto fail_free;
+ }
+
+ if (ops_result < 0) {
+ pr_err("dlm does not support ops callbacks\n");
+ err = -EOPNOTSUPP;
+ goto fail_free;
+ }
+
+ wait_for_completion_timeout(&bucket->sync_compl, 10 * HZ);
+ if (bucket->num_nodes == 0) {
+ pr_err("Cluster joining timed out\n");
+ goto fail_init;
+ }
+
+ return bucket;
+
+fail_init:
+ dlm_release_lockspace(bucket->ls, 2);
+fail_free:
+ kfree(bucket);
+
+ return NULL;
+}
+EXPORT_SYMBOL(dlm_ckv_open_bucket);
+
+int dlm_ckv_close_bucket(struct dlm_ckv_bucket *bucket)
+{
+ kref_put(&bucket->refcount, bucket_release);
+
+ return 0;
+}
+EXPORT_SYMBOL(dlm_ckv_close_bucket);
+
+static int __init dlm_ckv_module_init(void)
+{
+ return 0;
+}
+
+static void __exit dlm_ckv_module_exit(void)
+{
+
+}
+
+MODULE_DESCRIPTION("Cluster KV storage over DLM");
+MODULE_AUTHOR("Dmitry Bogdanov <d.bogdanov@yadro.com>");
+MODULE_LICENSE("GPL");
+
+module_init(dlm_ckv_module_init);
+module_exit(dlm_ckv_module_exit);
new file mode 100644
@@ -0,0 +1,19 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef DLM_CKV_H
+#define DLM_CKV_H
+
+struct dlm_ckv_bucket;
+struct dlm_ckv_lock;
+
+struct dlm_ckv_bucket *dlm_ckv_open_bucket(const char *name,
+ const char *cluster_name,
+ void *userarg);
+int dlm_ckv_close_bucket(struct dlm_ckv_bucket *bucket);
+
+struct dlm_ckv_lock *
+dlm_ckv_create_lock(struct dlm_ckv_bucket *bucket, const char *name);
+void dlm_ckv_free_lock(struct dlm_ckv_lock *ckv_lock);
+int dlm_ckv_lock_get(struct dlm_ckv_lock *ckv_lock);
+int dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock);
+
+#endif
Introduce the first version of DLM CKV module that could be used by different kernel subsystems to share some information in a cluster. This commit has just cluster level locks. Signed-off-by: Dmitry Bogdanov <d.bogdanov@yadro.com> --- drivers/target/Kconfig | 6 + drivers/target/Makefile | 2 + drivers/target/dlm_ckv.c | 323 +++++++++++++++++++++++++++++++++++++++ drivers/target/dlm_ckv.h | 19 +++ 4 files changed, 350 insertions(+) create mode 100644 drivers/target/dlm_ckv.c create mode 100644 drivers/target/dlm_ckv.h