@@ -10,6 +10,7 @@
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/sched.h>
+#include <linux/workqueue.h>
#include <target/target_core_base.h>
#include "dlm_ckv.h"
@@ -33,8 +34,17 @@ struct dlm_ckv_bucket {
int nodeid[64];
void *userarg;
struct completion sync_compl;
+ struct workqueue_struct *notify_wq;
};
+struct dlm_ckv_notify {
+ dlm_ckv_notify_cb notify_cb;
+ char name[DLM_RESNAME_MAXLEN];
+ struct dlm_ckv_lock pre_n;
+ struct dlm_ckv_lock post_n;
+ struct work_struct pre_n_work;
+ struct work_struct post_n_work;
+};
#define DLM_CKV_LVB_SIZE 256
@@ -230,6 +240,179 @@ dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock)
}
EXPORT_SYMBOL(dlm_ckv_lock_release);
+static void dlm_cvk_pre_n_bast(void *astarg, int mode)
+{
+ struct dlm_ckv_lksb *lksb = astarg;
+ struct dlm_ckv_lock *ckv_lock;
+ struct dlm_ckv_bucket *bucket;
+ struct dlm_ckv_notify *notify;
+
+ ckv_lock = container_of(lksb, struct dlm_ckv_lock, lksb);
+ notify = container_of(ckv_lock, struct dlm_ckv_notify, pre_n);
+ bucket = ckv_lock->bucket;
+
+ queue_work(bucket->notify_wq, ¬ify->pre_n_work);
+}
+
+static void dlm_cvk_post_n_bast(void *astarg, int mode)
+{
+ struct dlm_ckv_lksb *lksb = astarg;
+ struct dlm_ckv_lock *ckv_lock;
+ struct dlm_ckv_bucket *bucket;
+ struct dlm_ckv_notify *notify;
+
+ ckv_lock = container_of(lksb, struct dlm_ckv_lock, lksb);
+ notify = container_of(ckv_lock, struct dlm_ckv_notify, post_n);
+ bucket = ckv_lock->bucket;
+
+ queue_work(bucket->notify_wq, ¬ify->post_n_work);
+}
+
+static void
+dlm_cvk_pre_n_work(struct work_struct *work)
+{
+ struct dlm_ckv_notify *notify = container_of(work,
+ struct dlm_ckv_notify, pre_n_work);
+ struct dlm_ckv_bucket *bucket = notify->pre_n.bucket;
+
+ dlm_ckv_lock_wait(bucket->ls, DLM_LOCK_EX,
+ ¬ify->post_n.lksb,
+ DLM_LKF_CONVERT,
+ notify->post_n.name, dlm_cvk_post_n_bast);
+ dlm_ckv_lock_wait(bucket->ls, DLM_LOCK_NL,
+ ¬ify->pre_n.lksb,
+ DLM_LKF_CONVERT,
+ notify->pre_n.name, dlm_cvk_pre_n_bast);
+}
+
+static void
+dlm_cvk_post_n_work(struct work_struct *work)
+{
+ struct dlm_ckv_notify *notify = container_of(work,
+ struct dlm_ckv_notify, post_n_work);
+ struct dlm_ckv_bucket *bucket = notify->post_n.bucket;
+
+ dlm_ckv_lock_wait(bucket->ls, DLM_LOCK_EX,
+ ¬ify->pre_n.lksb,
+ DLM_LKF_CONVERT,
+ notify->pre_n.name, dlm_cvk_pre_n_bast);
+
+ notify->notify_cb(bucket->userarg);
+
+ dlm_ckv_lock_wait(bucket->ls, DLM_LOCK_NL,
+ ¬ify->post_n.lksb,
+ DLM_LKF_CONVERT,
+ notify->post_n.name, dlm_cvk_post_n_bast);
+}
+
+struct dlm_ckv_notify *
+dlm_ckv_create_notification(struct dlm_ckv_bucket *bucket, const char *name,
+ dlm_ckv_notify_cb cb)
+{
+ char lockname[DLM_RESNAME_MAXLEN];
+ struct dlm_ckv_notify *notify;
+ int res;
+
+ notify = kzalloc(sizeof(struct dlm_ckv_bucket), GFP_KERNEL);
+ if (!notify)
+ return NULL;
+
+ INIT_WORK(¬ify->post_n_work, dlm_cvk_post_n_work);
+ INIT_WORK(¬ify->pre_n_work, dlm_cvk_pre_n_work);
+
+ strscpy(notify->name, name, DLM_RESNAME_MAXLEN);
+ notify->notify_cb = cb;
+
+ kref_get(&bucket->refcount);
+
+ sprintf(lockname, "pre.%s.%d", name, bucket->local_nodeid);
+ dlm_ckv_lock_init(¬ify->pre_n, bucket, lockname);
+
+ sprintf(lockname, "post.%s.%d", name, bucket->local_nodeid);
+ dlm_ckv_lock_init(¬ify->post_n, bucket, lockname);
+
+ res = dlm_ckv_lock_wait(bucket->ls, DLM_LOCK_EX,
+ ¬ify->pre_n.lksb, 0,
+ notify->pre_n.name, dlm_cvk_pre_n_bast);
+ if (res)
+ goto fail_locks;
+
+ res = dlm_ckv_lock_wait(bucket->ls, DLM_LOCK_NL,
+ ¬ify->post_n.lksb, 0,
+ notify->post_n.name, dlm_cvk_post_n_bast);
+ if (res)
+ goto fail_locks;
+
+ return notify;
+
+fail_locks:
+ kref_put(&bucket->refcount, bucket_release);
+ kfree(notify);
+
+ return NULL;
+}
+EXPORT_SYMBOL(dlm_ckv_create_notification);
+
+void
+dlm_ckv_free_notification(struct dlm_ckv_notify *notify)
+{
+ struct dlm_ckv_bucket *bucket = notify->pre_n.bucket;
+
+ cancel_work_sync(¬ify->pre_n_work);
+ cancel_work_sync(¬ify->post_n_work);
+
+ kfree(notify);
+ kref_put(&bucket->refcount, bucket_release);
+}
+EXPORT_SYMBOL(dlm_ckv_free_notification);
+
+static void
+dlm_ckv_toggle_lock(struct dlm_ckv_bucket *bucket, const char *name_prefix)
+{
+ char lockname[DLM_RESNAME_MAXLEN];
+ struct dlm_ckv_lksb lksb;
+ int res;
+ int i;
+
+ init_completion(&lksb.compl);
+
+ for (i = 0; i < bucket->num_nodes; ++i) {
+ if (bucket->nodeid[i] == bucket->local_nodeid)
+ continue;
+ snprintf(lockname, sizeof(lockname), "%s.%d", name_prefix,
+ bucket->nodeid[i]);
+
+ lksb.lksb.sb_lkid = 0;
+ res = dlm_ckv_lock_wait(bucket->ls, DLM_LOCK_PR,
+ &lksb, 0, lockname, NULL);
+ if (res < 0)
+ pr_warn("Locking %s failed (%d)", lockname, res);
+ if (!lksb.lksb.sb_lkid)
+ continue;
+
+ dlm_ckv_lock_wait(bucket->ls, DLM_LOCK_NL, &lksb,
+ DLM_LKF_CONVERT, lockname, NULL);
+ dlm_ckv_unlock_wait(bucket->ls, &lksb);
+ }
+}
+
+int dlm_ckv_notify(struct dlm_ckv_notify *notify)
+{
+ struct dlm_ckv_bucket *bucket = notify->pre_n.bucket;
+ char lockname[DLM_RESNAME_MAXLEN];
+
+ snprintf(lockname, sizeof(lockname), "post.%s", notify->name);
+ dlm_ckv_toggle_lock(bucket, lockname);
+
+ snprintf(lockname, sizeof(lockname), "pre.%s", notify->name);
+ dlm_ckv_toggle_lock(bucket, lockname);
+
+ snprintf(lockname, sizeof(lockname), "post.%s", notify->name);
+ dlm_ckv_toggle_lock(bucket, lockname);
+
+ return 0;
+}
+EXPORT_SYMBOL(dlm_ckv_notify);
static void bucket_release(struct kref *ref)
{
@@ -237,6 +420,8 @@ static void bucket_release(struct kref *ref)
refcount);
int res;
+ destroy_workqueue(bucket->notify_wq);
+
res = dlm_release_lockspace(bucket->ls, 2);
if (res)
pr_err("forcibly releasing lockspace failed: %d\n",
@@ -264,6 +449,7 @@ dlm_ckv_open_bucket(const char *name, const char *cluster_name, void *userarg)
bucket->userarg = userarg;
init_completion(&bucket->sync_compl);
+ bucket->notify_wq = alloc_ordered_workqueue("notify_wq-%s", 0, name);
err = dlm_new_lockspace(name, cluster_name,
DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_CKV_LVB_SIZE,
@@ -5,6 +5,8 @@
struct dlm_ckv_bucket;
struct dlm_ckv_lock;
+typedef void (*dlm_ckv_notify_cb)(void *userarg);
+
struct dlm_ckv_bucket *dlm_ckv_open_bucket(const char *name,
const char *cluster_name,
void *userarg);
@@ -16,4 +18,10 @@ void dlm_ckv_free_lock(struct dlm_ckv_lock *ckv_lock);
int dlm_ckv_lock_get(struct dlm_ckv_lock *ckv_lock);
int dlm_ckv_lock_release(struct dlm_ckv_lock *ckv_lock);
+struct dlm_ckv_notify *
+dlm_ckv_create_notification(struct dlm_ckv_bucket *bucket, const char *name,
+ dlm_ckv_notify_cb cb);
+void dlm_ckv_free_notification(struct dlm_ckv_notify *notify);
+int dlm_ckv_notify(struct dlm_ckv_notify *notify);
+
#endif
Notification broadcasts over DLM cluster. They do not have any payload. Used to be used to notify other nodes to read an updated data. Signed-off-by: Dmitry Bogdanov <d.bogdanov@yadro.com> --- drivers/target/dlm_ckv.c | 186 +++++++++++++++++++++++++++++++++++++++ drivers/target/dlm_ckv.h | 8 ++ 2 files changed, 194 insertions(+)