===================================================================
@@ -0,0 +1,649 @@
+/*
+ * Copyright (C) 2009 Red Hat, Inc. All rights reserved.
+ *
+ * This file is released under the GPL.
+ */
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/mempool.h>
+#include <linux/workqueue.h>
+#include <linux/dlm.h>
+#include <linux/device-mapper.h>
+#include <linux/fs.h> /* For READ/WRITE macros only */
+
+#include "dm-cluster-locking.h"
+
+#define DM_MSG_PREFIX "dm-cluster-locking"
+#define DMCL_MEMPOOL_LOCK_COUNT 32 /* Arbitrary */
+
+struct dmcl_lockspace {
+ struct list_head list;
+
+ char *name;
+ uint32_t name_index;
+
+ dlm_lockspace_t *lockspace;
+};
+
+struct dmcl_lockspace *dmcl_default_lockspace = NULL;
+static LIST_HEAD(lockspace_list_head);
+static DEFINE_SPINLOCK(lockspace_list_lock);
+
+struct dmcl_lock {
+ struct list_head list;
+ struct dmcl_lockspace *ls;
+
+ char *name;
+ uint32_t name_index;
+
+ uint32_t flags; /* DMCL_CACHE_[READ|WRITE]_LOCKS */
+
+ struct mutex mutex;
+ int dlm_mode;
+ int local_mode;
+ int bast_mode; /* The mode another machine is requesting */
+
+ struct dlm_lksb lksb;
+ struct completion dlm_completion;
+
+ uint64_t local_counter;
+ uint64_t dlm_counter;
+};
+
+struct dmcl_bast_assist_s {
+ struct list_head bast_list;
+ spinlock_t lock;
+
+ struct work_struct ws;
+};
+static struct dmcl_bast_assist_s dmcl_bast_assist;
+
+/*
+ * dmcl_alloc_lockspace
+ * @name: Unique cluster-wide name for the lockspace
+ *
+ * This function is used to create new lockspaces from which
+ * locks can be generated. For now, there is only one default
+ * lock space, "dm-cluster-locking". If there is a need in
+ * the future (due to lock name collisions) for users to have
+ * their own lockspaces, then I can export this function.
+ *
+ * Returns: ptr or ERR_PTR
+ */
+static struct dmcl_lockspace *dmcl_alloc_lockspace(char *name)
+{
+ int len, r;
+ struct dmcl_lockspace *ls, *tmp;
+
+ ls = kzalloc(sizeof(*ls), GFP_KERNEL);
+ if (!ls)
+ return ERR_PTR(-ENOMEM);
+
+ len = strlen(name) + 1;
+ ls->name = kzalloc(len, GFP_KERNEL);
+ if (!ls->name) {
+ kfree(ls);
+ return ERR_PTR(-ENOMEM);
+ }
+ strcpy(ls->name, name);
+
+ /*
+ * We allow 'name' to be any length the user wants, but
+ * with the DLM, we can only create a lockspace with a
+ * name that is DLM_RESNAME_MAXLEN in size. So, we will
+ * use the last DLM_RESNAME_MAXLEN characters given as the
+ * lockspace name and check for conflicts.
+ */
+ ls->name_index = (len > DLM_RESNAME_MAXLEN) ?
+ len - DLM_RESNAME_MAXLEN : 0;
+
+ spin_lock(&lockspace_list_lock);
+ list_for_each_entry(tmp, &lockspace_list_head, list)
+ if (!strcmp(tmp->name + tmp->name_index,
+ ls->name + ls->name_index)) {
+ kfree(ls->name);
+ kfree(ls);
+
+ spin_unlock(&lockspace_list_lock);
+ return ERR_PTR(-EBUSY);
+ }
+ list_add(&ls->list, &lockspace_list_head);
+ spin_unlock(&lockspace_list_lock);
+
+ r = dlm_new_lockspace(ls->name + ls->name_index,
+ strlen(ls->name + ls->name_index),
+ &ls->lockspace, 0, sizeof(uint64_t));
+ if (r) {
+ DMERR("Failed to create lockspace: %s", name);
+ spin_lock(&lockspace_list_lock);
+ list_del(&ls->list);
+ spin_unlock(&lockspace_list_lock);
+ kfree(ls->name);
+ kfree(ls);
+ return ERR_PTR(r);
+ }
+
+ return ls;
+}
+
+/*
+ * dmcl_free_lockspace
+ *
+ * Exportable w/ dmcl_alloc_lockspace if necessary.
+ */
+static void dmcl_free_lockspace(struct dmcl_lockspace *ls)
+{
+ spin_lock(&lockspace_list_lock);
+ list_del(&ls->list);
+ spin_unlock(&lockspace_list_lock);
+
+ dlm_release_lockspace(ls->lockspace, 1);
+ kfree(ls->name);
+ kfree(ls);
+}
+
+static int lock_return_value(struct dmcl_lock *l)
+{
+ int r = 0;
+ uint64_t old = l->local_counter;
+
+ if (l->lksb.sb_status)
+ return l->lksb.sb_status;
+
+ l->local_counter = l->dlm_counter;
+
+ /*
+ * If the counters differ, then someone else has
+ * acquired the lock exclusively while it has been
+ * unlocked for us.
+ */
+ if ((old == (uint64_t)-1) || (old != l->dlm_counter))
+ r = 1;
+
+ return r;
+}
+
+/*
+ * dmcl_ast_callback
+ * @context: dmcl_lock ptr
+ *
+ * This function is called asynchronously by the DLM to
+ * notify the completion of a lock operation.
+ */
+static void dmcl_ast_callback(void *context)
+{
+ struct dmcl_lock *l = context;
+
+ BUG_ON(!l);
+
+ complete(&l->dlm_completion);
+}
+
+/*
+ * dmcl_bast_callback
+ * @context: dmcl_lock ptr
+ * @mode: The mode needed by another node in the cluster
+ *
+ * This function is called asynchronously by the DLM when another
+ * node in the cluster is requesting a lock in such a way that
+ * our possession of the same lock is blocking that request. (For
+ * example, the other node may want an EX lock and we are holding/caching
+ * it as SH.
+ */
+static void dmcl_bast_callback(void *context, int mode)
+{
+ struct dmcl_lock *l = context;
+
+ l->bast_mode = mode;
+
+ spin_lock(&(dmcl_bast_assist.lock));
+ list_add(&l->list, &(dmcl_bast_assist.bast_list));
+ spin_unlock(&(dmcl_bast_assist.lock));
+
+ /* FIXME: It might be better if we had our own work queue */
+ schedule_work(&(dmcl_bast_assist.ws));
+}
+
+/*
+ * release_cached_lock
+ * @l
+ * @mode
+ *
+ * This function down-converts a lock into a mode that is compatible
+ * with 'mode'. (E.g. If we are caching the lock EX and the lock
+ * has been requested SH, then we must at least down-convert to SH.)
+ */
+static int release_cached_lock(struct dmcl_lock *l, int mode)
+{
+ int r;
+ int old_mode;
+
+ mutex_lock(&l->mutex);
+ old_mode = l->dlm_mode;
+
+ /*
+ * If the local representation of the lock is not DLM_LOCK_NL,
+ * then we must set the dlm value to DLM_LOCK_NL. This will
+ * force us to put the dlm lock into DLM_LOCK_NL when the lock
+ * is locally released later.
+ */
+ if (l->local_mode != DLM_LOCK_NL) {
+ l->dlm_mode = DLM_LOCK_NL;
+ mutex_unlock(&l->mutex);
+ return 0;
+ }
+
+ /*
+ * If the local representation of the lock is not
+ * held (i.e. DLM_LOCK_NL), then we can down-convert the DLM
+ * to whatever is compatible. If compatible, I convert the
+ * DLM lock to DLM_LOCK_CR - this way, we still have the lock
+ * cached for reads. It may prove to be better to simply drop
+ * the lock entirely though...
+ */
+ if (mode == DLM_LOCK_EX) {
+ /* Another machine needs EX, must drop lock */
+ r = dlm_lock(l->ls->lockspace, DLM_LOCK_NL, &l->lksb,
+ DLM_LKF_CONVERT | DLM_LKF_VALBLK,
+ l->name + l->name_index,
+ strlen(l->name + l->name_index), 0,
+ dmcl_ast_callback, l, dmcl_bast_callback);
+ if (unlikely(r)) {
+ DMERR("Failed to convert lock \"%s\" to DLM_LOCK_NL",
+ l->name);
+ mutex_unlock(&l->mutex);
+ return r;
+ }
+ l->dlm_mode = DLM_LOCK_NL;
+ } else if (l->dlm_mode == DLM_LOCK_EX) {
+ /* Convert the lock to SH, and it will be compatible */
+ r = dlm_lock(l->ls->lockspace, DLM_LOCK_CR, &l->lksb,
+ DLM_LKF_CONVERT | DLM_LKF_VALBLK,
+ l->name + l->name_index,
+ strlen(l->name + l->name_index), 0,
+ dmcl_ast_callback, l, dmcl_bast_callback);
+ if (unlikely(r)) {
+ DMERR("Failed to convert lock \"%s\" to DLM_LOCK_CR",
+ l->name);
+ mutex_unlock(&l->mutex);
+ return r;
+ }
+ l->dlm_mode = DLM_LOCK_CR;
+ } else {
+ DMERR("LOCK SHOULD ALREADY BE COMPATIBLE!");
+ BUG();
+ }
+
+ /*
+ * FIXME: It would be better not to wait here. The
+ * calling function is processing a list. Would be
+ * better to use an async callback to put the lock
+ * back on the bast list and reprocess in the event
+ * of an unlikely failure.
+ *
+ * This would make the mutex handling a little more
+ * complicated, but it would probably be worth it for
+ * performance.
+ */
+ wait_for_completion(&l->dlm_completion);
+ r = lock_return_value(l);
+
+ /*
+ * Failure of the DLM to make the conversion means the lock
+ * is still in the state we meant to change it from. Reset that.
+ */
+ if (r < 0)
+ l->dlm_mode = old_mode;
+
+ mutex_unlock(&l->mutex);
+ return (r < 0) ? r : 0;
+}
+
+/*
+ * dmcl_bast_process_requests
+ * @work
+ *
+ * This function processes the outstanding requests to release
+ * locks that we may have cached.
+ */
+static void dmcl_process_bast_requests(struct work_struct *work)
+{
+ int r, wake = 0;
+ LIST_HEAD(l);
+ struct dmcl_lock *lock, *tmp;
+ struct dmcl_bast_assist_s *bast_assist;
+
+ bast_assist = container_of(work, struct dmcl_bast_assist_s, ws);
+
+ spin_lock(&bast_assist->lock);
+ list_splice_init(&bast_assist->bast_list, &l);
+ spin_unlock(&bast_assist->lock);
+
+ list_for_each_entry_safe(lock, tmp, &l, list) {
+ r = release_cached_lock(lock, lock->bast_mode);
+ if (r) {
+ DMERR("Failed to complete 'bast' request on %s/%s",
+ lock->ls->name, lock->name);
+
+ /*
+ * Leave the lock on the list so we can attempt
+ * to unlock it again later.
+ */
+ wake = 1;
+ continue;
+ }
+ lock->bast_mode = 0;
+ list_del(&lock->list);
+ }
+
+ if (wake)
+ schedule_work(&bast_assist->ws);
+}
+
+static struct dmcl_lock *_allocate_lock(struct dmcl_lockspace *ls,
+ const char *lock_name, uint64_t flags)
+{
+ size_t len = strlen(lock_name);
+ struct dmcl_lock *new;
+
+ if (!ls) {
+ DMERR("No valid lockspace given!");
+ return NULL;
+ }
+
+ new = kzalloc(sizeof(*new), GFP_NOIO);
+ if (!new)
+ return NULL;
+
+ new->name = kzalloc(len + 1, GFP_NOIO);
+ if (!new->name) {
+ kfree(new);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&new->list);
+ new->ls = ls;
+
+ strcpy(new->name, lock_name);
+ new->name_index = (len > DLM_RESNAME_MAXLEN) ?
+ len - DLM_RESNAME_MAXLEN : 0;
+
+ new->flags = flags;
+
+ mutex_init(&new->mutex);
+ new->dlm_mode = DLM_LOCK_NL;
+ new->local_mode = DLM_LOCK_NL;
+ init_completion(&new->dlm_completion);
+ new->local_counter = (uint64_t)-1;
+ new->lksb.sb_lvbptr = (char *)&new->dlm_counter;
+
+ return new;
+}
+
+/*
+ * dmcl_alloc_lock_via_lockspace
+ * ls: lockspace to allocate lock from. If NULL, use default lock space.
+ * lock_name: Unique cluster-wide lock name
+ * flags: Set attributes of the lock, like caching
+ *
+ * This function allocates locks from a particular lockspace. It is not
+ * exported right now. We assume the default lockspace (by calling
+ * 'dmcl_alloc_lock'). Exportable w/ dmcl_alloc_lockspace if necessary.
+ *
+ * Returns: ptr or ERR_PTR
+ */
+static struct dmcl_lock *
+dmcl_alloc_lock_via_lockspace(struct dmcl_lockspace *ls,
+ const char *lock_name, uint64_t flags)
+{
+ int r;
+ struct dmcl_lock *l;
+
+ if (!ls) {
+ if (unlikely(!dmcl_default_lockspace)) {
+ ls = dmcl_alloc_lockspace(DM_MSG_PREFIX);
+ if (IS_ERR(ls))
+ return (void *)ls;
+ dmcl_default_lockspace = ls;
+ }
+ ls = dmcl_default_lockspace;
+ }
+ l = _allocate_lock(ls, lock_name, flags);
+ if (!l)
+ return ERR_PTR(-ENOMEM);
+
+ r = dlm_lock(ls->lockspace, DLM_LOCK_NL, &l->lksb,
+ DLM_LKF_EXPEDITE | DLM_LKF_VALBLK,
+ l->name + l->name_index, strlen(l->name + l->name_index),
+ 0, dmcl_ast_callback, l, dmcl_bast_callback);
+ if (r) {
+ DMERR("dlm_lock failure: %d", r);
+ return ERR_PTR(r);
+ }
+
+ wait_for_completion(&l->dlm_completion);
+ r = lock_return_value(l);
+ if (r < 0) {
+ DMERR("Asynchronous dlm_lock failure: %d", r);
+ return ERR_PTR(r);
+ }
+ return l;
+}
+
+/*
+ * dmcl_alloc_lock
+ * @lock_name
+ * @flags
+ *
+ * Shorthand for 'dmcl_alloc_lock_via_lockspace(NULL, lock_name, flags)'
+ *
+ * Returns: ptr or ERR_PTR
+ */
+struct dmcl_lock *dmcl_alloc_lock(const char *lock_name, uint64_t flags)
+{
+ return dmcl_alloc_lock_via_lockspace(NULL, lock_name, flags);
+}
+EXPORT_SYMBOL(dmcl_alloc_lock);
+
+void dmcl_free_lock(struct dmcl_lock *l)
+{
+ int r;
+
+ BUG_ON(l->local_mode != DLM_LOCK_NL);
+
+ /*
+ * Free all DLM lock structures. Doesn't matter if the
+ * dlm_mode is DLM_LOCK_NL, DLM_LOCK_CR, or DLM_LOCK_EX
+ */
+ r = dlm_unlock(l->ls->lockspace, l->lksb.sb_lkid,
+ DLM_LKF_FORCEUNLOCK, NULL, l);
+
+ /* Force release should never fail */
+ BUG_ON(r);
+
+ wait_for_completion(&l->dlm_completion);
+ if (lock_return_value(l) != -DLM_EUNLOCK)
+ DMERR("dlm_unlock failed on %s/%s: %d",
+ l->ls->name, l->name, lock_return_value(l));
+
+ kfree(l->name);
+ kfree(l);
+}
+EXPORT_SYMBOL(dmcl_free_lock);
+
+int dmcl_lock(struct dmcl_lock *l, int rw)
+{
+ int r;
+ int mode;
+
+ BUG_ON(!l);
+
+ if ((rw != WRITE) && (rw != READ)) {
+ DMERR("Lock attempt where mode != READ/WRITE");
+ BUG();
+ }
+ mode = (rw == WRITE) ? DLM_LOCK_EX : DLM_LOCK_CR;
+
+ if (l->local_mode != DLM_LOCK_NL) {
+ DMERR("Locks cannot be acquired multiple times");
+ BUG();
+ }
+
+ mutex_lock(&l->mutex);
+ /*
+ * Is the lock already cached in the needed state?
+ */
+ if (mode == l->dlm_mode) {
+ l->local_mode = mode;
+
+ mutex_unlock(&l->mutex);
+ return 0;
+ }
+
+ /*
+ * At this point local_mode is DLM_LOCK_NL. Given that the DLM
+ * lock can be cached, we can have any of the following:
+ * dlm_mode (desired) mode solution
+ * ======== ==== ========
+ * DLM_LOCK_NL DLM_LOCK_CR direct convert
+ * DLM_LOCK_NL DLM_LOCK_EX direct convert
+ * DLM_LOCK_CR DLM_LOCK_CR returned already
+ * DLM_LOCK_CR DLM_LOCK_EX first convert to DLM_LOCK_NL
+ * DLM_LOCK_EX DLM_LOCK_CR direct convert
+ * DLM_LOCK_EX DLM_LOCK_EX returned already
+ */
+ if (l->dlm_mode == DLM_LOCK_CR) {
+ r = dlm_lock(l->ls->lockspace, DLM_LOCK_NL, &l->lksb,
+ DLM_LKF_CONVERT | DLM_LKF_VALBLK,
+ l->name + l->name_index,
+ strlen(l->name + l->name_index),
+ 0, dmcl_ast_callback, l, dmcl_bast_callback);
+ if (r) {
+ DMERR("Failed CR->NL convertion for lock %s",
+ l->name);
+ mutex_unlock(&l->mutex);
+ return r;
+ }
+ }
+ r = dlm_lock(l->ls->lockspace, mode, &l->lksb,
+ DLM_LKF_CONVERT | DLM_LKF_VALBLK,
+ l->name + l->name_index, strlen(l->name + l->name_index),
+ 0, dmcl_ast_callback, l, dmcl_bast_callback);
+ if (r) {
+ DMERR("Failed to issue DLM lock operation: %d", r);
+ mutex_unlock(&l->mutex);
+ return r;
+ }
+
+ wait_for_completion(&l->dlm_completion);
+ r = lock_return_value(l);
+ if (r < 0) {
+ DMERR("DLM lock operation failed: %d", r);
+ mutex_unlock(&l->mutex);
+ return r;
+ }
+
+ l->local_mode = mode;
+ l->dlm_mode = mode;
+
+ mutex_unlock(&l->mutex);
+
+ return r;
+}
+EXPORT_SYMBOL(dmcl_lock);
+
+int dmcl_unlock(struct dmcl_lock *l)
+{
+ int r = 0;
+
+ mutex_lock(&l->mutex);
+
+ if (l->local_mode == DLM_LOCK_NL) {
+ DMERR("FATAL: Lock %s/%s is already unlocked",
+ l->ls->name, l->name);
+
+ /*
+ * If you are hitting this bug, it is likely you have made
+ * one of the two following mistakes:
+ * 1) You have two locks with the same name in your lockspace
+ * 2) You have unlocked the same lock twice in a row
+ */
+ BUG();
+ }
+
+ if (l->local_mode == DLM_LOCK_EX) {
+ l->local_counter++;
+ l->dlm_counter = l->local_counter;
+ }
+ l->local_mode = DLM_LOCK_NL;
+
+ if ((l->dlm_mode == DLM_LOCK_EX) && (l->flags & DMCL_CACHE_WRITE_LOCKS))
+ goto out;
+
+ if ((l->dlm_mode == DLM_LOCK_CR) && (l->flags & DMCL_CACHE_READ_LOCKS))
+ goto out;
+
+ /*
+ * If no caching has been specified or the DLM lock is needed
+ * elsewhere (indicated by dlm_mode == DLM_LOCK_NL), then
+ * we immediately put the lock into a non-conflicting state.
+ */
+ r = dlm_lock(l->ls->lockspace, DLM_LOCK_NL, &l->lksb,
+ DLM_LKF_CONVERT | DLM_LKF_VALBLK,
+ l->name + l->name_index, strlen(l->name + l->name_index),
+ 0, dmcl_ast_callback, l, dmcl_bast_callback);
+ if (r)
+ goto fail;
+
+ wait_for_completion(&l->dlm_completion);
+ r = lock_return_value(l);
+
+ if (r < 0)
+ goto fail;
+
+ l->dlm_mode = DLM_LOCK_NL;
+
+out:
+ mutex_unlock(&l->mutex);
+ return 0;
+
+fail:
+ DMERR("dlm_lock conversion of %s/%s failed: %d",
+ l->ls->name, l->name, r);
+ mutex_unlock(&l->mutex);
+ return r;
+}
+EXPORT_SYMBOL(dmcl_unlock);
+
+static int __init dm_cluster_lock_module_init(void)
+{
+ INIT_LIST_HEAD(&(dmcl_bast_assist.bast_list));
+ spin_lock_init(&(dmcl_bast_assist.lock));
+ INIT_WORK(&(dmcl_bast_assist.ws), dmcl_process_bast_requests);
+
+ dmcl_default_lockspace = dmcl_alloc_lockspace(DM_MSG_PREFIX);
+ if (IS_ERR(dmcl_default_lockspace)) {
+ if (PTR_ERR(dmcl_default_lockspace) == -ENOTCONN) {
+ DMWARN("DLM not ready yet. Delaying initialization.");
+ dmcl_default_lockspace = NULL;
+ } else {
+ DMERR("Failed to create default lockspace: %d",
+ (int)PTR_ERR(dmcl_default_lockspace));
+ return PTR_ERR(dmcl_default_lockspace);
+ }
+ }
+
+ return 0;
+}
+
+static void __exit dm_cluster_lock_module_exit(void)
+{
+ dmcl_free_lockspace(dmcl_default_lockspace);
+}
+
+module_init(dm_cluster_lock_module_init);
+module_exit(dm_cluster_lock_module_exit);
+
+MODULE_DESCRIPTION("DM Cluster Locking module");
+MODULE_AUTHOR("Jonathan Brassow");
+MODULE_LICENSE("GPL");
===================================================================
@@ -319,4 +319,13 @@ config DM_UEVENT
---help---
Generate udev events for DM events.
+config DM_CLUSTER_LOCKING
+ tristate "DM Cluster Locking module (EXPERIMENTAL)"
+ select DLM
+ ---help---
+ The DM Cluster Locking module provides a simple set of
+ cluster locking commands. It is a wrapper around the
+ more versatile (but more complex) DLM - which is also
+ found in the kernel.
+
endif # MD
===================================================================
@@ -44,6 +44,7 @@ obj-$(CONFIG_DM_SNAPSHOT) += dm-snapshot
obj-$(CONFIG_DM_MIRROR) += dm-mirror.o dm-log.o dm-region-hash.o
obj-$(CONFIG_DM_LOG_USERSPACE) += dm-log-userspace.o
obj-$(CONFIG_DM_ZERO) += dm-zero.o
+obj-$(CONFIG_DM_CLUSTER_LOCKING) += dm-cluster-locking.o
quiet_cmd_unroll = UNROLL $@
cmd_unroll = $(AWK) -f$(srctree)/$(src)/unroll.awk -vN=$(UNROLL) \
===================================================================
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2009 Red Hat, Inc. All rights reserved.
+ *
+ * This file is released under the GPL.
+ */
+#ifndef __DM_CLUSTER_LOCKING_DOT_H__
+#define __DM_CLUSTER_LOCKING_DOT_H__
+
+#define DMCL_CACHE_READ_LOCKS 1
+#define DMCL_CACHE_WRITE_LOCKS 2
+
+struct dmcl_lock;
+
+/**
+ * dmcl_alloc_lock
+ * @name: Unique cluster-wide name for lock
+ * @flags: DMCL_CACHE_READ_LOCKS | DMCL_CACHE_WRITE_LOCKS
+ *
+ * Allocate necessary lock structures, set attributes, and
+ * establish communication with the DLM.
+ *
+ * This operation can block.
+ *
+ * Returns: ptr or ERR_PTR
+ **/
+struct dmcl_lock *dmcl_alloc_lock(const char *name, uint64_t flags);
+
+/**
+ * dmcl_free_lock
+ * @l
+ *
+ * Free all associated memory for the given lock and sever
+ * all ties with the DLM.
+ *
+ * This operation can block.
+ **/
+void dmcl_free_lock(struct dmcl_lock *l);
+
+/**
+ * dmcl_lock
+ * @l
+ * @rw: specify READ or WRITE lock
+ *
+ * Acquire a lock READ(SHARED) or WRITE(EXCLUSIVE). Specify the
+ * distinction with the common 'READ' or 'WRITE' macros. Possible
+ * return values are:
+ * 1: The lock was acquired successfully /and/ the lock was
+ * granted in WRITE/EXLUSIVE mode to another machine since
+ * the last time the lock was held locally.
+ * Useful for determining the validity of a cached resource
+ * that is protected by the lock.
+ * 0: The lock was acquired successfully and no other machine
+ * had acquired the lock WRITE(EXCLUSIVE) since the last time
+ * the lock was acquired.
+ * -EXXX: Error acquiring the lock.
+ *
+ * This operation can block.
+ *
+ * Returns: 1, 0, -EXXX
+ **/
+int dmcl_lock(struct dmcl_lock *l, int rw);
+
+/**
+ * dmcl_unlock
+ * @l
+ *
+ * Unlock a lock. Whether the lock is continued to be held with
+ * respect to the DLM ("cached" unless needed by another machine),
+ * is determined by the flags used during the allocation of the
+ * lock. It is possible that this action fail if the DLM fails
+ * to release the lock as needed.
+ *
+ * This operation can block.
+ *
+ * Returns: 0, -EXXX
+ **/
+int dmcl_unlock(struct dmcl_lock *l);
+
+#endif /* __DM_CLUSTER_LOCKING_DOT_H__ */