@@ -36,11 +36,26 @@ struct rbd_tcm_reset_event {
u64 notify_id;
};
+#define RBD_TCM_CAW_LOCK_POLL_HZ (1 * HZ)
+#define RBD_TCM_CAW_LOCK_TOUT_HZ (5 * HZ)
+
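+/*
+ * Tracks a single in-flight CAW lock or unlock request. The dispatch and
+ * timeout delayed works are queued on the per-device caw_wq; whichever
+ * completes first cancels the other and signals lock_finished, with the
+ * outcome recorded in lock_rc.
+ */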
+struct rbd_tcm_caw_locker {
+ struct delayed_work caw_work;
+ struct delayed_work caw_tout_work;
+ struct completion lock_finished;
+ int lock_rc;
+ int retries;
+ struct rbd_tcm_device *rbd_tcm_dev;
+};
+
struct rbd_tcm_device {
struct rbd_device *rbd_dev;
struct se_device *se_dev;
struct rbd_tcm_reset_event reset_evt;
+
+ struct rbd_tcm_caw_locker *caw_locker;
+ struct workqueue_struct *caw_wq;
};
static int rbd_tcm_start_reset(struct se_device *se_dev, u32 timeout)
@@ -92,6 +107,8 @@ static int rbd_tcm_detach_device(struct se_device *se_dev)
 struct request_queue *q = iblock_se_device_to_q(se_dev);
struct rbd_tcm_device *rbd_tcm_dev = se_dev->cluster_dev_data;
+ destroy_workqueue(rbd_tcm_dev->caw_wq);
+
cancel_work_sync(&rbd_tcm_dev->reset_evt.work);
se_dev->cluster_dev_data = NULL;
rbd_detach_tcm_dev(q->queuedata);
@@ -112,16 +129,207 @@ static int rbd_tcm_attach_device(struct se_device *se_dev)
INIT_WORK(&rbd_tcm_dev->reset_evt.work, rbd_tcm_reset_event_workfn);
rbd_tcm_dev->reset_evt.rbd_tcm_dev = rbd_tcm_dev;
+ /* work queue to serialise COMPARE AND WRITE handling */
+ rbd_tcm_dev->caw_wq = alloc_workqueue("caw-rbd",
+ WQ_MEM_RECLAIM | WQ_UNBOUND, 1);
+ if (!rbd_tcm_dev->caw_wq) {
+ rbd_warn(rbd_tcm_dev->rbd_dev,
+ "Unable to create CAW workqueue for rbd");
+ kfree(rbd_tcm_dev);
+ return -ENOMEM;
+ }
+
se_dev->cluster_dev_data = rbd_tcm_dev;
return rbd_attach_tcm_dev(q->queuedata, rbd_tcm_dev);
}
+static void
+rbd_tcm_caw_lock_dispatch(struct work_struct *work)
+{
+ struct rbd_tcm_caw_locker *caw_locker
+ = container_of(work, struct rbd_tcm_caw_locker, caw_work.work);
+ struct rbd_device *rbd_dev = caw_locker->rbd_tcm_dev->rbd_dev;
+ int ret;
+
+ pr_debug("CAW lock dispatch running\n");
+
+ ret = rbd_dev_lock(rbd_dev, "rbd_tcm_caw", 1, "cookie", "",
+ "exclusive COMPARE AND wRITE lock", 0);
+ if (ret == -EEXIST) {
+ rbd_warn(rbd_dev, "CAW lock conflict, deferring operation");
+ /* TODO use unlock notification, instead of polling */
+ caw_locker->retries++;
+ queue_delayed_work(caw_locker->rbd_tcm_dev->caw_wq,
+ &caw_locker->caw_work,
+ RBD_TCM_CAW_LOCK_POLL_HZ);
+ return;
+ } else if (ret < 0) {
+ rbd_warn(rbd_dev, "failed to obtain CAW lock: %d", ret);
+ caw_locker->lock_rc = ret;
+ complete(&caw_locker->lock_finished);
+ return;
+ }
+
+ pr_debug("acquired COMPARE AND WRITE lock after %d retries\n",
+ caw_locker->retries);
+
+ cancel_delayed_work_sync(&caw_locker->caw_tout_work);
+ caw_locker->lock_rc = 0;
+ complete(&caw_locker->lock_finished);
+}
+
+static void
+rbd_tcm_caw_lock_timeout(struct work_struct *work)
+{
+ struct rbd_tcm_caw_locker *caw_locker
+ = container_of(work, struct rbd_tcm_caw_locker,
+ caw_tout_work.work);
+
+ pr_warn("CAW lock timeout running\n");
+
+ cancel_delayed_work_sync(&caw_locker->caw_work);
+ caw_locker->lock_rc = -ETIMEDOUT;
+ complete(&caw_locker->lock_finished);
+}
+
+/*
+ * Ensure cluster-wide exclusive COMPARE AND WRITE access via an RBD device
+ * lock. Local exclusivity is handled via se_dev->caw_sem.
+ * In future, we may be able to offload the entire atomic read->compare->write
+ * sequence to the OSDs, which would make this interface pretty useless.
+ */
+static int
+rbd_tcm_caw_lock(struct se_device *se_dev)
+{
+ struct rbd_tcm_device *rbd_tcm_dev = se_dev->cluster_dev_data;
+ struct rbd_tcm_caw_locker *caw_locker;
+ int ret;
+
+ ret = down_interruptible(&se_dev->caw_sem);
+ if (ret != 0) {
+ pr_err("failed to obtain local semaphore\n");
+ return ret;
+ }
+
+ BUG_ON(rbd_tcm_dev->caw_locker != NULL);
+
+ pr_debug("got local CAW semaphore\n");
+
+ caw_locker = kzalloc(sizeof(*caw_locker), GFP_KERNEL);
+ if (!caw_locker) {
+ pr_err("Unable to allocate caw_locker\n");
+ ret = -ENOMEM;
+ goto err_sem_up;
+ }
+
+ init_completion(&caw_locker->lock_finished);
+ /* whichever work finishes first cancels the other */
+ INIT_DELAYED_WORK(&caw_locker->caw_work, rbd_tcm_caw_lock_dispatch);
+ INIT_DELAYED_WORK(&caw_locker->caw_tout_work, rbd_tcm_caw_lock_timeout);
+ caw_locker->lock_rc = -EINTR;
+ caw_locker->rbd_tcm_dev = rbd_tcm_dev;
+
+ rbd_tcm_dev->caw_locker = caw_locker;
+
+ queue_delayed_work(rbd_tcm_dev->caw_wq, &caw_locker->caw_work, 0);
+ queue_delayed_work(rbd_tcm_dev->caw_wq, &caw_locker->caw_tout_work,
+ RBD_TCM_CAW_LOCK_TOUT_HZ);
+ pr_debug("work queued, awaiting completion\n");
+ wait_for_completion(&caw_locker->lock_finished);
+ if (caw_locker->lock_rc < 0) {
+ ret = caw_locker->lock_rc;
+ goto err_locker_free;
+ }
+
+ /* caw_locker freed following unlock */
+
+ return 0;
+
+err_locker_free:
+ kfree(caw_locker);
+ rbd_tcm_dev->caw_locker = NULL;
+err_sem_up:
+ up(&se_dev->caw_sem);
+ pr_debug("dropped local CAW semaphore on failure\n");
+ return ret;
+}
+
+static void
+rbd_tcm_caw_unlock_dispatch(struct work_struct *work)
+{
+ struct rbd_tcm_caw_locker *caw_locker
+ = container_of(work, struct rbd_tcm_caw_locker, caw_work.work);
+ struct rbd_device *rbd_dev = caw_locker->rbd_tcm_dev->rbd_dev;
+ int ret;
+
+ pr_debug("CAW unlock dispatch running\n");
+
+ ret = rbd_dev_unlock(rbd_dev, "rbd_tcm_caw", "cookie");
+ if (ret < 0)
+ rbd_warn(rbd_dev, "failed to drop CAW lock: %d", ret);
+ else
+ pr_debug("dropped RBD COMPARE AND WRITE lock\n");
+
+ cancel_delayed_work_sync(&caw_locker->caw_tout_work);
+ caw_locker->lock_rc = ret;
+ complete(&caw_locker->lock_finished);
+}
+
+static void
+rbd_tcm_caw_unlock_timeout(struct work_struct *work)
+{
+ struct rbd_tcm_caw_locker *caw_locker
+ = container_of(work, struct rbd_tcm_caw_locker,
+ caw_tout_work.work);
+
+ pr_warn("CAW unlock timeout running\n");
+
+ cancel_delayed_work_sync(&caw_locker->caw_work);
+ caw_locker->lock_rc = -ETIMEDOUT;
+ complete(&caw_locker->lock_finished);
+}
+
+static void
+rbd_tcm_caw_unlock(struct se_device *se_dev)
+{
+ struct rbd_tcm_device *rbd_tcm_dev = se_dev->cluster_dev_data;
+ struct rbd_tcm_caw_locker *caw_locker = rbd_tcm_dev->caw_locker;
+
+ /* caw_locker is only non-NULL if the lock was successfully acquired */
+ BUG_ON(caw_locker == NULL);
+
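+ /* reuse the locker from lock time; its works are reinitialised for unlock */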
+ init_completion(&caw_locker->lock_finished);
+ INIT_DELAYED_WORK(&caw_locker->caw_work, rbd_tcm_caw_unlock_dispatch);
+ INIT_DELAYED_WORK(&caw_locker->caw_tout_work, rbd_tcm_caw_unlock_timeout);
+ caw_locker->lock_rc = -EINTR;
+ caw_locker->retries = 0;
+ caw_locker->rbd_tcm_dev = rbd_tcm_dev;
+
+ queue_delayed_work(rbd_tcm_dev->caw_wq, &caw_locker->caw_work, 0);
+ queue_delayed_work(rbd_tcm_dev->caw_wq, &caw_locker->caw_tout_work,
+ RBD_TCM_CAW_LOCK_TOUT_HZ);
+ pr_debug("work queued, awaiting completion\n");
+ wait_for_completion(&caw_locker->lock_finished);
+ if (caw_locker->lock_rc < 0)
+ pr_warn("leaving stale RBD CAW lock\n");
+
+ kfree(caw_locker);
+ rbd_tcm_dev->caw_locker = NULL;
+
+ up(&se_dev->caw_sem);
+ pr_debug("dropped local CAW semaphore\n");
+}
+
static struct se_cluster_api rbd_tcm_template = {
.name = "rbd",
.owner = THIS_MODULE,
.reset_device = rbd_tcm_start_reset,
.attach_device = rbd_tcm_attach_device,
.detach_device = rbd_tcm_detach_device,
+ .caw_lock = rbd_tcm_caw_lock,
+ .caw_unlock = rbd_tcm_caw_unlock,
};
int rbd_tcm_register(void)
@@ -23,6 +23,10 @@ struct se_cluster_api {
* takes longer than timeout seconds then -ETIMEDOUT should be returned.
*/
int (*reset_device)(struct se_device *dev, u32 timeout);
+
+ /* exclusive device locking for atomic COMPARE AND WRITE */
+ int (*caw_lock)(struct se_device *se_dev);
+ void (*caw_unlock)(struct se_device *se_dev);
};
extern int core_cluster_api_register(struct se_cluster_api *);
Use an RBD device lock in addition to a local device semaphore to ensure
that COMPARE AND WRITE operations are handled in an atomic fashion.

Lock acquisition is handled via a new workqueue, which currently polls
periodically for the RBD device lock until it can be acquired. This
polling behaviour should be replaced in future with watch/notify
functionality similar to that used for RBD_NOTIFY_OP_SCSI_LUN_RESET.

Signed-off-by: David Disseldorp <ddiss@suse.de>
---
 drivers/block/rbd_tcm.c              | 208 +++++++++++++++++++++++++++++++++++
 include/target/target_core_cluster.h |   4 +
 2 files changed, 212 insertions(+)
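
For review, a rough sketch of how a COMPARE AND WRITE handler in the target
core might bracket its data path with the new hooks. The ->cluster_api
pointer on se_device, the handler name and the chosen sense codes are
illustrative assumptions, not part of this series; only caw_lock()/
caw_unlock() and dev->caw_sem are introduced or relied on here:

#include <linux/semaphore.h>
#include <target/target_core_base.h>
#include <target/target_core_cluster.h>

static sense_reason_t example_compare_and_write(struct se_cmd *cmd)
{
	struct se_device *dev = cmd->se_dev;
	int ret;

	if (dev->cluster_api && dev->cluster_api->caw_lock) {
		/* takes dev->caw_sem and the cluster-wide RBD lock */
		ret = dev->cluster_api->caw_lock(dev);
	} else {
		/* single-node fallback: local semaphore only */
		ret = down_interruptible(&dev->caw_sem);
	}
	if (ret != 0)
		return TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;

	/* ... read, compare and conditionally write the blocks here ... */

	if (dev->cluster_api && dev->cluster_api->caw_unlock)
		dev->cluster_api->caw_unlock(dev);	/* also drops caw_sem */
	else
		up(&dev->caw_sem);

	return TCM_NO_SENSE;
}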