diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -448,6 +448,7 @@ struct ceph_connection {
struct mutex mutex;
/* out queue */
+ spinlock_t out_queue_lock; /* protects out_queue */
struct list_head out_queue;
struct list_head out_sent; /* sending or sent but unacked */
u64 out_seq; /* last message queued for send */

diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -633,6 +633,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
con_sock_state_init(con);
mutex_init(&con->mutex);
+ spin_lock_init(&con->out_queue_lock);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
@@ -691,6 +692,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
u64 seq;
dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
+ spin_lock(&con->out_queue_lock);
while (!list_empty(&con->out_queue)) {
msg = list_first_entry(&con->out_queue, struct ceph_msg,
list_head);
@@ -704,6 +706,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
msg, seq);
ceph_msg_remove(msg);
}
+ spin_unlock(&con->out_queue_lock);
}
#ifdef CONFIG_BLOCK
@@ -1601,16 +1604,19 @@ static void con_fault(struct ceph_connection *con)
}
/* Requeue anything that hasn't been acked */
+ spin_lock(&con->out_queue_lock);
list_splice_init(&con->out_sent, &con->out_queue);
/* If there are no messages queued or keepalive pending, place
* the connection in a STANDBY state */
if (list_empty(&con->out_queue) &&
!ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+ spin_unlock(&con->out_queue_lock);
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
con->state = CEPH_CON_S_STANDBY;
} else {
+ spin_unlock(&con->out_queue_lock);
/* retry after a delay. */
con->state = CEPH_CON_S_PREOPEN;
if (!con->delay) {
@@ -1691,19 +1697,18 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
msg->needs_out_seq = true;
- mutex_lock(&con->mutex);
-
- if (con->state == CEPH_CON_S_CLOSED) {
+ if (READ_ONCE(con->state) == CEPH_CON_S_CLOSED) {
dout("con_send %p closed, dropping %p\n", con, msg);
ceph_msg_put(msg);
- mutex_unlock(&con->mutex);
return;
}
msg_con_set(msg, con);
BUG_ON(!list_empty(&msg->list_head));
+ spin_lock(&con->out_queue_lock);
list_add_tail(&msg->list_head, &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
@@ -1712,7 +1717,6 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
le32_to_cpu(msg->hdr.data_len));
ceph_con_flag_set(con, CEPH_CON_F_CLEAR_STANDBY);
- mutex_unlock(&con->mutex);
/* if there wasn't anything waiting to send before, queue
* new work */
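
An aside on the READ_ONCE() above, since it is the subtlest part of the
change: with con->mutex no longer held in ceph_con_send(), the CLOSED
check becomes an unlocked, advisory read. READ_ONCE() only guarantees a
single, non-torn load of con->state; it cannot stop the state from
changing right after the test, so the close path presumably still has to
flush out_queue to catch a message that races past this check. A generic,
self-contained sketch of the idiom follows; conn, conn_state, and
conn_closed() are illustrative names, not ceph symbols:

#include <linux/compiler.h>        /* READ_ONCE()/WRITE_ONCE() */

enum conn_state { CONN_OPEN, CONN_CLOSED };

struct conn {
        enum conn_state state;     /* updated with WRITE_ONCE() elsewhere */
};

static bool conn_closed(const struct conn *c)
{
        /*
         * READ_ONCE() forbids the compiler from tearing or re-reading
         * c->state, but a concurrent WRITE_ONCE(c->state, CONN_CLOSED)
         * can still land immediately after this load, so the result is
         * only a best-effort hint, not a stable fact.
         */
        return READ_ONCE(c->state) == CONN_CLOSED;
}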
@@ -2058,6 +2062,8 @@ void ceph_con_get_out_msg(struct ceph_connection *con)
{
struct ceph_msg *msg;
+ lockdep_assert_held(&con->out_queue_lock);
+
BUG_ON(list_empty(&con->out_queue));
msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
WARN_ON(msg->con != con);
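
The lockdep_assert_held() added above turns the new locking rule into a
checkable contract: with lockdep enabled, any caller reaching
ceph_con_get_out_msg() without out_queue_lock will splat. As a usage
sketch, here is a hypothetical helper (not part of the patch) condensing
the caller shape that the v1 and v2 hunks below adopt:

/*
 * Callers must hold out_queue_lock across both the emptiness test and
 * the dequeue; otherwise another sender could queue a message, or a
 * fault path could requeue one, between the two steps.
 */
static struct ceph_msg *con_try_get_out_msg(struct ceph_connection *con)
{
        struct ceph_msg *msg = NULL;

        spin_lock(&con->out_queue_lock);
        if (!list_empty(&con->out_queue)) {
                ceph_con_get_out_msg(con);  /* asserts out_queue_lock is held */
                msg = con->out_msg;
        }
        spin_unlock(&con->out_queue_lock);
        return msg;
}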

diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -194,25 +194,9 @@ static void prepare_write_message_footer(struct ceph_connection *con)
*/
static void prepare_write_message(struct ceph_connection *con)
{
- struct ceph_msg *m;
+ struct ceph_msg *m = con->out_msg;
u32 crc;
- con_out_kvec_reset(con);
- con->v1.out_msg_done = false;
-
- /* Sneak an ack in there first? If we can get it into the same
- * TCP packet that's a good thing. */
- if (con->in_seq > con->in_seq_acked) {
- con->in_seq_acked = con->in_seq;
- con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
- con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
- con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
- &con->v1.out_temp_ack);
- }
-
- ceph_con_get_out_msg(con);
- m = con->out_msg;
-
dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
@@ -1427,10 +1411,27 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
goto more;
}
/* is anything else pending? */
+ spin_lock(&con->out_queue_lock);
if (!list_empty(&con->out_queue)) {
+ con_out_kvec_reset(con);
+ con->v1.out_msg_done = false;
+
+ /* Sneak an ack in there first? If we can get it into the same
+ * TCP packet that's a good thing. */
+ if (con->in_seq > con->in_seq_acked) {
+ con->in_seq_acked = con->in_seq;
+ con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+ con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
+ con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
+ &con->v1.out_temp_ack);
+ }
+
+ ceph_con_get_out_msg(con);
+ spin_unlock(&con->out_queue_lock);
prepare_write_message(con);
goto more;
}
+ spin_unlock(&con->out_queue_lock);
if (con->in_seq > con->in_seq_acked) {
prepare_write_ack(con);
goto more;

diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c
--- a/net/ceph/messenger_v2.c
+++ b/net/ceph/messenger_v2.c
@@ -3001,7 +3001,9 @@ static int populate_out_iter(struct ceph_connection *con)
}
WARN_ON(con->v2.out_state != OUT_S_GET_NEXT);
+ spin_lock(&con->out_queue_lock);
if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+ spin_unlock(&con->out_queue_lock);
ret = prepare_keepalive2(con);
if (ret) {
pr_err("prepare_keepalive2 failed: %d\n", ret);
@@ -3009,18 +3011,21 @@ static int populate_out_iter(struct ceph_connection *con)
}
} else if (!list_empty(&con->out_queue)) {
ceph_con_get_out_msg(con);
+ spin_unlock(&con->out_queue_lock);
ret = prepare_message(con);
if (ret) {
pr_err("prepare_message failed: %d\n", ret);
return ret;
}
} else if (con->in_seq > con->in_seq_acked) {
+ spin_unlock(&con->out_queue_lock);
ret = prepare_ack(con);
if (ret) {
pr_err("prepare_ack failed: %d\n", ret);
return ret;
}
} else {
+ spin_unlock(&con->out_queue_lock);
goto nothing_pending;
}
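
A note on shape rather than substance: populate_out_iter() now drops the
spinlock separately in all four branches, which is the straightforward
translation but leaves four unlock sites to keep in sync. Purely as an
editorial sketch, not something the patch proposes, the same logic could
decide under the lock and act after a single unlock:

        enum { DO_KEEPALIVE, DO_MESSAGE, DO_ACK, DO_NOTHING } todo;

        spin_lock(&con->out_queue_lock);
        if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
                todo = DO_KEEPALIVE;
        } else if (!list_empty(&con->out_queue)) {
                ceph_con_get_out_msg(con);  /* must stay under the lock */
                todo = DO_MESSAGE;
        } else if (con->in_seq > con->in_seq_acked) {
                todo = DO_ACK;
        } else {
                todo = DO_NOTHING;
        }
        spin_unlock(&con->out_queue_lock);

        /* ... then dispatch on todo to prepare_keepalive2(),
         * prepare_message(), prepare_ack(), or nothing_pending,
         * all outside the lock ... */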

Currently, the out_queue is protected by con->mutex. ceph_con_send()
takes the mutex but only performs some in-memory list operations before
kicking the workqueue job that does the actual send. This means that
while the workqueue job is running, any task that wants to send a new
message ends up blocked on the mutex.

Given that nothing in ceph_con_send() aside from the mutex acquisition
itself can block, we can allow tasks to submit new messages under a
spinlock rather than taking the mutex, which should reduce this
contention and (hopefully) improve throughput for both cephfs and rbd
in highly contended situations.

Add a new spinlock to protect the out_queue, and take it while holding
con->mutex wherever the out_queue is accessed. Stop taking con->mutex
in ceph_con_send(), and instead just take the spinlock around the
list_add to the out_queue.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/ceph/messenger.h |  1 +
 net/ceph/messenger.c           | 16 +++++++++++-----
 net/ceph/messenger_v1.c        | 35 ++++++++++++++++++-----------------
 net/ceph/messenger_v2.c        |  5 +++++
 4 files changed, 35 insertions(+), 22 deletions(-)
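
To see the resulting scheme at a glance, here is a minimal,
self-contained sketch of the producer/consumer locking the patch
establishes. It is a simplified model, not the ceph code: struct conn,
conn_send(), and conn_workfn() are illustrative names, and sequencing,
acks, and the out_sent list are omitted:

#include <linux/list.h>
#include <linux/mutex.h>
#include <linux/spinlock.h>

struct msg {
        struct list_head list_head;
};

struct conn {
        struct mutex mutex;             /* serializes the send worker */
        spinlock_t out_queue_lock;      /* protects out_queue (inner lock) */
        struct list_head out_queue;
};

/* Producer path (cf. ceph_con_send()): takes only the spinlock, so
 * senders never block behind a worker mid-transmission. */
static void conn_send(struct conn *c, struct msg *m)
{
        spin_lock(&c->out_queue_lock);
        list_add_tail(&m->list_head, &c->out_queue);
        spin_unlock(&c->out_queue_lock);
        /* ... kick the workqueue job here ... */
}

/* Consumer path (cf. ceph_con_workfn()): mutex held for the whole
 * pass, spinlock taken only around list manipulation.  The lock order
 * is always mutex -> spinlock, never the reverse. */
static void conn_workfn(struct conn *c)
{
        struct msg *m;

        mutex_lock(&c->mutex);
        spin_lock(&c->out_queue_lock);
        while (!list_empty(&c->out_queue)) {
                m = list_first_entry(&c->out_queue, struct msg, list_head);
                list_del_init(&m->list_head);
                spin_unlock(&c->out_queue_lock);
                /* ... transmit m; may block while new sends still queue ... */
                spin_lock(&c->out_queue_lock);
        }
        spin_unlock(&c->out_queue_lock);
        mutex_unlock(&c->mutex);
}

The key property is that conn_send() contends only on a short critical
section, while the worker still owns all transmission state through the
mutex.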