@@ -433,6 +433,8 @@ xlog_cil_insert_items(
struct xfs_log_item *lip;
int len = 0;
int iovhdr_res = 0, split_res = 0, ctx_res = 0;
+ int space_used;
+ struct xlog_cil_pcp *cilpcp;
ASSERT(tp);
@@ -469,8 +471,9 @@ xlog_cil_insert_items(
*
* This can steal more than we need, but that's OK.
*/
+ space_used = atomic_read(&ctx->space_used);
if (atomic_read(&cil->xc_iclog_hdrs) > 0 ||
- ctx->space_used + len >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log)) {
+ space_used + len >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log)) {
int split_res = log->l_iclog_hsize +
sizeof(struct xlog_op_header);
if (ctx_res)
@@ -480,16 +483,34 @@ xlog_cil_insert_items(
atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs);
}
+ /*
+ * Update the CIL per-cpu space accounting. The local count is folded into
+ * the global counter when it exceeds the per-cpu batch size or when the
+ * CIL is over the space limit. This keeps contention on the global
+ * counter low for normal updates, while space used is accounted
+ * immediately once we are over the limit. This makes enforcing
+ * the hard limit much more accurate. The per cpu fold threshold is
+ * based on how close we are to the hard limit.
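+ *
+ * For example, if the hard (blocking) limit works out to 64MB, 16MB has
+ * already been folded into ctx->space_used and there are 8 online CPUs,
+ * then each CPU can accumulate roughly (64MB - 16MB) / 8 = 6MB locally
+ * before it must fold its delta; once the CIL is over the (soft) space
+ * limit, every update is folded immediately.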
+ */
+ cilpcp = get_cpu_ptr(cil->xc_pcp);
+ cilpcp->space_used += len;
+ if (space_used >= XLOG_CIL_SPACE_LIMIT(log) ||
+ cilpcp->space_used >
+ ((XLOG_CIL_BLOCKING_SPACE_LIMIT(log) - space_used) /
+ num_online_cpus())) {
+ atomic_add(cilpcp->space_used, &ctx->space_used);
+ cilpcp->space_used = 0;
+ }
+ put_cpu_ptr(cilpcp);
+
spin_lock(&cil->xc_cil_lock);
- tp->t_ticket->t_curr_res -= ctx_res + len;
ctx->ticket->t_unit_res += ctx_res;
ctx->ticket->t_curr_res += ctx_res;
- ctx->space_used += len;
/*
* If we've overrun the reservation, dump the tx details before we move
* the log items. Shutdown is imminent...
*/
+ tp->t_ticket->t_curr_res -= ctx_res + len;
if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
xfs_warn(log->l_mp, "Transaction log reservation overrun:");
xfs_warn(log->l_mp,
@@ -768,12 +789,20 @@ xlog_cil_push_work(
xfs_lsn_t push_seq;
DECLARE_COMPLETION_ONSTACK(bdev_flush);
bool commit_iclog_sync = false;
+ int cpu;
+ struct xlog_cil_pcp *cilpcp;
new_ctx = xlog_cil_ctx_alloc();
new_ctx->ticket = xlog_cil_ticket_alloc(log);
down_write(&cil->xc_ctx_lock);
+ /*
+ * Reset the CIL pcp counters. This is serialised against new CIL
+ * insertions by holding the xc_ctx_lock exclusively here.
+ */
+ for_each_online_cpu(cpu) {
+ cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+ cilpcp->space_used = 0;
+ }
+
spin_lock(&cil->xc_push_lock);
push_seq = cil->xc_push_seq;
ASSERT(push_seq <= ctx->sequence);
@@ -1037,6 +1066,7 @@ xlog_cil_push_background(
struct xlog *log) __releases(cil->xc_ctx_lock)
{
struct xfs_cil *cil = log->l_cilp;
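+ /*
+ * Per-cpu deltas that have not yet been folded into the global counter
+ * are not included here, so this may under-estimate the space used by
+ * the current CIL context.
+ */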
+ int space_used = atomic_read(&cil->xc_ctx->space_used);
/*
* The cil won't be empty because we are called while holding the
@@ -1049,7 +1079,7 @@ xlog_cil_push_background(
* Don't do a background push if we haven't used up all the
* space available yet.
*/
- if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) {
+ if (space_used < XLOG_CIL_SPACE_LIMIT(log)) {
up_read(&cil->xc_ctx_lock);
return;
}
@@ -1078,10 +1108,10 @@ xlog_cil_push_background(
* The ctx->xc_push_lock provides the serialisation necessary for safely
* using the lockless waitqueue_active() check in this context.
*/
- if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
+ if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
waitqueue_active(&cil->xc_push_wait)) {
trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
- ASSERT(cil->xc_ctx->space_used < log->l_logsize);
+ ASSERT(space_used < log->l_logsize);
xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
return;
}
@@ -222,7 +222,7 @@ struct xfs_cil_ctx {
xfs_lsn_t commit_lsn; /* chkpt commit record lsn */
struct xlog_ticket *ticket; /* chkpt ticket */
int nvecs; /* number of regions */
- int space_used; /* aggregate size of regions */
+ atomic_t space_used; /* aggregate size of regions */
struct list_head busy_extents; /* busy extents in chkpt */
struct xfs_log_vec *lv_chain; /* logvecs being pushed */
struct list_head iclog_entry;
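For readers who want the accounting pattern in isolation, here is a minimal, self-contained sketch of the batched per-cpu counter with an adaptive fold threshold that the hunks above implement. It is not XFS code: the SOFT_LIMIT/HARD_LIMIT values, the structure and function names, and the use of C11 atomics in userspace are assumptions made purely for illustration.

/*
 * Standalone model of the batched per-cpu space accounting. In the patch,
 * the limits come from XLOG_CIL_SPACE_LIMIT() and
 * XLOG_CIL_BLOCKING_SPACE_LIMIT(), the per-cpu storage from cil->xc_pcp,
 * and the CPU count from num_online_cpus().
 */
#include <stdatomic.h>
#include <stdio.h>

#define SOFT_LIMIT	(32 * 1024 * 1024)	/* stand-in for XLOG_CIL_SPACE_LIMIT() */
#define HARD_LIMIT	(64 * 1024 * 1024)	/* stand-in for XLOG_CIL_BLOCKING_SPACE_LIMIT() */

struct pcp_counter {
	int		space_used;	/* CPU-local, unsynchronised delta */
};

struct ctx_counter {
	atomic_int	space_used;	/* globally visible, folded total */
};

/*
 * Add @len to the local counter and fold it into the global total once the
 * local delta exceeds a per-CPU share of the remaining headroom below
 * HARD_LIMIT, or on every update once the folded total is over SOFT_LIMIT.
 */
static void account_space(struct ctx_counter *ctx, struct pcp_counter *pcp,
			  int len, int nr_cpus)
{
	int space_used = atomic_load(&ctx->space_used);

	pcp->space_used += len;
	if (space_used >= SOFT_LIMIT ||
	    pcp->space_used > (HARD_LIMIT - space_used) / nr_cpus) {
		atomic_fetch_add(&ctx->space_used, pcp->space_used);
		pcp->space_used = 0;
	}
}

int main(void)
{
	struct ctx_counter ctx = { 0 };
	struct pcp_counter pcp = { 0 };

	/* One CPU (of 8) logging 1MB items; folds happen as batches fill. */
	for (int i = 0; i < 16; i++)
		account_space(&ctx, &pcp, 1024 * 1024, 8);

	printf("folded: %d bytes, still per-cpu: %d bytes\n",
	       atomic_load(&ctx.space_used), pcp.space_used);
	return 0;
}

Note the design trade-off the sketch makes visible: the further the folded total is from the hard limit, the larger the private batch each CPU may accumulate, so the global atomic is touched rarely on an idle log but on every insert as the CIL approaches the blocking limit.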