diff mbox series

[RFC,33/48] dlm_ckv: add key-value storage service

Message ID 20220803162857.27770-34-d.bogdanov@yadro.com (mailing list archive)
State New, archived
Headers show
Series Target cluster implementation over DLM | expand

Commit Message

Dmitry Bogdanov Nov. 22, 2021, 5:12 p.m. UTC
Simple key-value cluster storage.

Signed-off-by: Dmitry Bogdanov <d.bogdanov@yadro.com>
---
 drivers/target/dlm_ckv.c | 128 +++++++++++++++++++++++++++++++++++++++
 drivers/target/dlm_ckv.h |   9 +++
 2 files changed, 137 insertions(+)
diff mbox series

Patch

diff --git a/drivers/target/dlm_ckv.c b/drivers/target/dlm_ckv.c
index cffe4f2dcb82..22c5f0827595 100644
--- a/drivers/target/dlm_ckv.c
+++ b/drivers/target/dlm_ckv.c
@@ -25,6 +25,10 @@  struct dlm_ckv_lock {
 	char name[DLM_RESNAME_MAXLEN];
 };
 
+struct dlm_ckv_kv {
+	struct dlm_ckv_lock lock;
+};
+
 struct dlm_ckv_bucket {
 	dlm_lockspace_t *ls;
 	struct kref refcount;
@@ -240,6 +244,130 @@  dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock)
 }
 EXPORT_SYMBOL(dlm_ckv_lock_release);
 
+struct dlm_ckv_kv *
+dlm_ckv_create_kv(struct dlm_ckv_bucket *bucket, const char *key)
+{
+	struct dlm_ckv_lock *ckv_lock;
+	void *ptr;
+	int res;
+
+	ptr = kzalloc(DLM_CKV_LVB_SIZE, GFP_KERNEL);
+	if (!ptr)
+		return NULL;
+
+	ckv_lock = dlm_ckv_create_lock(bucket, key);
+	if (!ckv_lock)
+		goto create_fail;
+
+	ckv_lock->lksb.lksb.sb_lvbptr = ptr;
+
+	res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_NL,
+				&ckv_lock->lksb, 0, ckv_lock->name, NULL);
+	if (res)
+		goto lock_failed;
+
+	return (struct dlm_ckv_kv *)ckv_lock;
+
+lock_failed:
+	dlm_ckv_free_lock(ckv_lock);
+create_fail:
+	kfree(ptr);
+	return NULL;
+}
+EXPORT_SYMBOL(dlm_ckv_create_kv);
+
+void
+dlm_ckv_free_kv(struct dlm_ckv_kv *kv)
+{
+	struct dlm_ckv_bucket *bucket = kv->lock.bucket;
+	struct dlm_ckv_lock *ckv_lock = &kv->lock;
+
+	dlm_ckv_unlock_wait(bucket->ls, &ckv_lock->lksb);
+
+	kfree(ckv_lock->lksb.lksb.sb_lvbptr);
+	kfree(ckv_lock);
+
+	kref_put(&bucket->refcount, bucket_release);
+
+}
+EXPORT_SYMBOL(dlm_ckv_free_kv);
+
+int
+dlm_ckv_get(struct dlm_ckv_kv *kv, char *value, size_t len)
+{
+	struct dlm_ckv_lock *ckv_lock = &kv->lock;
+	struct dlm_ckv_bucket *bucket;
+	int res;
+
+	BUG_ON(!ckv_lock);
+	bucket = ckv_lock->bucket;
+
+	res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_CR,
+				&ckv_lock->lksb,
+				DLM_LKF_VALBLK | DLM_LKF_CONVERT,
+				ckv_lock->name, NULL);
+	if (res) {
+		pr_info("Can not get lock %s, rc=%d\n",
+			ckv_lock->name, res);
+		goto fail;
+	}
+
+	if (ckv_lock->lksb.lksb.sb_flags & DLM_SBF_VALNOTVALID) {
+		pr_info(" %s LVB was invalid\n", ckv_lock->name);
+		memset(value, 0, len);
+	} else
+		memcpy(value, ckv_lock->lksb.lksb.sb_lvbptr, len);
+
+	res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_NL,
+				&ckv_lock->lksb,
+				DLM_LKF_CONVERT,
+				ckv_lock->name, NULL);
+	if (res) {
+		pr_info("Can not release lock %s\n", ckv_lock->name);
+		goto fail;
+	}
+
+	return res;
+fail:
+	return res;
+}
+EXPORT_SYMBOL(dlm_ckv_get);
+
+int
+dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len)
+{
+	struct dlm_ckv_lock *ckv_lock = &kv->lock;
+	struct dlm_ckv_bucket *bucket;
+	int res;
+
+	BUG_ON(!ckv_lock);
+	bucket = ckv_lock->bucket;
+
+	res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_EX,
+				&ckv_lock->lksb,
+				DLM_LKF_CONVERT,
+				ckv_lock->name, NULL);
+	if (res) {
+		pr_info("Can not get lock %s\n", ckv_lock->name);
+		goto fail;
+	}
+
+	memcpy(ckv_lock->lksb.lksb.sb_lvbptr, value, len);
+	res = dlm_ckv_lock_wait(ckv_lock->bucket->ls, DLM_LOCK_NL,
+				&ckv_lock->lksb,
+				DLM_LKF_VALBLK | DLM_LKF_CONVERT,
+				ckv_lock->name, NULL);
+	if (res) {
+		pr_info("Can not release lock %s\n", ckv_lock->name);
+		goto fail;
+	}
+
+	return res;
+fail:
+	return res;
+}
+EXPORT_SYMBOL(dlm_ckv_set);
+
 static void dlm_cvk_pre_n_bast(void *astarg, int mode)
 {
 	struct dlm_ckv_lksb *lksb = astarg;
diff --git a/drivers/target/dlm_ckv.h b/drivers/target/dlm_ckv.h
index 080d9498f5f9..c01904313f1e 100644
--- a/drivers/target/dlm_ckv.h
+++ b/drivers/target/dlm_ckv.h
@@ -4,6 +4,9 @@ 
 
 struct dlm_ckv_bucket;
 struct dlm_ckv_lock;
+struct dlm_ckv_kv;
+
+#define DLM_CKV_VALUE_MAX_SIZE	255
 
 typedef void (*dlm_ckv_notify_cb)(void *userarg);
 
@@ -18,6 +21,12 @@  void dlm_ckv_free_lock(struct dlm_ckv_lock *ckv_lock);
 int dlm_ckv_lock_get(struct dlm_ckv_lock *ckv_lock);
 int dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock);
 
+struct dlm_ckv_kv *
+dlm_ckv_create_kv(struct dlm_ckv_bucket *bucket, const char *key);
+void dlm_ckv_free_kv(struct dlm_ckv_kv *kv);
+int dlm_ckv_get(struct dlm_ckv_kv *kv, char *value, size_t len);
+int dlm_ckv_set(struct dlm_ckv_kv *kv, const char *value, size_t len);
+
 struct dlm_ckv_notify *
 dlm_ckv_create_notification(struct dlm_ckv_bucket *bucket, const char *name,
 			    dlm_ckv_notify_cb cb);