@@ -2612,6 +2612,47 @@ xlog_get_lowest_lsn(
return lowest_lsn;
}
+/*
+ * Keep processing entries in the iclog callback list until we come around and
+ * it is empty. We need to atomically see that the list is empty and change the
+ * state to DIRTY so that we don't miss any more callbacks being added.
+ *
+ * This function is called with the icloglock held and returns with it held. We
+ * drop it while running callbacks, however, as holding it over thousands of
+ * callbacks is unnecessary and causes excessive contention if we do.
+ */
+static bool
+xlog_state_do_iclog_callbacks(
+ struct xlog *log,
+ struct xlog_in_core *iclog,
+ bool aborted)
+{
+ bool ret = false;
+
+ spin_unlock(&log->l_icloglock);
+
+ spin_lock(&iclog->ic_callback_lock);
+ while (!list_empty(&iclog->ic_callbacks)) {
+ LIST_HEAD(tmp);
+
+ list_splice_init(&iclog->ic_callbacks, &tmp);
+
+ spin_unlock(&iclog->ic_callback_lock);
+ xlog_cil_process_committed(&tmp, aborted);
+ spin_lock(&iclog->ic_callback_lock);
+ ret = true;
+ }
+
+ /*
+ * Pick up the icloglock while still holding the callback lock so we
+ * serialise against anyone trying to add more callbacks to this iclog
+ * now we've finished processing.
+ */
+ spin_lock(&log->l_icloglock);
+ spin_unlock(&iclog->ic_callback_lock);
+ return ret;
+}
+
#ifdef DEBUG
/*
* Make one last gasp attempt to see if iclogs are being left in limbo. If the
@@ -2784,31 +2825,9 @@ xlog_state_do_callback(
} else
ioerrors++;
- spin_unlock(&log->l_icloglock);
+ if (xlog_state_do_iclog_callbacks(log, iclog, aborted))
+ loopdidcallbacks++;
- /*
- * Keep processing entries in the callback list until
- * we come around and it is empty. We need to
- * atomically see that the list is empty and change the
- * state to DIRTY so that we don't miss any more
- * callbacks being added.
- */
- spin_lock(&iclog->ic_callback_lock);
- while (!list_empty(&iclog->ic_callbacks)) {
- LIST_HEAD(tmp);
-
- list_splice_init(&iclog->ic_callbacks, &tmp);
-
- spin_unlock(&iclog->ic_callback_lock);
- xlog_cil_process_committed(&tmp, aborted);
- spin_lock(&iclog->ic_callback_lock);
- }
-
- loopdidcallbacks++;
- funcdidcallbacks++;
-
- spin_lock(&log->l_icloglock);
- spin_unlock(&iclog->ic_callback_lock);
if (!(iclog->ic_state & XLOG_STATE_IOERROR))
iclog->ic_state = XLOG_STATE_DIRTY;
@@ -2824,6 +2843,8 @@ xlog_state_do_callback(
iclog = iclog->ic_next;
} while (first_iclog != iclog);
+ funcdidcallbacks += loopdidcallbacks;
+
if (repeats > 5000) {
flushcnt += repeats;
repeats = 0;