diff mbox

[23/32] net/ceph: make ceph_msgr_wq non-reentrant

Message ID 1294062595-30097-24-git-send-email-tj@kernel.org (mailing list archive)
State New, archived
Headers show

Commit Message

Tejun Heo Jan. 3, 2011, 1:49 p.m. UTC
None
diff mbox

Patch

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index a108b42..c3011be 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -110,17 +110,12 @@  struct ceph_msg_pos {
 
 /*
  * ceph_connection state bit flags
- *
- * QUEUED and BUSY are used together to ensure that only a single
- * thread is currently opening, reading or writing data to the socket.
  */
 #define LOSSYTX         0  /* we can close channel or drop messages on errors */
 #define CONNECTING	1
 #define NEGOTIATING	2
 #define KEEPALIVE_PENDING      3
 #define WRITE_PENDING	4  /* we have data ready to send */
-#define QUEUED          5  /* there is work queued on this connection */
-#define BUSY            6  /* work is being done */
 #define STANDBY		8  /* no outgoing messages, socket closed.  we keep
 			    * the ceph_connection around to maintain shared
 			    * state with the peer. */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index b6ff4a1..dff633d 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -96,7 +96,7 @@  struct workqueue_struct *ceph_msgr_wq;
 
 int ceph_msgr_init(void)
 {
-	ceph_msgr_wq = create_workqueue("ceph-msgr");
+	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
 	if (!ceph_msgr_wq) {
 		pr_err("msgr_init failed to create workqueue\n");
 		return -ENOMEM;
@@ -1920,20 +1920,6 @@  bad_tag:
 /*
  * Atomically queue work on a connection.  Bump @con reference to
  * avoid races with connection teardown.
- *
- * There is some trickery going on with QUEUED and BUSY because we
- * only want a _single_ thread operating on each connection at any
- * point in time, but we want to use all available CPUs.
- *
- * The worker thread only proceeds if it can atomically set BUSY.  It
- * clears QUEUED and does it's thing.  When it thinks it's done, it
- * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
- * (tries again to set BUSY).
- *
- * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
- * try to queue work.  If that fails (work is already queued, or BUSY)
- * we give up (work also already being done or is queued) but leave QUEUED
- * set so that the worker thread will loop if necessary.
  */
 static void queue_con(struct ceph_connection *con)
 {
@@ -1948,11 +1934,7 @@  static void queue_con(struct ceph_connection *con)
 		return;
 	}
 
-	set_bit(QUEUED, &con->state);
-	if (test_bit(BUSY, &con->state)) {
-		dout("queue_con %p - already BUSY\n", con);
-		con->ops->put(con);
-	} else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
+	if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
 		dout("queue_con %p - already queued\n", con);
 		con->ops->put(con);
 	} else {
@@ -1967,15 +1949,6 @@  static void con_work(struct work_struct *work)
 {
 	struct ceph_connection *con = container_of(work, struct ceph_connection,
 						   work.work);
-	int backoff = 0;
-
-more:
-	if (test_and_set_bit(BUSY, &con->state) != 0) {
-		dout("con_work %p BUSY already set\n", con);
-		goto out;
-	}
-	dout("con_work %p start, clearing QUEUED\n", con);
-	clear_bit(QUEUED, &con->state);
 
 	mutex_lock(&con->mutex);
 
@@ -1994,28 +1967,13 @@  more:
 	    try_read(con) < 0 ||
 	    try_write(con) < 0) {
 		mutex_unlock(&con->mutex);
-		backoff = 1;
 		ceph_fault(con);     /* error/fault path */
 		goto done_unlocked;
 	}
 
 done:
 	mutex_unlock(&con->mutex);
-
 done_unlocked:
-	clear_bit(BUSY, &con->state);
-	dout("con->state=%lu\n", con->state);
-	if (test_bit(QUEUED, &con->state)) {
-		if (!backoff || test_bit(OPENING, &con->state)) {
-			dout("con_work %p QUEUED reset, looping\n", con);
-			goto more;
-		}
-		dout("con_work %p QUEUED reset, but just faulted\n", con);
-		clear_bit(QUEUED, &con->state);
-	}
-	dout("con_work %p done\n", con);
-
-out:
 	con->ops->put(con);
 }