@@ -58,9 +58,10 @@ static struct orangefs_kernel_op_s *orangefs_devreq_remove_op(__u64 tag)
next,
&htable_ops_in_progress[index],
list) {
- if (op->tag == tag && !op_state_purged(op)) {
+ if (op->tag == tag && !op_state_purged(op) &&
+ !op_state_given_up(op)) {
list_del_init(&op->list);
- get_op(op); /* increase ref count. */
+ op->op_state |= OP_VFS_STATE_COPYING;
spin_unlock(&htable_ops_in_progress_lock);
return op;
}
@@ -133,7 +134,7 @@ restart:
__s32 fsid;
/* This lock is held past the end of the loop when we break. */
spin_lock(&op->lock);
- if (unlikely(op_state_purged(op))) {
+ if (unlikely(op_state_purged(op) || op_state_given_up(op))) {
spin_unlock(&op->lock);
continue;
}
@@ -205,7 +206,7 @@ restart:
return -EAGAIN;
}
list_del_init(&cur_op->list);
- get_op(op);
+ op->op_state |= OP_VFS_STATE_COPYING;
spin_unlock(&orangefs_request_list_lock);
spin_unlock(&cur_op->lock);
@@ -227,10 +228,11 @@ restart:
spin_lock(&htable_ops_in_progress_lock);
spin_lock(&cur_op->lock);
+ cur_op->op_state &= ~OP_VFS_STATE_COPYING;
if (unlikely(op_state_given_up(cur_op))) {
spin_unlock(&cur_op->lock);
spin_unlock(&htable_ops_in_progress_lock);
- op_release(cur_op);
+ complete(&cur_op->waitq);
goto restart;
}
@@ -242,7 +244,6 @@ restart:
orangefs_devreq_add_op(cur_op);
spin_unlock(&cur_op->lock);
spin_unlock(&htable_ops_in_progress_lock);
- op_release(cur_op);
/* The client only asks to read one size buffer. */
return MAX_DEV_REQ_UPSIZE;
@@ -255,13 +256,16 @@ error:
gossip_err("orangefs: Failed to copy data to user space\n");
spin_lock(&orangefs_request_list_lock);
spin_lock(&cur_op->lock);
+ cur_op->op_state &= ~OP_VFS_STATE_COPYING;
if (likely(!op_state_given_up(cur_op))) {
set_op_state_waiting(cur_op);
list_add(&cur_op->list, &orangefs_request_list);
+ spin_unlock(&cur_op->lock);
+ } else {
+ spin_unlock(&cur_op->lock);
+ complete(&cur_op->waitq);
}
- spin_unlock(&cur_op->lock);
spin_unlock(&orangefs_request_list_lock);
- op_release(cur_op);
return -EFAULT;
}
@@ -333,8 +337,7 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
n = copy_from_iter(&op->downcall, downcall_size, iter);
if (n != downcall_size) {
gossip_err("%s: failed to copy downcall.\n", __func__);
- ret = -EFAULT;
- goto Broken;
+ goto Efault;
}
if (op->downcall.status)
@@ -354,8 +357,7 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
downcall_size,
op->downcall.trailer_size,
total);
- ret = -EFAULT;
- goto Broken;
+ goto Efault;
}
/* Only READDIR operations should have trailers. */
@@ -364,8 +366,7 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
gossip_err("%s: %x operation with trailer.",
__func__,
op->downcall.type);
- ret = -EFAULT;
- goto Broken;
+ goto Efault;
}
/* READDIR operations should always have trailers. */
@@ -374,8 +375,7 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
gossip_err("%s: %x operation with no trailer.",
__func__,
op->downcall.type);
- ret = -EFAULT;
- goto Broken;
+ goto Efault;
}
if (op->downcall.type != ORANGEFS_VFS_OP_READDIR)
@@ -386,8 +386,7 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
if (op->downcall.trailer_buf == NULL) {
gossip_err("%s: failed trailer vmalloc.\n",
__func__);
- ret = -ENOMEM;
- goto Broken;
+ goto Enomem;
}
memset(op->downcall.trailer_buf, 0, op->downcall.trailer_size);
n = copy_from_iter(op->downcall.trailer_buf,
@@ -396,8 +395,7 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
if (n != op->downcall.trailer_size) {
gossip_err("%s: failed to copy trailer.\n", __func__);
vfree(op->downcall.trailer_buf);
- ret = -EFAULT;
- goto Broken;
+ goto Efault;
}
wakeup:
@@ -406,38 +404,28 @@ wakeup:
* that this op is done
*/
spin_lock(&op->lock);
- if (unlikely(op_state_given_up(op))) {
+ op->op_state &= ~OP_VFS_STATE_COPYING;
+ if (unlikely(op_is_cancel(op))) {
spin_unlock(&op->lock);
- goto out;
- }
- set_op_state_serviced(op);
- spin_unlock(&op->lock);
-
- /*
- * If this operation is an I/O operation we need to wait
- * for all data to be copied before we can return to avoid
- * buffer corruption and races that can pull the buffers
- * out from under us.
- *
- * Essentially we're synchronizing with other parts of the
- * vfs implicitly by not allowing the user space
- * application reading/writing this device to return until
- * the buffers are done being used.
- */
-out:
- if (unlikely(op_is_cancel(op)))
put_cancel(op);
- op_release(op);
- return ret;
-
-Broken:
- spin_lock(&op->lock);
- if (!op_state_given_up(op)) {
- op->downcall.status = ret;
+ } else if (unlikely(op_state_given_up(op))) {
+ spin_unlock(&op->lock);
+ complete(&op->waitq);
+ } else {
set_op_state_serviced(op);
+ spin_unlock(&op->lock);
}
- spin_unlock(&op->lock);
- goto out;
+ return ret;
+
+Efault:
+ op->downcall.status = -(ORANGEFS_ERROR_BIT | 9);
+ ret = -EFAULT;
+ goto wakeup;
+
+Enomem:
+ op->downcall.status = -(ORANGEFS_ERROR_BIT | 8);
+ ret = -ENOMEM;
+ goto wakeup;
}
/* Returns whether any FS are still pending remounted */
@@ -120,8 +120,6 @@ struct orangefs_kernel_op_s *op_alloc(__s32 type)
spin_lock_init(&new_op->lock);
init_completion(&new_op->waitq);
- atomic_set(&new_op->ref_count, 1);
-
new_op->upcall.type = ORANGEFS_VFS_OP_INVALID;
new_op->downcall.type = ORANGEFS_VFS_OP_INVALID;
new_op->downcall.status = -1;
@@ -149,7 +147,7 @@ struct orangefs_kernel_op_s *op_alloc(__s32 type)
return new_op;
}
-void __op_release(struct orangefs_kernel_op_s *orangefs_op)
+void op_release(struct orangefs_kernel_op_s *orangefs_op)
{
if (orangefs_op) {
gossip_debug(GOSSIP_CACHE_DEBUG,
@@ -98,6 +98,7 @@ enum orangefs_vfs_op_states {
OP_VFS_STATE_SERVICED = 4,
OP_VFS_STATE_PURGED = 8,
OP_VFS_STATE_GIVEN_UP = 16,
+ OP_VFS_STATE_COPYING = 32,
};
/*
@@ -205,8 +206,6 @@ struct orangefs_kernel_op_s {
struct completion waitq;
spinlock_t lock;
- atomic_t ref_count;
-
/* VFS aio fields */
int attempts;
@@ -230,23 +229,7 @@ static inline void set_op_state_serviced(struct orangefs_kernel_op_s *op)
#define op_state_given_up(op) ((op)->op_state & OP_VFS_STATE_GIVEN_UP)
#define op_is_cancel(op) ((op)->downcall.type == ORANGEFS_VFS_OP_CANCEL)
-static inline void get_op(struct orangefs_kernel_op_s *op)
-{
- atomic_inc(&op->ref_count);
- gossip_debug(GOSSIP_DEV_DEBUG,
- "(get) Alloced OP (%p:%llu)\n", op, llu(op->tag));
-}
-
-void __op_release(struct orangefs_kernel_op_s *op);
-
-static inline void op_release(struct orangefs_kernel_op_s *op)
-{
- if (atomic_dec_and_test(&op->ref_count)) {
- gossip_debug(GOSSIP_DEV_DEBUG,
- "(put) Releasing OP (%p:%llu)\n", op, llu((op)->tag));
- __op_release(op);
- }
-}
+void op_release(struct orangefs_kernel_op_s *op);
extern void orangefs_bufmap_put(int);
static inline void put_cancel(struct orangefs_kernel_op_s *op)
@@ -200,6 +200,7 @@ bool orangefs_cancel_op_in_progress(struct orangefs_kernel_op_s *op)
static void orangefs_clean_up_interrupted_operation(struct orangefs_kernel_op_s *op)
{
+ bool copying;
/*
* handle interrupted cases depending on what state we were in when
* the interruption is detected. there is a coarse grained lock
@@ -208,6 +209,7 @@ static void orangefs_clean_up_interrupted_operation(struct orangefs_kernel_op_s
* Called with op->lock held.
*/
op->op_state |= OP_VFS_STATE_GIVEN_UP;
+ copying = op->op_state & OP_VFS_STATE_COPYING;
if (op_state_waiting(op)) {
/*
@@ -243,6 +245,8 @@ static void orangefs_clean_up_interrupted_operation(struct orangefs_kernel_op_s
gossip_err("%s: can't get here.\n", __func__);
spin_unlock(&op->lock);
}
+ if (copying)
+ wait_for_completion(&op->waitq);
reinit_completion(&op->waitq);
}