Message ID | 20210305051143.182133-30-david@fromorbit.com (mailing list archive) |
---|---|
State | Superseded |
Headers | show |
Series | xfs: consolidated log and optimisation changes | expand |
On Fri, Mar 05, 2021 at 04:11:27PM +1100, Dave Chinner wrote: > From: Dave Chinner <dchinner@redhat.com> > > Handle writing of a logvec chain into an iclog that doesn't have > enough space to fit it all. The iclog has already been changed to > WANT_SYNC by xlog_get_iclog_space(), so the entire remaining space > in the iclog is exclusively owned by this logvec chain. > > The difference between the single and partial cases is that > we end up with partial iovec writes in the iclog and have to split > a log vec regions across two iclogs. The state handling for this is > currently awful and so we're building up the pieces needed to > handle this more cleanly one at a time. > > Signed-off-by: Dave Chinner <dchinner@redhat.com> > --- > fs/xfs/xfs_log.c | 525 ++++++++++++++++++++++------------------------- > 1 file changed, 251 insertions(+), 274 deletions(-) > > diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c > index 590c1e6db475..10916b99bf0f 100644 > --- a/fs/xfs/xfs_log.c > +++ b/fs/xfs/xfs_log.c > @@ -2099,166 +2099,250 @@ xlog_print_trans( > } > } > > -static xlog_op_header_t * > -xlog_write_setup_ophdr( > - struct xlog_op_header *ophdr, > - struct xlog_ticket *ticket) > -{ > - ophdr->oh_clientid = XFS_TRANSACTION; > - ophdr->oh_res2 = 0; > - ophdr->oh_flags = 0; > - return ophdr; > -} > - > /* > - * Set up the parameters of the region copy into the log. This has > - * to handle region write split across multiple log buffers - this > - * state is kept external to this function so that this code can > - * be written in an obvious, self documenting manner. > + * Write whole log vectors into a single iclog which is guaranteed to have > + * either sufficient space for the entire log vector chain to be written or > + * exclusive access to the remaining space in the iclog. > + * > + * Return the number of iovecs and data written into the iclog, as well as > + * a pointer to the logvec that doesn't fit in the log (or NULL if we hit the > + * end of the chain. 
> */ > -static int > -xlog_write_setup_copy( > +static struct xfs_log_vec * > +xlog_write_single( Ouch. Could you fix the previous patch to move this new function a little higher in the file (like above xlog_write_setup_ophdr) so that it doesn't get shredded like this? Sooo... I /think/ this looks all right, but this is a pretty long reorganization. I might revisit this in the morning. :/ (Skip to the second-to-last hunk, that's where the next comment is...) > + struct xfs_log_vec *log_vector, > struct xlog_ticket *ticket, > - struct xlog_op_header *ophdr, > - int space_available, > - int space_required, > - int *copy_off, > - int *copy_len, > - int *last_was_partial_copy, > - int *bytes_consumed) > -{ > - int still_to_copy; > - > - still_to_copy = space_required - *bytes_consumed; > - *copy_off = *bytes_consumed; > - > - if (still_to_copy <= space_available) { > - /* write of region completes here */ > - *copy_len = still_to_copy; > - ophdr->oh_len = cpu_to_be32(*copy_len); > - if (*last_was_partial_copy) > - ophdr->oh_flags |= (XLOG_END_TRANS|XLOG_WAS_CONT_TRANS); > - *last_was_partial_copy = 0; > - *bytes_consumed = 0; > - return 0; > - } > - > - /* partial write of region, needs extra log op header reservation */ > - *copy_len = space_available; > - ophdr->oh_len = cpu_to_be32(*copy_len); > - ophdr->oh_flags |= XLOG_CONTINUE_TRANS; > - if (*last_was_partial_copy) > - ophdr->oh_flags |= XLOG_WAS_CONT_TRANS; > - *bytes_consumed += *copy_len; > - (*last_was_partial_copy)++; > - > - /* account for new log op header */ > - ticket->t_curr_res -= sizeof(struct xlog_op_header); > - > - return sizeof(struct xlog_op_header); > -} > - > -static int > -xlog_write_copy_finish( > - struct xlog *log, > struct xlog_in_core *iclog, > - uint flags, > - int *record_cnt, > - int *data_cnt, > - int *partial_copy, > - int *partial_copy_len, > - int log_offset, > - struct xlog_in_core **commit_iclog) > + uint32_t *log_offset, > + uint32_t *len, > + uint32_t *record_cnt, > + uint32_t 
*data_cnt) > { > - int error; > + struct xfs_log_vec *lv = log_vector; > + void *ptr; > + int index; > > - if (*partial_copy) { > + ASSERT(*log_offset + *len <= iclog->ic_size || > + iclog->ic_state == XLOG_STATE_WANT_SYNC); > + > + ptr = iclog->ic_datap + *log_offset; > + for (lv = log_vector; lv; lv = lv->lv_next) { > /* > - * This iclog has already been marked WANT_SYNC by > - * xlog_state_get_iclog_space. > + * If the entire log vec does not fit in the iclog, punt it to > + * the partial copy loop which can handle this case. > */ > - spin_lock(&log->l_icloglock); > - xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); > - *record_cnt = 0; > - *data_cnt = 0; > - goto release_iclog; > - } > + if (lv->lv_niovecs && > + lv->lv_bytes > iclog->ic_size - *log_offset) > + break; > > - *partial_copy = 0; > - *partial_copy_len = 0; > + /* > + * Ordered log vectors have no regions to write so this > + * loop will naturally skip them. > + */ > + for (index = 0; index < lv->lv_niovecs; index++) { > + struct xfs_log_iovec *reg = &lv->lv_iovecp[index]; > + struct xlog_op_header *ophdr = reg->i_addr; > > - if (iclog->ic_size - log_offset <= sizeof(xlog_op_header_t)) { > - /* no more space in this iclog - push it. 
*/ > - spin_lock(&log->l_icloglock); > - xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); > - *record_cnt = 0; > - *data_cnt = 0; > + ASSERT(reg->i_len % sizeof(int32_t) == 0); > + ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); > > - if (iclog->ic_state == XLOG_STATE_ACTIVE) > - xlog_state_switch_iclogs(log, iclog, 0); > - else > - ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC || > - iclog->ic_state == XLOG_STATE_IOERROR); > - if (!commit_iclog) > - goto release_iclog; > - spin_unlock(&log->l_icloglock); > - ASSERT(flags & XLOG_COMMIT_TRANS); > - *commit_iclog = iclog; > + ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > + ophdr->oh_len = cpu_to_be32(reg->i_len - > + sizeof(struct xlog_op_header)); > + memcpy(ptr, reg->i_addr, reg->i_len); > + xlog_write_adv_cnt(&ptr, len, log_offset, reg->i_len); > + (*record_cnt)++; > + *data_cnt += reg->i_len; > + } > } > + ASSERT(*len == 0 || lv); > + return lv; > +} > > - return 0; > +static int > +xlog_write_get_more_iclog_space( > + struct xlog *log, > + struct xlog_ticket *ticket, > + struct xlog_in_core **iclogp, > + uint32_t *log_offset, > + uint32_t len, > + uint32_t *record_cnt, > + uint32_t *data_cnt, > + int *contwr) > +{ > + struct xlog_in_core *iclog = *iclogp; > + int error; > > -release_iclog: > + spin_lock(&log->l_icloglock); > + xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); > + ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC || > + iclog->ic_state == XLOG_STATE_IOERROR); > error = xlog_state_release_iclog(log, iclog); > spin_unlock(&log->l_icloglock); > - return error; > + if (error) > + return error; > + > + error = xlog_state_get_iclog_space(log, len, &iclog, > + ticket, contwr, log_offset); > + if (error) > + return error; > + *record_cnt = 0; > + *data_cnt = 0; > + *iclogp = iclog; > + return 0; > } > > /* > - * Write log vectors into a single iclog which is guaranteed by the caller > - * to have enough space to write the entire log vector into. 
Return the number > - * of log vectors written into the iclog. > + * Write log vectors into a single iclog which is smaller than the current chain > + * length. We write until we cannot fit a full record into the remaining space > + * and then stop. We return the log vector that is to be written that cannot > + * wholly fit in the iclog. > */ > -static int > -xlog_write_single( > +static struct xfs_log_vec * > +xlog_write_partial( > + struct xlog *log, > struct xfs_log_vec *log_vector, > struct xlog_ticket *ticket, > - struct xlog_in_core *iclog, > - uint32_t log_offset, > - uint32_t len) > + struct xlog_in_core **iclogp, > + uint32_t *log_offset, > + uint32_t *len, > + uint32_t *record_cnt, > + uint32_t *data_cnt, > + int *contwr) > { > + struct xlog_in_core *iclog = *iclogp; > struct xfs_log_vec *lv = log_vector; > + struct xfs_log_iovec *reg; > + struct xlog_op_header *ophdr; > void *ptr; > int index = 0; > - int record_cnt = 0; > + uint32_t rlen; > + int error; > > - ASSERT(log_offset + len <= iclog->ic_size); > + /* walk the logvec, copying until we run out of space in the iclog */ > + ptr = iclog->ic_datap + *log_offset; > + for (index = 0; index < lv->lv_niovecs; index++) { > + uint32_t reg_offset = 0; > + > + reg = &lv->lv_iovecp[index]; > + ASSERT(reg->i_len % sizeof(int32_t) == 0); > > - ptr = iclog->ic_datap + log_offset; > - for (lv = log_vector; lv; lv = lv->lv_next) { > /* > - * Ordered log vectors have no regions to write so this > - * loop will naturally skip them. > + * The first region of a continuation must have a non-zero > + * length otherwise log recovery will just skip over it and > + * start recovering from the next opheader it finds. Because we > + * mark the next opheader as a continuation, recovery will then > + * incorrectly add the continuation to the previous region and > + * that breaks stuff. > + * > + * Hence if there isn't space for region data after the > + * opheader, then we need to start afresh with a new iclog. 
> */ > - for (index = 0; index < lv->lv_niovecs; index++) { > - struct xfs_log_iovec *reg = &lv->lv_iovecp[index]; > - struct xlog_op_header *ophdr = reg->i_addr; > + if (iclog->ic_size - *log_offset <= > + sizeof(struct xlog_op_header)) { > + error = xlog_write_get_more_iclog_space(log, ticket, > + &iclog, log_offset, *len, record_cnt, > + data_cnt, contwr); > + if (error) > + return ERR_PTR(error); > + ptr = iclog->ic_datap + *log_offset; > + } > > - ASSERT(reg->i_len % sizeof(int32_t) == 0); > - ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); > + ophdr = reg->i_addr; > + rlen = min_t(uint32_t, reg->i_len, iclog->ic_size - *log_offset); > + > + ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > + ophdr->oh_len = cpu_to_be32(rlen - sizeof(struct xlog_op_header)); > + if (rlen != reg->i_len) > + ophdr->oh_flags |= XLOG_CONTINUE_TRANS; > > + ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); > + xlog_verify_dest_ptr(log, ptr); > + memcpy(ptr, reg->i_addr, rlen); > + xlog_write_adv_cnt(&ptr, len, log_offset, rlen); > + (*record_cnt)++; > + *data_cnt += rlen; > + > + if (rlen == reg->i_len) > + continue; > + > + /* > + * We now have a partially written iovec, but it can span > + * multiple iclogs so we loop here. First we release the iclog > + * we currently have, then we get a new iclog and add a new > + * opheader. Then we continue copying from where we were until > + * we either complete the iovec or fill the iclog. If we > + * complete the iovec, then we increment the index and go right > + * back to the top of the outer loop. if we fill the iclog, we > + * run the inner loop again. > + * > + * This is complicated by the tail of a region using all the > + * space in an iclog and hence requiring us to release the iclog > + * and get a new one before returning to the outer loop. 
We must > + * always guarantee that we exit this inner loop with at least > + * space for log transaction opheaders left in the current > + * iclog, hence we cannot just terminate the loop at the end > + * of the of the continuation. So we loop while there is no > + * space left in the current iclog, and check for the end of the > + * continuation after getting a new iclog. > + */ > + do { > + /* > + * Account for the continuation opheader before we get > + * a new iclog. This is necessary so that we reserve > + * space in the iclog for it. > + */ > + if (ophdr->oh_flags & XLOG_CONTINUE_TRANS) { > + *len += sizeof(struct xlog_op_header); > + ticket->t_curr_res -= sizeof(struct xlog_op_header); > + } > + error = xlog_write_get_more_iclog_space(log, ticket, > + &iclog, log_offset, *len, record_cnt, > + data_cnt, contwr); > + if (error) > + return ERR_PTR(error); > + ptr = iclog->ic_datap + *log_offset; > + > + ophdr = ptr; > ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > - ophdr->oh_len = cpu_to_be32(reg->i_len - > + ophdr->oh_clientid = XFS_TRANSACTION; > + ophdr->oh_res2 = 0; > + ophdr->oh_flags = XLOG_WAS_CONT_TRANS; > + > + xlog_write_adv_cnt(&ptr, len, log_offset, > sizeof(struct xlog_op_header)); > - memcpy(ptr, reg->i_addr, reg->i_len); > - xlog_write_adv_cnt(&ptr, &len, &log_offset, reg->i_len); > - record_cnt++; > - } > + *data_cnt += sizeof(struct xlog_op_header); > + > + /* > + * If rlen fits in the iclog, then end the region > + * continuation. Otherwise we're going around again. 
> + */ > + reg_offset += rlen; > + rlen = reg->i_len - reg_offset; > + if (rlen <= iclog->ic_size - *log_offset) > + ophdr->oh_flags |= XLOG_END_TRANS; > + else > + ophdr->oh_flags |= XLOG_CONTINUE_TRANS; > + > + rlen = min_t(uint32_t, rlen, iclog->ic_size - *log_offset); > + ophdr->oh_len = cpu_to_be32(rlen); > + > + xlog_verify_dest_ptr(log, ptr); > + memcpy(ptr, reg->i_addr + reg_offset, rlen); > + xlog_write_adv_cnt(&ptr, len, log_offset, rlen); > + (*record_cnt)++; > + *data_cnt += rlen; > + > + } while (ophdr->oh_flags & XLOG_CONTINUE_TRANS); > } > - ASSERT(len == 0); > - return record_cnt; > -} > > + /* > + * No more iovecs remain in this logvec so return the next log vec to > + * the caller so it can go back to fast path copying. > + */ > + *iclogp = iclog; > + return lv->lv_next; > +} > > /* > * Write some region out to in-core log > @@ -2312,14 +2396,11 @@ xlog_write( > { > struct xlog_in_core *iclog = NULL; > struct xfs_log_vec *lv = log_vector; > - struct xfs_log_iovec *vecp = lv->lv_iovecp; > - int index = 0; > - int partial_copy = 0; > - int partial_copy_len = 0; > int contwr = 0; > int record_cnt = 0; > int data_cnt = 0; > int error = 0; > + int log_offset; > > if (ticket->t_curr_res < 0) { > xfs_alert_tag(log->l_mp, XFS_PTAG_LOGRES, > @@ -2328,157 +2409,52 @@ xlog_write( > xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR); > } > > - if (start_lsn) > - *start_lsn = 0; > - while (lv && (!lv->lv_niovecs || index < lv->lv_niovecs)) { > - void *ptr; > - int log_offset; > - > - error = xlog_state_get_iclog_space(log, len, &iclog, ticket, > - &contwr, &log_offset); > - if (error) > - return error; > - > - ASSERT(log_offset <= iclog->ic_size - 1); > + error = xlog_state_get_iclog_space(log, len, &iclog, ticket, > + &contwr, &log_offset); > + if (error) > + return error; > > - /* Start_lsn is the first lsn written to. 
*/ > - if (start_lsn && !*start_lsn) > - *start_lsn = be64_to_cpu(iclog->ic_header.h_lsn); > + /* start_lsn is the LSN of the first iclog written to. */ > + if (start_lsn) > + *start_lsn = be64_to_cpu(iclog->ic_header.h_lsn); > > - /* > - * iclogs containing commit records or unmount records need > - * to issue ordering cache flushes and commit immediately > - * to stable storage to guarantee journal vs metadata ordering > - * is correctly maintained in the storage media. > - */ > - if (optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS)) { > - iclog->ic_flags |= (XLOG_ICL_NEED_FLUSH | > - XLOG_ICL_NEED_FUA); > - } > + /* > + * iclogs containing commit records or unmount records need > + * to issue ordering cache flushes and commit immediately > + * to stable storage to guarantee journal vs metadata ordering > + * is correctly maintained in the storage media. This will always > + * fit in the iclog we have been already been passed. > + */ > + if (optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS)) { > + iclog->ic_flags |= (XLOG_ICL_NEED_FLUSH | XLOG_ICL_NEED_FUA); > + ASSERT(!contwr); > + } > > - /* If this is a single iclog write, go fast... */ > - if (!contwr && lv == log_vector) { > - record_cnt = xlog_write_single(lv, ticket, iclog, > - log_offset, len); > - len = 0; > - data_cnt = len; > + while (lv) { > + lv = xlog_write_single(lv, ticket, iclog, &log_offset, > + &len, &record_cnt, &data_cnt); > + if (!lv) > break; > - } > - > - /* > - * This loop writes out as many regions as can fit in the amount > - * of space which was allocated by xlog_state_get_iclog_space(). 
> - */ > - ptr = iclog->ic_datap + log_offset; > - while (lv && (!lv->lv_niovecs || index < lv->lv_niovecs)) { > - struct xfs_log_iovec *reg; > - struct xlog_op_header *ophdr; > - int copy_len; > - int copy_off; > - bool ordered = false; > - bool added_ophdr = false; > - > - /* ordered log vectors have no regions to write */ > - if (lv->lv_buf_len == XFS_LOG_VEC_ORDERED) { > - ASSERT(lv->lv_niovecs == 0); > - ordered = true; > - goto next_lv; > - } > - > - reg = &vecp[index]; > - ASSERT(reg->i_len % sizeof(int32_t) == 0); > - ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); > - > - /* > - * Regions always have their ophdr at the start of the > - * region, except for: > - * - a transaction start which has a start record ophdr > - * before the first region ophdr; and > - * - the previous region didn't fully fit into an iclog > - * so needs a continuation ophdr to prepend the region > - * in this new iclog. > - */ > - ophdr = reg->i_addr; > - if (optype && index) { > - optype &= ~XLOG_START_TRANS; > - } else if (partial_copy) { > - ophdr = xlog_write_setup_ophdr(ptr, ticket); > - xlog_write_adv_cnt(&ptr, &len, &log_offset, > - sizeof(struct xlog_op_header)); > - added_ophdr = true; > - } > - ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > - > - len += xlog_write_setup_copy(ticket, ophdr, > - iclog->ic_size-log_offset, > - reg->i_len, > - ©_off, ©_len, > - &partial_copy, > - &partial_copy_len); > - xlog_verify_dest_ptr(log, ptr); > - > > - /* > - * Wart: need to update length in embedded ophdr not > - * to include it's own length. > - */ > - if (!added_ophdr) { > - ophdr->oh_len = cpu_to_be32(copy_len - > - sizeof(struct xlog_op_header)); > - } > - > - ASSERT(copy_len > 0); > - memcpy(ptr, reg->i_addr + copy_off, copy_len); > - xlog_write_adv_cnt(&ptr, &len, &log_offset, copy_len); > - > - if (added_ophdr) > - copy_len += sizeof(struct xlog_op_header); > - record_cnt++; > - data_cnt += contwr ? 
copy_len : 0; > - > - error = xlog_write_copy_finish(log, iclog, optype, > - &record_cnt, &data_cnt, > - &partial_copy, > - &partial_copy_len, > - log_offset, > - commit_iclog); > - if (error) > - return error; > - > - /* > - * if we had a partial copy, we need to get more iclog > - * space but we don't want to increment the region > - * index because there is still more is this region to > - * write. > - * > - * If we completed writing this region, and we flushed > - * the iclog (indicated by resetting of the record > - * count), then we also need to get more log space. If > - * this was the last record, though, we are done and > - * can just return. > - */ > - if (partial_copy) > - break; > - > - if (++index == lv->lv_niovecs) { > -next_lv: > - lv = lv->lv_next; > - index = 0; > - if (lv) > - vecp = lv->lv_iovecp; > - } > - if (record_cnt == 0 && !ordered) { > - if (!lv) > - return 0; > - break; > - } > + ASSERT(!(optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS))); > + lv = xlog_write_partial(log, lv, ticket, &iclog, &log_offset, > + &len, &record_cnt, &data_cnt, &contwr); > + if (IS_ERR_OR_NULL(lv)) { > + error = PTR_ERR_OR_ZERO(lv); > + break; > } > } > + ASSERT((len == 0 && !lv) || error); > > - ASSERT(len == 0); > - > + /* > + * We've already been guaranteed that the last writes will fit inside > + * the current iclog, and hence it will already have the space used by > + * those writes accounted to it. Hence we do not need to update the > + * iclog with the number of bytes written here. > + */ > + ASSERT(!contwr || XLOG_FORCED_SHUTDOWN(log)); > spin_lock(&log->l_icloglock); > - xlog_state_finish_copy(log, iclog, record_cnt, data_cnt); > + xlog_state_finish_copy(log, iclog, record_cnt, 0); > if (commit_iclog) { > ASSERT(optype & XLOG_COMMIT_TRANS); > *commit_iclog = iclog; > @@ -2930,7 +2906,7 @@ xlog_state_get_iclog_space( > * xlog_write() algorithm assumes that at least 2 xlog_op_header_t's > * can fit into remaining data section. 
> */ > - if (iclog->ic_size - iclog->ic_offset < 2*sizeof(xlog_op_header_t)) { > + if (iclog->ic_size - iclog->ic_offset < 3*sizeof(xlog_op_header_t)) { Why does this change to 3? Does the comment need amending? --D > int error = 0; > > xlog_state_switch_iclogs(log, iclog, iclog->ic_size); > @@ -3633,11 +3609,12 @@ xlog_verify_iclog( > iclog->ic_header.h_cycle_data[idx]); > } > } > - if (clientid != XFS_TRANSACTION && clientid != XFS_LOG) > + if (clientid != XFS_TRANSACTION && clientid != XFS_LOG) { > xfs_warn(log->l_mp, > - "%s: invalid clientid %d op "PTR_FMT" offset 0x%lx", > - __func__, clientid, ophead, > + "%s: op %d invalid clientid %d op "PTR_FMT" offset 0x%lx", > + __func__, i, clientid, ophead, > (unsigned long)field_offset); > + } > > /* check length */ > p = &ophead->oh_len; > -- > 2.28.0 >
On Mon, Mar 08, 2021 at 06:59:32PM -0800, Darrick J. Wong wrote: > On Fri, Mar 05, 2021 at 04:11:27PM +1100, Dave Chinner wrote: > > From: Dave Chinner <dchinner@redhat.com> > > > > Handle writing of a logvec chain into an iclog that doesn't have > > enough space to fit it all. The iclog has already been changed to > > WANT_SYNC by xlog_get_iclog_space(), so the entire remaining space > > in the iclog is exclusively owned by this logvec chain. > > > > The difference between the single and partial cases is that > > we end up with partial iovec writes in the iclog and have to split > > a log vec regions across two iclogs. The state handling for this is > > currently awful and so we're building up the pieces needed to > > handle this more cleanly one at a time. > > > > Signed-off-by: Dave Chinner <dchinner@redhat.com> > > --- > > fs/xfs/xfs_log.c | 525 ++++++++++++++++++++++------------------------- > > 1 file changed, 251 insertions(+), 274 deletions(-) > > > > diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c > > index 590c1e6db475..10916b99bf0f 100644 > > --- a/fs/xfs/xfs_log.c > > +++ b/fs/xfs/xfs_log.c > > @@ -2099,166 +2099,250 @@ xlog_print_trans( > > } > > } > > > > -static xlog_op_header_t * > > -xlog_write_setup_ophdr( > > - struct xlog_op_header *ophdr, > > - struct xlog_ticket *ticket) > > -{ > > - ophdr->oh_clientid = XFS_TRANSACTION; > > - ophdr->oh_res2 = 0; > > - ophdr->oh_flags = 0; > > - return ophdr; > > -} > > - > > /* > > - * Set up the parameters of the region copy into the log. This has > > - * to handle region write split across multiple log buffers - this > > - * state is kept external to this function so that this code can > > - * be written in an obvious, self documenting manner. > > + * Write whole log vectors into a single iclog which is guaranteed to have > > + * either sufficient space for the entire log vector chain to be written or > > + * exclusive access to the remaining space in the iclog. 
> > + * > > + * Return the number of iovecs and data written into the iclog, as well as > > + * a pointer to the logvec that doesn't fit in the log (or NULL if we hit the > > + * end of the chain. > > */ > > -static int > > -xlog_write_setup_copy( > > +static struct xfs_log_vec * > > +xlog_write_single( > > Ouch. Could you fix the previous patch to move this new function a > little higher in the file (like above xlog_write_setup_ophdr) so that it > doesn't get shredded like this? Not possible because xlog_write_setup_ophdr() is removed by this patch. I can't help it if the diffs are unreadable - I can't really control what git is doing here... > > @@ -2930,7 +2906,7 @@ xlog_state_get_iclog_space( > > * xlog_write() algorithm assumes that at least 2 xlog_op_header_t's > > * can fit into remaining data section. > > */ > > - if (iclog->ic_size - iclog->ic_offset < 2*sizeof(xlog_op_header_t)) { > > + if (iclog->ic_size - iclog->ic_offset < 3*sizeof(xlog_op_header_t)) { > > Why does this change to 3? Does the comment need amending? Ah, that was to do with avoiding the need to split the start record/transaction header across two iclogs. That was because the partial copy loop didn't have special handling for start records and so that log vector had to be wholly handled by the xlog_write_single() loop to set the iclog flush flags. However, with all the changes since then that have added explicit pre-flushes before the start record is formatted and the lifting of the iclog flush flags to the callers, we've removed all the special optype handling in xlog_write(). Hence we no longer need to guarantee the start record is handled by the single path; it can now be handled by this partial path just fine. So I can revert this hunk. Did I mention that this code was full of all sorts of subtle corner cases? :/ Cheers, Dave.
On Fri, Mar 05, 2021 at 04:11:27PM +1100, Dave Chinner wrote: > From: Dave Chinner <dchinner@redhat.com> > > Handle writing of a logvec chain into an iclog that doesn't have > enough space to fit it all. The iclog has already been changed to > WANT_SYNC by xlog_get_iclog_space(), so the entire remaining space > in the iclog is exclusively owned by this logvec chain. > > The difference between the single and partial cases is that > we end up with partial iovec writes in the iclog and have to split > a log vec regions across two iclogs. The state handling for this is > currently awful and so we're building up the pieces needed to > handle this more cleanly one at a time. > > Signed-off-by: Dave Chinner <dchinner@redhat.com> > --- FWIW, git's --patience diff mode generates a more readable diff for this patch than the default algorithm does. I'm referring to that locally and will try to leave feedback at the appropriate points here. > fs/xfs/xfs_log.c | 525 ++++++++++++++++++++++------------------------- > 1 file changed, 251 insertions(+), 274 deletions(-) > > diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c > index 590c1e6db475..10916b99bf0f 100644 > --- a/fs/xfs/xfs_log.c > +++ b/fs/xfs/xfs_log.c > @@ -2099,166 +2099,250 @@ xlog_print_trans( > } > } > > -static xlog_op_header_t * > -xlog_write_setup_ophdr( > - struct xlog_op_header *ophdr, > - struct xlog_ticket *ticket) > -{ > - ophdr->oh_clientid = XFS_TRANSACTION; > - ophdr->oh_res2 = 0; > - ophdr->oh_flags = 0; > - return ophdr; > -} > - > /* > - * Set up the parameters of the region copy into the log. This has > - * to handle region write split across multiple log buffers - this > - * state is kept external to this function so that this code can > - * be written in an obvious, self documenting manner. > + * Write whole log vectors into a single iclog which is guaranteed to have > + * either sufficient space for the entire log vector chain to be written or > + * exclusive access to the remaining space in the iclog. 
> + * > + * Return the number of iovecs and data written into the iclog, as well as > + * a pointer to the logvec that doesn't fit in the log (or NULL if we hit the > + * end of the chain. > */ > -static int > -xlog_write_setup_copy( > +static struct xfs_log_vec * > +xlog_write_single( > + struct xfs_log_vec *log_vector, So xlog_write_single() was initially for single CIL xlog_write() calls and now it appears to be slightly different in that it writes as many full log vectors that fit in the current iclog and cycles through xlog_write_partial() (and back) to process log vectors that span iclogs differently from those that don't. > struct xlog_ticket *ticket, > - struct xlog_op_header *ophdr, > - int space_available, > - int space_required, > - int *copy_off, > - int *copy_len, > - int *last_was_partial_copy, > - int *bytes_consumed) > -{ > - int still_to_copy; > - > - still_to_copy = space_required - *bytes_consumed; > - *copy_off = *bytes_consumed; > - > - if (still_to_copy <= space_available) { > - /* write of region completes here */ > - *copy_len = still_to_copy; > - ophdr->oh_len = cpu_to_be32(*copy_len); > - if (*last_was_partial_copy) > - ophdr->oh_flags |= (XLOG_END_TRANS|XLOG_WAS_CONT_TRANS); > - *last_was_partial_copy = 0; > - *bytes_consumed = 0; > - return 0; > - } > - > - /* partial write of region, needs extra log op header reservation */ > - *copy_len = space_available; > - ophdr->oh_len = cpu_to_be32(*copy_len); > - ophdr->oh_flags |= XLOG_CONTINUE_TRANS; > - if (*last_was_partial_copy) > - ophdr->oh_flags |= XLOG_WAS_CONT_TRANS; > - *bytes_consumed += *copy_len; > - (*last_was_partial_copy)++; > - > - /* account for new log op header */ > - ticket->t_curr_res -= sizeof(struct xlog_op_header); > - > - return sizeof(struct xlog_op_header); > -} > - > -static int > -xlog_write_copy_finish( > - struct xlog *log, > struct xlog_in_core *iclog, > - uint flags, > - int *record_cnt, > - int *data_cnt, > - int *partial_copy, > - int *partial_copy_len, > - 
int log_offset, > - struct xlog_in_core **commit_iclog) > + uint32_t *log_offset, > + uint32_t *len, > + uint32_t *record_cnt, > + uint32_t *data_cnt) > { > - int error; > + struct xfs_log_vec *lv = log_vector; > + void *ptr; > + int index; > > - if (*partial_copy) { > + ASSERT(*log_offset + *len <= iclog->ic_size || > + iclog->ic_state == XLOG_STATE_WANT_SYNC); > + > + ptr = iclog->ic_datap + *log_offset; > + for (lv = log_vector; lv; lv = lv->lv_next) { > /* > - * This iclog has already been marked WANT_SYNC by > - * xlog_state_get_iclog_space. > + * If the entire log vec does not fit in the iclog, punt it to > + * the partial copy loop which can handle this case. > */ > - spin_lock(&log->l_icloglock); > - xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); > - *record_cnt = 0; > - *data_cnt = 0; > - goto release_iclog; > - } > + if (lv->lv_niovecs && > + lv->lv_bytes > iclog->ic_size - *log_offset) > + break; > > - *partial_copy = 0; > - *partial_copy_len = 0; > + /* > + * Ordered log vectors have no regions to write so this > + * loop will naturally skip them. > + */ > + for (index = 0; index < lv->lv_niovecs; index++) { > + struct xfs_log_iovec *reg = &lv->lv_iovecp[index]; > + struct xlog_op_header *ophdr = reg->i_addr; > > - if (iclog->ic_size - log_offset <= sizeof(xlog_op_header_t)) { > - /* no more space in this iclog - push it. 
*/ > - spin_lock(&log->l_icloglock); > - xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); > - *record_cnt = 0; > - *data_cnt = 0; > + ASSERT(reg->i_len % sizeof(int32_t) == 0); > + ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); > > - if (iclog->ic_state == XLOG_STATE_ACTIVE) > - xlog_state_switch_iclogs(log, iclog, 0); > - else > - ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC || > - iclog->ic_state == XLOG_STATE_IOERROR); > - if (!commit_iclog) > - goto release_iclog; > - spin_unlock(&log->l_icloglock); > - ASSERT(flags & XLOG_COMMIT_TRANS); > - *commit_iclog = iclog; > + ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > + ophdr->oh_len = cpu_to_be32(reg->i_len - > + sizeof(struct xlog_op_header)); > + memcpy(ptr, reg->i_addr, reg->i_len); > + xlog_write_adv_cnt(&ptr, len, log_offset, reg->i_len); > + (*record_cnt)++; > + *data_cnt += reg->i_len; > + } > } > + ASSERT(*len == 0 || lv); > + return lv; > +} > > - return 0; > +static int > +xlog_write_get_more_iclog_space( > + struct xlog *log, > + struct xlog_ticket *ticket, > + struct xlog_in_core **iclogp, > + uint32_t *log_offset, > + uint32_t len, > + uint32_t *record_cnt, > + uint32_t *data_cnt, > + int *contwr) > +{ > + struct xlog_in_core *iclog = *iclogp; > + int error; > > -release_iclog: > + spin_lock(&log->l_icloglock); > + xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); > + ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC || > + iclog->ic_state == XLOG_STATE_IOERROR); > error = xlog_state_release_iclog(log, iclog); > spin_unlock(&log->l_icloglock); > - return error; > + if (error) > + return error; > + > + error = xlog_state_get_iclog_space(log, len, &iclog, > + ticket, contwr, log_offset); > + if (error) > + return error; > + *record_cnt = 0; > + *data_cnt = 0; > + *iclogp = iclog; > + return 0; > } > > /* > - * Write log vectors into a single iclog which is guaranteed by the caller > - * to have enough space to write the entire log vector into. 
Return the number > - * of log vectors written into the iclog. > + * Write log vectors into a single iclog which is smaller than the current chain > + * length. We write until we cannot fit a full record into the remaining space > + * and then stop. We return the log vector that is to be written that cannot > + * wholly fit in the iclog. > */ > -static int > -xlog_write_single( > +static struct xfs_log_vec * > +xlog_write_partial( > + struct xlog *log, > struct xfs_log_vec *log_vector, > struct xlog_ticket *ticket, > - struct xlog_in_core *iclog, > - uint32_t log_offset, > - uint32_t len) > + struct xlog_in_core **iclogp, > + uint32_t *log_offset, > + uint32_t *len, > + uint32_t *record_cnt, > + uint32_t *data_cnt, > + int *contwr) > { > + struct xlog_in_core *iclog = *iclogp; > struct xfs_log_vec *lv = log_vector; The log_vector -> lv assignment seems spurious at this point since this function only processes lv and returns the next. > + struct xfs_log_iovec *reg; > + struct xlog_op_header *ophdr; > void *ptr; > int index = 0; > - int record_cnt = 0; > + uint32_t rlen; > + int error; > > - ASSERT(log_offset + len <= iclog->ic_size); > + /* walk the logvec, copying until we run out of space in the iclog */ > + ptr = iclog->ic_datap + *log_offset; > + for (index = 0; index < lv->lv_niovecs; index++) { > + uint32_t reg_offset = 0; > + > + reg = &lv->lv_iovecp[index]; > + ASSERT(reg->i_len % sizeof(int32_t) == 0); > > - ptr = iclog->ic_datap + log_offset; > - for (lv = log_vector; lv; lv = lv->lv_next) { > /* > - * Ordered log vectors have no regions to write so this > - * loop will naturally skip them. > + * The first region of a continuation must have a non-zero > + * length otherwise log recovery will just skip over it and > + * start recovering from the next opheader it finds. Because we > + * mark the next opheader as a continuation, recovery will then > + * incorrectly add the continuation to the previous region and > + * that breaks stuff. 
> + * > + * Hence if there isn't space for region data after the > + * opheader, then we need to start afresh with a new iclog. > */ > - for (index = 0; index < lv->lv_niovecs; index++) { > - struct xfs_log_iovec *reg = &lv->lv_iovecp[index]; > - struct xlog_op_header *ophdr = reg->i_addr; > + if (iclog->ic_size - *log_offset <= > + sizeof(struct xlog_op_header)) { > + error = xlog_write_get_more_iclog_space(log, ticket, > + &iclog, log_offset, *len, record_cnt, > + data_cnt, contwr); > + if (error) > + return ERR_PTR(error); > + ptr = iclog->ic_datap + *log_offset; > + } > > - ASSERT(reg->i_len % sizeof(int32_t) == 0); > - ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); > + ophdr = reg->i_addr; > + rlen = min_t(uint32_t, reg->i_len, iclog->ic_size - *log_offset); > + > + ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > + ophdr->oh_len = cpu_to_be32(rlen - sizeof(struct xlog_op_header)); > + if (rlen != reg->i_len) > + ophdr->oh_flags |= XLOG_CONTINUE_TRANS; > > + ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); > + xlog_verify_dest_ptr(log, ptr); > + memcpy(ptr, reg->i_addr, rlen); > + xlog_write_adv_cnt(&ptr, len, log_offset, rlen); > + (*record_cnt)++; > + *data_cnt += rlen; > + /* if we fit the full region, jump to the next */ > + if (rlen == reg->i_len) > + continue; > + > + /* > + * We now have a partially written iovec, but it can span > + * multiple iclogs so we loop here. First we release the iclog > + * we currently have, then we get a new iclog and add a new > + * opheader. Then we continue copying from where we were until > + * we either complete the iovec or fill the iclog. If we > + * complete the iovec, then we increment the index and go right > + * back to the top of the outer loop. if we fill the iclog, we > + * run the inner loop again. > + * > + * This is complicated by the tail of a region using all the > + * space in an iclog and hence requiring us to release the iclog > + * and get a new one before returning to the outer loop. 
We must > + * always guarantee that we exit this inner loop with at least > + * space for log transaction opheaders left in the current > + * iclog, hence we cannot just terminate the loop at the end > + * of the of the continuation. So we loop while there is no > + * space left in the current iclog, and check for the end of the > + * continuation after getting a new iclog. > + */ Ok, so we land in this function if an lv spans an iclog boundary. The upper loop writes full vectors until we hit said iclog boundary, then we fall into the inner loop... > + do { > + /* > + * Account for the continuation opheader before we get > + * a new iclog. This is necessary so that we reserve > + * space in the iclog for it. > + */ > + if (ophdr->oh_flags & XLOG_CONTINUE_TRANS) { (Is this ever not true here?) > + *len += sizeof(struct xlog_op_header); > + ticket->t_curr_res -= sizeof(struct xlog_op_header); > + } > + error = xlog_write_get_more_iclog_space(log, ticket, > + &iclog, log_offset, *len, record_cnt, > + data_cnt, contwr); > + if (error) > + return ERR_PTR(error); > + ptr = iclog->ic_datap + *log_offset; > + > + ophdr = ptr; > ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > - ophdr->oh_len = cpu_to_be32(reg->i_len - > + ophdr->oh_clientid = XFS_TRANSACTION; > + ophdr->oh_res2 = 0; > + ophdr->oh_flags = XLOG_WAS_CONT_TRANS; > + > + xlog_write_adv_cnt(&ptr, len, log_offset, > sizeof(struct xlog_op_header)); > - memcpy(ptr, reg->i_addr, reg->i_len); > - xlog_write_adv_cnt(&ptr, &len, &log_offset, reg->i_len); > - record_cnt++; > - } > + *data_cnt += sizeof(struct xlog_op_header); > + ... which switches to the next iclog, writes the continuation header... > + /* > + * If rlen fits in the iclog, then end the region > + * continuation. Otherwise we're going around again. 
> + */ > + reg_offset += rlen; > + rlen = reg->i_len - reg_offset; > + if (rlen <= iclog->ic_size - *log_offset) > + ophdr->oh_flags |= XLOG_END_TRANS; > + else > + ophdr->oh_flags |= XLOG_CONTINUE_TRANS; > + > + rlen = min_t(uint32_t, rlen, iclog->ic_size - *log_offset); > + ophdr->oh_len = cpu_to_be32(rlen); > + > + xlog_verify_dest_ptr(log, ptr); > + memcpy(ptr, reg->i_addr + reg_offset, rlen); > + xlog_write_adv_cnt(&ptr, len, log_offset, rlen); > + (*record_cnt)++; > + *data_cnt += rlen; > + > + } while (ophdr->oh_flags & XLOG_CONTINUE_TRANS); ... writes more of the region (iclog space permitting), and then determines whether we need further continuations (and partial writes of the same region) or can move onto the next region, until we're done with the lv. I think I follow the high level flow and it seems reasonable from a functional standpoint, but this also seems like quite a bit of churn for not much reduction in overall complexity. The higher level loop is much simpler and I think the per lv/vector iteration is an improvement, but we also seem to have duplicate functionality throughout the updated code and have introduced new forms of complexity around the state expectations for the transitions between the different write modes and between each write mode and the higher level loop. I.e., xlog_write_single() implements a straightforward loop to write out full log vectors. That seems fine, but the outer loop of xlog_write_partial() reimplements nearly the same per-region functionality with some added flexibility to handle op header flags and the special iclog processing associated with the continuation case. The inner loop factors out the continuation iclog management bits and op header injection, which I think is an improvement, but then duplicates region copying (yet again) pretty much only to implement partial copies, which really just involves offset management (i.e., fairly trivial relative to the broader complexity of the function). I dunno.
I'd certainly need to stare more at this to cover all of the details, but given the amount of swizzling going on in a single patch I'm kind of wondering if/why we couldn't land on a single iterator in the spirit of xlog_write_partial() in that it primarily iterates on regions and factors out the grotty reservation and continuation management bits, but doesn't unroll as much and leave so much duplicate functionality around. For example, it looks to me that xlog_write_partial() very nearly already supports a high level algorithm along the lines of the following (pseudocode): xlog_write(len) { get_iclog_space(len) for_each_lv() { for_each_reg() { reg_offset = 0; cont_write: /* write as much as will fit in the iclog, return count, * and set ophdr cont flag based on write result */ reg_offset += write_region(reg, &len, &reg_offset, ophdr, ...); /* handle continuation writes */ if (reg_offset != reg->i_len) { get_more_iclog_space(len); /* stamp a WAS_CONT op hdr, set END if rlen fits * into new space, then continue with the same region */ stamp_cont_op_hdr(); goto cont_write; } if (need_more_iclog_space(len)) get_more_iclog_space(len); } } } That puts the whole thing back into a single high level walk and thus reintroduces the need for some of the continuation vs. non-continuation tracking with respect to the op header and iclog, but ISTM that complexity can be managed by the continuation abstraction you've already started to introduce (as opposed to the current scheme of conditionally accumulating data_cnt). It might even be fine to dump some of the requisite state into a context struct to carry between iclog reservation and copy finish processing rather than pass around so many independent and poorly named variables like the current upstream implementation does, but that's probably getting too deep into the weeds.
FWIW, I can also see an approach of moving from the implementation in this patch toward something like the above, but I'm not sure I'd want to subject the upstream code to that process... Brian > } > - ASSERT(len == 0); > - return record_cnt; > -} > > + /* > + * No more iovecs remain in this logvec so return the next log vec to > + * the caller so it can go back to fast path copying. > + */ > + *iclogp = iclog; > + return lv->lv_next; > +} > > /* > * Write some region out to in-core log > @@ -2312,14 +2396,11 @@ xlog_write( > { > struct xlog_in_core *iclog = NULL; > struct xfs_log_vec *lv = log_vector; > - struct xfs_log_iovec *vecp = lv->lv_iovecp; > - int index = 0; > - int partial_copy = 0; > - int partial_copy_len = 0; > int contwr = 0; > int record_cnt = 0; > int data_cnt = 0; > int error = 0; > + int log_offset; > > if (ticket->t_curr_res < 0) { > xfs_alert_tag(log->l_mp, XFS_PTAG_LOGRES, > @@ -2328,157 +2409,52 @@ xlog_write( > xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR); > } > > - if (start_lsn) > - *start_lsn = 0; > - while (lv && (!lv->lv_niovecs || index < lv->lv_niovecs)) { > - void *ptr; > - int log_offset; > - > - error = xlog_state_get_iclog_space(log, len, &iclog, ticket, > - &contwr, &log_offset); > - if (error) > - return error; > - > - ASSERT(log_offset <= iclog->ic_size - 1); > + error = xlog_state_get_iclog_space(log, len, &iclog, ticket, > + &contwr, &log_offset); > + if (error) > + return error; > > - /* Start_lsn is the first lsn written to. */ > - if (start_lsn && !*start_lsn) > - *start_lsn = be64_to_cpu(iclog->ic_header.h_lsn); > + /* start_lsn is the LSN of the first iclog written to. */ > + if (start_lsn) > + *start_lsn = be64_to_cpu(iclog->ic_header.h_lsn); > > - /* > - * iclogs containing commit records or unmount records need > - * to issue ordering cache flushes and commit immediately > - * to stable storage to guarantee journal vs metadata ordering > - * is correctly maintained in the storage media.
> - */ > - if (optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS)) { > - iclog->ic_flags |= (XLOG_ICL_NEED_FLUSH | > - XLOG_ICL_NEED_FUA); > - } > + /* > + * iclogs containing commit records or unmount records need > + * to issue ordering cache flushes and commit immediately > + * to stable storage to guarantee journal vs metadata ordering > + * is correctly maintained in the storage media. This will always > + * fit in the iclog we have been already been passed. > + */ > + if (optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS)) { > + iclog->ic_flags |= (XLOG_ICL_NEED_FLUSH | XLOG_ICL_NEED_FUA); > + ASSERT(!contwr); > + } > > - /* If this is a single iclog write, go fast... */ > - if (!contwr && lv == log_vector) { > - record_cnt = xlog_write_single(lv, ticket, iclog, > - log_offset, len); > - len = 0; > - data_cnt = len; > + while (lv) { > + lv = xlog_write_single(lv, ticket, iclog, &log_offset, > + &len, &record_cnt, &data_cnt); > + if (!lv) > break; > - } > - > - /* > - * This loop writes out as many regions as can fit in the amount > - * of space which was allocated by xlog_state_get_iclog_space(). 
> - */ > - ptr = iclog->ic_datap + log_offset; > - while (lv && (!lv->lv_niovecs || index < lv->lv_niovecs)) { > - struct xfs_log_iovec *reg; > - struct xlog_op_header *ophdr; > - int copy_len; > - int copy_off; > - bool ordered = false; > - bool added_ophdr = false; > - > - /* ordered log vectors have no regions to write */ > - if (lv->lv_buf_len == XFS_LOG_VEC_ORDERED) { > - ASSERT(lv->lv_niovecs == 0); > - ordered = true; > - goto next_lv; > - } > - > - reg = &vecp[index]; > - ASSERT(reg->i_len % sizeof(int32_t) == 0); > - ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); > - > - /* > - * Regions always have their ophdr at the start of the > - * region, except for: > - * - a transaction start which has a start record ophdr > - * before the first region ophdr; and > - * - the previous region didn't fully fit into an iclog > - * so needs a continuation ophdr to prepend the region > - * in this new iclog. > - */ > - ophdr = reg->i_addr; > - if (optype && index) { > - optype &= ~XLOG_START_TRANS; > - } else if (partial_copy) { > - ophdr = xlog_write_setup_ophdr(ptr, ticket); > - xlog_write_adv_cnt(&ptr, &len, &log_offset, > - sizeof(struct xlog_op_header)); > - added_ophdr = true; > - } > - ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > - > - len += xlog_write_setup_copy(ticket, ophdr, > - iclog->ic_size-log_offset, > - reg->i_len, > - &copy_off, &copy_len, > - &partial_copy, > - &partial_copy_len); > - xlog_verify_dest_ptr(log, ptr); > - > > - /* > - * Wart: need to update length in embedded ophdr not > - * to include it's own length. > - */ > - if (!added_ophdr) { > - ophdr->oh_len = cpu_to_be32(copy_len - > - sizeof(struct xlog_op_header)); > - } > - > - ASSERT(copy_len > 0); > - memcpy(ptr, reg->i_addr + copy_off, copy_len); > - xlog_write_adv_cnt(&ptr, &len, &log_offset, copy_len); > - > - if (added_ophdr) > - copy_len += sizeof(struct xlog_op_header); > - record_cnt++; > - data_cnt += contwr ?
copy_len : 0; > - > - error = xlog_write_copy_finish(log, iclog, optype, > - &record_cnt, &data_cnt, > - &partial_copy, > - &partial_copy_len, > - log_offset, > - commit_iclog); > - if (error) > - return error; > - > - /* > - * if we had a partial copy, we need to get more iclog > - * space but we don't want to increment the region > - * index because there is still more is this region to > - * write. > - * > - * If we completed writing this region, and we flushed > - * the iclog (indicated by resetting of the record > - * count), then we also need to get more log space. If > - * this was the last record, though, we are done and > - * can just return. > - */ > - if (partial_copy) > - break; > - > - if (++index == lv->lv_niovecs) { > -next_lv: > - lv = lv->lv_next; > - index = 0; > - if (lv) > - vecp = lv->lv_iovecp; > - } > - if (record_cnt == 0 && !ordered) { > - if (!lv) > - return 0; > - break; > - } > + ASSERT(!(optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS))); > + lv = xlog_write_partial(log, lv, ticket, &iclog, &log_offset, > + &len, &record_cnt, &data_cnt, &contwr); > + if (IS_ERR_OR_NULL(lv)) { > + error = PTR_ERR_OR_ZERO(lv); > + break; > } > } > + ASSERT((len == 0 && !lv) || error); > > - ASSERT(len == 0); > - > + /* > + * We've already been guaranteed that the last writes will fit inside > + * the current iclog, and hence it will already have the space used by > + * those writes accounted to it. Hence we do not need to update the > + * iclog with the number of bytes written here. > + */ > + ASSERT(!contwr || XLOG_FORCED_SHUTDOWN(log)); > spin_lock(&log->l_icloglock); > - xlog_state_finish_copy(log, iclog, record_cnt, data_cnt); > + xlog_state_finish_copy(log, iclog, record_cnt, 0); > if (commit_iclog) { > ASSERT(optype & XLOG_COMMIT_TRANS); > *commit_iclog = iclog; > @@ -2930,7 +2906,7 @@ xlog_state_get_iclog_space( > * xlog_write() algorithm assumes that at least 2 xlog_op_header_t's > * can fit into remaining data section. 
> */ > - if (iclog->ic_size - iclog->ic_offset < 2*sizeof(xlog_op_header_t)) { > + if (iclog->ic_size - iclog->ic_offset < 3*sizeof(xlog_op_header_t)) { > int error = 0; > > xlog_state_switch_iclogs(log, iclog, iclog->ic_size); > @@ -3633,11 +3609,12 @@ xlog_verify_iclog( > iclog->ic_header.h_cycle_data[idx]); > } > } > - if (clientid != XFS_TRANSACTION && clientid != XFS_LOG) > + if (clientid != XFS_TRANSACTION && clientid != XFS_LOG) { > xfs_warn(log->l_mp, > - "%s: invalid clientid %d op "PTR_FMT" offset 0x%lx", > - __func__, clientid, ophead, > + "%s: op %d invalid clientid %d op "PTR_FMT" offset 0x%lx", > + __func__, i, clientid, ophead, > (unsigned long)field_offset); > + } > > /* check length */ > p = &ophead->oh_len; > -- > 2.28.0 >
On Thu, Mar 18, 2021 at 09:22:08AM -0400, Brian Foster wrote: > On Fri, Mar 05, 2021 at 04:11:27PM +1100, Dave Chinner wrote: > > From: Dave Chinner <dchinner@redhat.com> > > > > Handle writing of a logvec chain into an iclog that doesn't have > > enough space to fit it all. The iclog has already been changed to > > WANT_SYNC by xlog_get_iclog_space(), so the entire remaining space > > in the iclog is exclusively owned by this logvec chain. > > > > The difference between the single and partial cases is that > > we end up with partial iovec writes in the iclog and have to split > > a log vec regions across two iclogs. The state handling for this is > > currently awful and so we're building up the pieces needed to > > handle this more cleanly one at a time. > > > > Signed-off-by: Dave Chinner <dchinner@redhat.com> > > --- > > FWIW, git --patience mode generates a more readable diff for this patch > than what it generates by default. I'm referring to that locally and > will try to leave feedback in the appropriate points here. > > > fs/xfs/xfs_log.c | 525 ++++++++++++++++++++++------------------------- > > 1 file changed, 251 insertions(+), 274 deletions(-) > > > > diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c > > index 590c1e6db475..10916b99bf0f 100644 > > --- a/fs/xfs/xfs_log.c > > +++ b/fs/xfs/xfs_log.c > > @@ -2099,166 +2099,250 @@ xlog_print_trans( > > } > > } > > > > -static xlog_op_header_t * > > -xlog_write_setup_ophdr( > > - struct xlog_op_header *ophdr, > > - struct xlog_ticket *ticket) > > -{ > > - ophdr->oh_clientid = XFS_TRANSACTION; > > - ophdr->oh_res2 = 0; > > - ophdr->oh_flags = 0; > > - return ophdr; > > -} > > - > > /* > > - * Set up the parameters of the region copy into the log. This has > > - * to handle region write split across multiple log buffers - this > > - * state is kept external to this function so that this code can > > - * be written in an obvious, self documenting manner. 
> > + * Write whole log vectors into a single iclog which is guaranteed to have > > + * either sufficient space for the entire log vector chain to be written or > > + * exclusive access to the remaining space in the iclog. > > + * > > + * Return the number of iovecs and data written into the iclog, as well as > > + * a pointer to the logvec that doesn't fit in the log (or NULL if we hit the > > + * end of the chain. > > */ > > -static int > > -xlog_write_setup_copy( > > +static struct xfs_log_vec * > > +xlog_write_single( > > + struct xfs_log_vec *log_vector, > > So xlog_write_single() was initially for single CIL xlog_write() calls > and now it appears to be slightly different in that it writes as many > full log vectors that fit in the current iclog and cycles through > xlog_write_partial() (and back) to process log vectors that span iclogs > differently from those that don't. Yes, that is what it does, but no, you've got the process and meaning backwards. I wrote xlog_write_single() as it appears in this patch first, then split it out backwards to ease review. IOWs, "single" means "write everything that fits within this single iclog", not "only call this function if the entire lv chain fits inside a single iclog". The latter is what I split out to make it simpler to review, but it was not the reason it was called xlog_write_single().... > > + do { > > + /* > > + * Account for the continuation opheader before we get > > + * a new iclog. This is necessary so that we reserve > > + * space in the iclog for it. > > + */ > > + if (ophdr->oh_flags & XLOG_CONTINUE_TRANS) { > > (Is this ever not true here?) It is always true now, but it wasn't always. Fixed.
> > > + *len += sizeof(struct xlog_op_header); > > + ticket->t_curr_res -= sizeof(struct xlog_op_header); > > + } > > + error = xlog_write_get_more_iclog_space(log, ticket, > > + &iclog, log_offset, *len, record_cnt, > > + data_cnt, contwr); > > + if (error) > > + return ERR_PTR(error); > > + ptr = iclog->ic_datap + *log_offset; > > + > > + ophdr = ptr; > > ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > > - ophdr->oh_len = cpu_to_be32(reg->i_len - > > + ophdr->oh_clientid = XFS_TRANSACTION; > > + ophdr->oh_res2 = 0; > > + ophdr->oh_flags = XLOG_WAS_CONT_TRANS; > > + > > + xlog_write_adv_cnt(&ptr, len, log_offset, > > sizeof(struct xlog_op_header)); > > - memcpy(ptr, reg->i_addr, reg->i_len); > > - xlog_write_adv_cnt(&ptr, &len, &log_offset, reg->i_len); > > - record_cnt++; > > - } > > + *data_cnt += sizeof(struct xlog_op_header); > > + > > ... which switches to the next iclog, writes the continuation header... > > > + /* > > + * If rlen fits in the iclog, then end the region > > + * continuation. Otherwise we're going around again. > > + */ > > + reg_offset += rlen; > > + rlen = reg->i_len - reg_offset; > > + if (rlen <= iclog->ic_size - *log_offset) > > + ophdr->oh_flags |= XLOG_END_TRANS; > > + else > > + ophdr->oh_flags |= XLOG_CONTINUE_TRANS; > > + > > + rlen = min_t(uint32_t, rlen, iclog->ic_size - *log_offset); > > + ophdr->oh_len = cpu_to_be32(rlen); > > + > > + xlog_verify_dest_ptr(log, ptr); > > + memcpy(ptr, reg->i_addr + reg_offset, rlen); > > + xlog_write_adv_cnt(&ptr, len, log_offset, rlen); > > + (*record_cnt)++; > > + *data_cnt += rlen; > > + > > + } while (ophdr->oh_flags & XLOG_CONTINUE_TRANS); > > ... writes more of the region (iclog space permitting), and then > determines whether we need further continuations (and partial writes of > the same region) or can move onto the next region, until we're done with > the lv. Yup. 
> I think I follow the high level flow and it seems reasonable from a > functional standpoint, but this also seems like quite a bit of churn for > not much reduction in overall complexity. The higher level loop is much > more simple and I think the per lv/vector iteration is an improvement, > but we also seem to have duplicate functionality throughout the updated > code and have introduced new forms of complexity around the state > expectations for the transitions between the different write modes and > between each write mode and the higher level loop. Just untangling the code to get it to this point has been hard enough. I've held off doing more factoring and changing this code so I can actually test it and find the bugs I might have left in it. Yes, it can be further improved by factoring the region copying stuff, but that's secondary to the major work of refactoring this code in the first place. The fact that you actually understood this fairly easily indicates just how much better this code already is compared to what is currently upstream.... > I.e., xlog_write_single() implements a straighforward loop to write out > full log vectors. That seems fine, but the outer loop of > xlog_write_partial() reimplements nearly the same per-region > functionality with some added flexibility to handle op header flags and > the special iclog processing associated with the continuation case. The > inner loop factors out the continuation iclog management bits and op > header injection, which I think is an improvement, but then duplicates > region copying (yet again) pretty much only to implement partial copies, > which really just involves offset management (i.e., fairly trivial > relative to the broader complexity of the function). > > I dunno.
> I'd certainly need to stare more at this to cover all of the > details, but given the amount of swizzling going on in a single patch > I'm kind of wondering if/why we couldn't land on a single iterator in > the spirit of xlog_write_partial() in that it primarily iterates on > regions and factors out the grotty reservation and continuation > management bits, but doesn't unroll as much and leave so much duplicate > functionality around. > > For example, it looks to me that xlog_write_partial() almost nearly > already supports a high level algorithm along the lines of the following > (pseudocode): > > xlog_write(len) > { > get_iclog_space(len) > > for_each_lv() { > for_each_reg() { > reg_offset = 0; > cont_write: > /* write as much as will fit in the iclog, return count, > * and set ophdr cont flag based on write result */ > reg_offset += write_region(reg, &len, &reg_offset, ophdr, ...); > > /* handle continuation writes */ > if (reg_offset != reg->i_len) { > get_more_iclog_space(len); > /* stamp a WAS_CONT op hdr, set END if rlen fits > * into new space, then continue with the same region */ > stamp_cont_op_hdr(); > goto cont_write; > } > > if (need_more_iclog_space(len)) > get_more_iclog_space(len); > } > } > } Yeah, na. That is exactly the mess that I've just untangled. I don't want to rewrite this code again, and I don't want it more tightly tied to iclogs than it already is - I'm trying to move the code towards a common, simple fast path that knows nothing about iclogs and a slow path that handles the partial regions and obtaining a new buffer to write into. I want the two cases to have completely separate logic, because that makes both cases simpler to modify and reason about. Indeed, I want xlog_write to move away from iclogs because I want to use this code with direct mapped pmem regions, not just fixed memory buffers held in iclogs. IOWs, the code as it stands is a beginning, not an end.
And even as a beginning, it works, is much better and faster than the current code, has been tested for some time now, and can be further factored to make it simpler and easier to understand and to provide infrastructure for new features. > That puts the whole thing back into a single high level walk and thus > reintroduces the need for some of the continuation vs. non-continuation > tracking wrt to the op header and iclog, but ISTM that complexity can be > managed by the continuation abstraction you've already started to > introduce (as opposed to the current scheme of conditionally > accumulating data_cnt). It might even be fine to dump some of the > requisite state into a context struct to carry between iclog reservation > and copy finish processing rather than pass around so many independent > and poorly named variables like the current upstream implementation > does, but that's probably getting too deep into the weeds. > > FWIW, I can also see an approach of moving from the implementation in > this patch toward something like the above, but I'm not sure I'd want to > subject to the upstream code to that process... This is exactly what upstream is for - iterative improvement via small steps. This is the first step of many, and what you propose takes the code in the wrong direction for the steps I've already taken and am planning to take. Perfect is the enemy of good, and if upstream is not the place to make iterative improvements like this that build towards a bigger picture goal, then where the hell are we supposed to do them? -Dave.
On Wed, May 19, 2021 at 02:49:03PM +1000, Dave Chinner wrote: > On Thu, Mar 18, 2021 at 09:22:08AM -0400, Brian Foster wrote: > > On Fri, Mar 05, 2021 at 04:11:27PM +1100, Dave Chinner wrote: > > > From: Dave Chinner <dchinner@redhat.com> > > > > > > Handle writing of a logvec chain into an iclog that doesn't have > > > enough space to fit it all. The iclog has already been changed to > > > WANT_SYNC by xlog_get_iclog_space(), so the entire remaining space > > > in the iclog is exclusively owned by this logvec chain. > > > > > > The difference between the single and partial cases is that > > > we end up with partial iovec writes in the iclog and have to split > > > a log vec regions across two iclogs. The state handling for this is > > > currently awful and so we're building up the pieces needed to > > > handle this more cleanly one at a time. > > > > > > Signed-off-by: Dave Chinner <dchinner@redhat.com> > > > --- > > > > FWIW, git --patience mode generates a more readable diff for this patch > > than what it generates by default. I'm referring to that locally and > > will try to leave feedback in the appropriate points here. > > > > > fs/xfs/xfs_log.c | 525 ++++++++++++++++++++++------------------------- > > > 1 file changed, 251 insertions(+), 274 deletions(-) > > > > > > diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c > > > index 590c1e6db475..10916b99bf0f 100644 > > > --- a/fs/xfs/xfs_log.c > > > +++ b/fs/xfs/xfs_log.c > > > @@ -2099,166 +2099,250 @@ xlog_print_trans( > > > } > > > } > > > > > > -static xlog_op_header_t * > > > -xlog_write_setup_ophdr( > > > - struct xlog_op_header *ophdr, > > > - struct xlog_ticket *ticket) > > > -{ > > > - ophdr->oh_clientid = XFS_TRANSACTION; > > > - ophdr->oh_res2 = 0; > > > - ophdr->oh_flags = 0; > > > - return ophdr; > > > -} > > > - > > > /* > > > - * Set up the parameters of the region copy into the log. 
This has > > > - * to handle region write split across multiple log buffers - this > > > - * state is kept external to this function so that this code can > > > - * be written in an obvious, self documenting manner. > > > + * Write whole log vectors into a single iclog which is guaranteed to have > > > + * either sufficient space for the entire log vector chain to be written or > > > + * exclusive access to the remaining space in the iclog. > > > + * > > > + * Return the number of iovecs and data written into the iclog, as well as > > > + * a pointer to the logvec that doesn't fit in the log (or NULL if we hit the > > > + * end of the chain. > > > */ > > > -static int > > > -xlog_write_setup_copy( > > > +static struct xfs_log_vec * > > > +xlog_write_single( > > > + struct xfs_log_vec *log_vector, > > > > So xlog_write_single() was initially for single CIL xlog_write() calls > > and now it appears to be slightly different in that it writes as many > > full log vectors that fit in the current iclog and cycles through > > xlog_write_partial() (and back) to process log vectors that span iclogs > > differently from those that don't. > > Yes, that is what it does, but no, you've got the process and > meaning backwards. I wrote xlog_write_single() it as it appears in > this patch first, then split it out backwards to ease review. IOWs, > "single" means "write everything that fits within this single > iclog", not "only call this function if the entire lv chain fits > inside a single iclog". > > The latter is what I split out to make it simpler to review, but it > was not the reason it was called xlog_write_single().... > > > > + do { > > > + /* > > > + * Account for the continuation opheader before we get > > > + * a new iclog. This is necessary so that we reserve > > > + * space in the iclog for it. > > > + */ > > > + if (ophdr->oh_flags & XLOG_CONTINUE_TRANS) { > > > > (Is this ever not true here?) > > It is now, wasn't always. Fixed. 
> > > > > > + *len += sizeof(struct xlog_op_header); > > > + ticket->t_curr_res -= sizeof(struct xlog_op_header); > > > + } > > > + error = xlog_write_get_more_iclog_space(log, ticket, > > > + &iclog, log_offset, *len, record_cnt, > > > + data_cnt, contwr); > > > + if (error) > > > + return ERR_PTR(error); > > > + ptr = iclog->ic_datap + *log_offset; > > > + > > > + ophdr = ptr; > > > ophdr->oh_tid = cpu_to_be32(ticket->t_tid); > > > - ophdr->oh_len = cpu_to_be32(reg->i_len - > > > + ophdr->oh_clientid = XFS_TRANSACTION; > > > + ophdr->oh_res2 = 0; > > > + ophdr->oh_flags = XLOG_WAS_CONT_TRANS; > > > + > > > + xlog_write_adv_cnt(&ptr, len, log_offset, > > > sizeof(struct xlog_op_header)); > > > - memcpy(ptr, reg->i_addr, reg->i_len); > > > - xlog_write_adv_cnt(&ptr, &len, &log_offset, reg->i_len); > > > - record_cnt++; > > > - } > > > + *data_cnt += sizeof(struct xlog_op_header); > > > + > > > > ... which switches to the next iclog, writes the continuation header... > > > > > + /* > > > + * If rlen fits in the iclog, then end the region > > > + * continuation. Otherwise we're going around again. > > > + */ > > > + reg_offset += rlen; > > > + rlen = reg->i_len - reg_offset; > > > + if (rlen <= iclog->ic_size - *log_offset) > > > + ophdr->oh_flags |= XLOG_END_TRANS; > > > + else > > > + ophdr->oh_flags |= XLOG_CONTINUE_TRANS; > > > + > > > + rlen = min_t(uint32_t, rlen, iclog->ic_size - *log_offset); > > > + ophdr->oh_len = cpu_to_be32(rlen); > > > + > > > + xlog_verify_dest_ptr(log, ptr); > > > + memcpy(ptr, reg->i_addr + reg_offset, rlen); > > > + xlog_write_adv_cnt(&ptr, len, log_offset, rlen); > > > + (*record_cnt)++; > > > + *data_cnt += rlen; > > > + > > > + } while (ophdr->oh_flags & XLOG_CONTINUE_TRANS); > > > > ... writes more of the region (iclog space permitting), and then > > determines whether we need further continuations (and partial writes of > > the same region) or can move onto the next region, until we're done with > > the lv. > > Yup. 
> > > I think I follow the high level flow and it seems reasonable from a > > functional standpoint, but this also seems like quite a bit of churn for > > not much reduction in overall complexity. The higher level loop is much > > more simple and I think the per lv/vector iteration is an improvement, > > but we also seem to have duplicate functionality throughout the updated > > code and have introduced new forms of complexity around the state > > expectations for the transitions between the different write modes and > > between each write mode and the higher level loop. > > Just getting untangling the code to get it to this point > has been hard enough. I've held off doing more factoring and > changing this code so I can actaully test it and find the bugs I > might have left in it. > > Yes, it can be further improved by factoring the region copying > stuff, but that's secondary to the major work of refactoring this > code in the first place. The fact that you actually understood this > fairly easily indicates just how much better this code already is > compared to what is currently upstream.... > Heh. "You understood the patch, so it must be better!" :P I've paged much of this out in the 2 months or so since this review was posted, but my recollection is quite different. I use the existing code as a baseline to confirm behavior and assess readability of the updated code. > > I.e., xlog_write_single() implements a straighforward loop to write out > > full log vectors. That seems fine, but the outer loop of > > xlog_write_partial() reimplements nearly the same per-region > > functionality with some added flexibility to handle op header flags and > > the special iclog processing associated with the continuation case. 
The > > inner loop factors out the continuation iclog management bits and op > > header injection, which I think is an improvement, but then duplicates > > region copying (yet again) pretty much only to implement partial copies, > > which really just involves offset management (i.e., fairly trivial > > relative to the broader complexity of the function). > > > > I dunno. I'd certainly need to stare more at this to cover all of the > > details, but given the amount of swizzling going on in a single patch > > I'm kind of wondering if/why we couldn't land on a single iterator in > > the spirit of xlog_write_partial() in that it primarily iterates on > > regions and factors out the grotty reservation and continuation > > management bits, but doesn't unroll as much and leave so much duplicate > > functionality around. > > > > For example, it looks to me that xlog_write_partial() almost nearly > > already supports a high level algorithm along the lines of the following > > (pseudocode): > > > > xlog_write(len) > > { > > get_iclog_space(len) > > > > for_each_lv() { > > for_each_reg() { > > reg_offset = 0; > > cont_write: > > /* write as much as will fit in the iclog, return count, > > * and set ophdr cont flag based on write result */ > > reg_offset += write_region(reg, &len, &reg_offset, ophdr, ...); > > > > /* handle continuation writes */ > > if (reg_offset != reg->i_len) { > > get_more_iclog_space(len); > > /* stamp a WAS_CONT op hdr, set END if rlen fits > > * into new space, then continue with the same region */ > > stamp_cont_op_hdr(); > > goto cont_write; > > } > > > > if (need_more_iclog_space(len)) > > get_more_iclog_space(len); > > } > > } > > } > > Yeah, na. That is exactly the mess that I've just untangled.
> > I don't want to rewrite this code again, and I don't want it more > tightly tied to iclogs than it already is - I'm trying to move the > code towards a common, simple fast path that knows nothing about > iclogs and a slow path that handles the partial regions and > obtaining a new buffer to write into. I want the two cases > completely separate logic, because that makes both cases simpler to > modify and reason about. > Well, this review has been on the list for more than a couple months now. Given the response seems to have appeared after the next version of the series, I'm not sure it's worth digging my head back into the details to try and make a more detailed argument. Suffice it to say that I recall what I proposed as intended to be a fairly reasonable incremental step from what you ended up at to replace the large amount of resulting duplication with a single implementation that otherwise preserves the majority of the other cleanups. Not a rewrite or anything of the sort.. In any event, no single one of us is ultimately the authority on "better" or "simple." I'm just providing feedback that I didn't find the resulting factoring as a clear improvement, find it a bit annoying to have to dig through duplicate implementations to locate the subtle and unnecessary differences, and provided a suggestion on how to address that concern (that doesn't involve rewriting the thing) with specific details on how and why I think it improves readability. *shrug* Perhaps others will look at this, disagree with that assessment and find the separate functions more straightforward. > Indeed, I want xlog_write to move away from iclogs because I want to > use this code with direct mapped pmem regions, not just fixed memory > buffers held in iclogs. > That context and how that relates the proposed structure is not clear to me. 
That said, I _thought_ I looked through far enough into this series to grok how intertwined the resulting structure might have been with subsequent patches in order to provide thoughtful feedback, but I could be mistaken. > IOWs, the code as it stands is a beginning, not an end. And even as > a beginning, it works, is much better and faster than the current > code, has been tested for some time now, can be further factored to > make it simpler, easier to understand and provide infrastructure for > new features. > > > > That puts the whole thing back into a single high level walk and thus > > reintroduces the need for some of the continuation vs. non-continuation > > tracking wrt to the op header and iclog, but ISTM that complexity can be > > managed by the continuation abstraction you've already started to > > introduce (as opposed to the current scheme of conditionally > > accumulating data_cnt). It might even be fine to dump some of the > > requisite state into a context struct to carry between iclog reservation > > and copy finish processing rather than pass around so many independent > > and poorly named variables like the current upstream implementation > > does, but that's probably getting too deep into the weeds. > > > > FWIW, I can also see an approach of moving from the implementation in > > this patch toward something like the above, but I'm not sure I'd want to > > subject to the upstream code to that process... > > This is exactly what upstream is for - iterative improvement via > small steps. This is the first step of many, and what you propose > takes the code in the wrong direction for the steps I've already > taken and are planning to take. > > Perfect is the enemy of good, and if upstream is not the place to > make iterative improvements like this that build towards a bigger > picture goal, then where the hell are we supposed to do them? > Not every incremental development step is necessarily a suitable point for an upstream release. 
My comment above is basically to say that I think this refactoring is nearly to that point, but should go a bit further to reduce the duplication. If the argument against that step is dependence on future work, then propose the factoring close enough to that work such that sufficient context is available to review. Brian > -Dave. > -- > Dave Chinner > david@fromorbit.com >
On Thu, May 20, 2021 at 08:33:04AM -0400, Brian Foster wrote: <snipping the earlier comments out because I want only to respond to the discussion pertaining to handling of large patchsets> > > > I think I follow the high level flow and it seems reasonable from a > > > functional standpoint, but this also seems like quite a bit of churn for > > > not much reduction in overall complexity. The higher level loop is much > > > more simple and I think the per lv/vector iteration is an improvement, > > > but we also seem to have duplicate functionality throughout the updated > > > code and have introduced new forms of complexity around the state > > > expectations for the transitions between the different write modes and > > > between each write mode and the higher level loop. > > > > Just getting untangling the code to get it to this point > > has been hard enough. I've held off doing more factoring and > > changing this code so I can actaully test it and find the bugs I > > might have left in it. > > > > Yes, it can be further improved by factoring the region copying > > stuff, but that's secondary to the major work of refactoring this > > code in the first place. The fact that you actually understood this > > fairly easily indicates just how much better this code already is > > compared to what is currently upstream.... > > > > Heh. "You understood the patch, so it must be better!" :P > > I've paged much of this out in the 2 months or so since this review was > posted, but my recollection is quite different. I use the existing code > as a baseline to confirm behavior and assess readability of the updated > code. > > > > I.e., xlog_write_single() implements a straighforward loop to write out > > > full log vectors. That seems fine, but the outer loop of > > > xlog_write_partial() reimplements nearly the same per-region > > > functionality with some added flexibility to handle op header flags and > > > the special iclog processing associated with the continuation case. 
The > > > inner loop factors out the continuation iclog management bits and op > > > header injection, which I think is an improvement, but then duplicates > > > region copying (yet again) pretty much only to implement partial copies, > > > which really just involves offset management (i.e., fairly trivial > > > relative to the broader complexity of the function). > > > > > > I dunno. I'd certainly need to stare more at this to cover all of the > > > details, but given the amount of swizzling going on in a single patch > > > I'm kind of wondering if/why we couldn't land on a single iterator in > > > the spirit of xlog_write_partial() in that it primarily iterates on > > > regions and factors out the grotty reservation and continuation > > > management bits, but doesn't unroll as much and leave so much duplicate > > > functionality around. > > > > > > For example, it looks to me that xlog_write_partial() almost nearly > > > already supports a high level algorithm along the lines of the following > > > (pseudocode): > > > > > > xlog_write(len) > > > { > > > get_iclog_space(len) > > > > > > for_each_lv() { > > > for_each_reg() { > > > reg_offset = 0; > > > cont_write: > > > /* write as much as will fit in the iclog, return count, > > > * and set ophdr cont flag based on write result */ > > > reg_offset += write_region(reg, &len, &reg_offset, ophdr, ...); > > > > > > /* handle continuation writes */ > > > if (reg_offset != reg->i_len) { > > > get_more_iclog_space(len); > > > /* stamp a WAS_CONT op hdr, set END if rlen fits > > > * into new space, then continue with the same region */ > > > stamp_cont_op_hdr(); > > > goto cont_write; > > > } > > > > > > if (need_more_iclog_space(len)) > > > get_more_iclog_space(len); > > > } > > > } > > > } > > > > Yeah, na. That is exactly the mess that I've just untangled.
> > > > I don't want to rewrite this code again, and I don't want it more > > tightly tied to iclogs than it already is - I'm trying to move the > > code towards a common, simple fast path that knows nothing about > > iclogs and a slow path that handles the partial regions and > > obtaining a new buffer to write into. I want the two cases > > completely separate logic, because that makes both cases simpler to > > modify and reason about. > > > > Well, this review has been on the list for more than a couple months > now. Given the response seems to have appeared after the next version of > the series, I'm not sure it's worth digging my head back into the > details to try and make a more detailed argument. Suffice it to say that > I recall what I proposed as intended to be a fairly reasonable > incremental step from what you ended up at to replace the large amount > of resulting duplication with a single implementation that otherwise > preserves the majority of the other cleanups. Not a rewrite or anything > of the sort.. > > In any event, no single one of us is ultimately the authority on > "better" or "simple." I'm just providing feedback that I didn't find the > resulting factoring as a clear improvement, find it a bit annoying to > have to dig through duplicate implementations to locate the subtle and > unnecessary differences, and provided a suggestion on how to address > that concern (that doesn't involve rewriting the thing) with specific > details on how and why I think it improves readability. *shrug* Perhaps > others will look at this, disagree with that assessment and find the > separate functions more straightforward. Admittedly I did look at the: xlog_verify_dest_ptr(log, ptr); memcpy(ptr, reg->i_addr + reg_offset, rlen); xlog_write_adv_cnt(&ptr, len, log_offset, rlen); (*record_cnt)++; *data_cnt += rlen; sprinkled in three places and wondered why that couldn't have been a single function. Eh, well. 
Leaving the ophdr manipulations as separate clauses actually helps me to figure out /why/ they're different. > > > Indeed, I want xlog_write to move away from iclogs because I want to > > use this code with direct mapped pmem regions, not just fixed memory > > buffers held in iclogs. > > > > That context and how that relates the proposed structure is not clear to > me. That said, I _thought_ I looked through far enough into this series > to grok how intertwined the resulting structure might have been with > subsequent patches in order to provide thoughtful feedback, but I could > be mistaken. > > > IOWs, the code as it stands is a beginning, not an end. And even as > > a beginning, it works, is much better and faster than the current > > code, has been tested for some time now, can be further factored to > > make it simpler, easier to understand and provide infrastructure for > > new features. > > > > > > > That puts the whole thing back into a single high level walk and thus > > > reintroduces the need for some of the continuation vs. non-continuation > > > tracking wrt to the op header and iclog, but ISTM that complexity can be > > > managed by the continuation abstraction you've already started to > > > introduce (as opposed to the current scheme of conditionally > > > accumulating data_cnt). It might even be fine to dump some of the > > > requisite state into a context struct to carry between iclog reservation > > > and copy finish processing rather than pass around so many independent > > > and poorly named variables like the current upstream implementation > > > does, but that's probably getting too deep into the weeds. > > > > > > FWIW, I can also see an approach of moving from the implementation in > > > this patch toward something like the above, but I'm not sure I'd want to > > > subject to the upstream code to that process... > > > > This is exactly what upstream is for - iterative improvement via > > small steps. 
This is the first step of many, and what you propose > > takes the code in the wrong direction for the steps I've already > > taken and are planning to take. > > > > Perfect is the enemy of good, and if upstream is not the place to > > make iterative improvements like this that build towards a bigger > > picture goal, then where the hell are we supposed to do them? > > > > Not every incremental development step is necessarily a suitable point > for an upstream release. My comment above is basically to say that I > think this refactoring is nearly to that point, but should go a bit > further to reduce the duplication. If the argument against that step is > dependence on future work, then propose the factoring close enough to > that work such that sufficient context is available to review. For a short patchset I agree, but I don't think dumping the /next/ forty patches on the list as an RFC is going to help much. We're keyed to the kernel release cycle, which means (to me anyway) that the criteria is a little different for Gigantic Patchsets that are never going to land in a single cycle. Whereas for small patchsets I think it's reasonable to ask that all the weird warts get fixed by the end of review, for bigger things I think it's ok to lower that standard to "Can we understand it in case the author disappears; and does it not introduce obvious regressions"? I've applied the same principle to this really long story arc of adding parent pointers to the filesystem -- yes, the delayed xattrs series has some strange things in it structurally, but I was ok with only asking for obvious cleanups (like fixing the naming inconsistencies) so that we can get to the next series, which justifies all the slicing and dicing by turning the xattr state machine into a deferred log item. Posting the full set as a git branch somewhere so at least we can pull it and see the even bigger picture might, though. 
It's helped immensely for reviewing the delayed xattrs series and throwing some early feedback to Allison w.r.t. deferred xattrs. All right, back to the latest posting. --D > > Brian > > > -Dave. > > -- > > Dave Chinner > > david@fromorbit.com > > >
diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c index 590c1e6db475..10916b99bf0f 100644 --- a/fs/xfs/xfs_log.c +++ b/fs/xfs/xfs_log.c @@ -2099,166 +2099,250 @@ xlog_print_trans( } } -static xlog_op_header_t * -xlog_write_setup_ophdr( - struct xlog_op_header *ophdr, - struct xlog_ticket *ticket) -{ - ophdr->oh_clientid = XFS_TRANSACTION; - ophdr->oh_res2 = 0; - ophdr->oh_flags = 0; - return ophdr; -} - /* - * Set up the parameters of the region copy into the log. This has - * to handle region write split across multiple log buffers - this - * state is kept external to this function so that this code can - * be written in an obvious, self documenting manner. + * Write whole log vectors into a single iclog which is guaranteed to have + * either sufficient space for the entire log vector chain to be written or + * exclusive access to the remaining space in the iclog. + * + * Return the number of iovecs and data written into the iclog, as well as + * a pointer to the logvec that doesn't fit in the log (or NULL if we hit the + * end of the chain. 
*/ -static int -xlog_write_setup_copy( +static struct xfs_log_vec * +xlog_write_single( + struct xfs_log_vec *log_vector, struct xlog_ticket *ticket, - struct xlog_op_header *ophdr, - int space_available, - int space_required, - int *copy_off, - int *copy_len, - int *last_was_partial_copy, - int *bytes_consumed) -{ - int still_to_copy; - - still_to_copy = space_required - *bytes_consumed; - *copy_off = *bytes_consumed; - - if (still_to_copy <= space_available) { - /* write of region completes here */ - *copy_len = still_to_copy; - ophdr->oh_len = cpu_to_be32(*copy_len); - if (*last_was_partial_copy) - ophdr->oh_flags |= (XLOG_END_TRANS|XLOG_WAS_CONT_TRANS); - *last_was_partial_copy = 0; - *bytes_consumed = 0; - return 0; - } - - /* partial write of region, needs extra log op header reservation */ - *copy_len = space_available; - ophdr->oh_len = cpu_to_be32(*copy_len); - ophdr->oh_flags |= XLOG_CONTINUE_TRANS; - if (*last_was_partial_copy) - ophdr->oh_flags |= XLOG_WAS_CONT_TRANS; - *bytes_consumed += *copy_len; - (*last_was_partial_copy)++; - - /* account for new log op header */ - ticket->t_curr_res -= sizeof(struct xlog_op_header); - - return sizeof(struct xlog_op_header); -} - -static int -xlog_write_copy_finish( - struct xlog *log, struct xlog_in_core *iclog, - uint flags, - int *record_cnt, - int *data_cnt, - int *partial_copy, - int *partial_copy_len, - int log_offset, - struct xlog_in_core **commit_iclog) + uint32_t *log_offset, + uint32_t *len, + uint32_t *record_cnt, + uint32_t *data_cnt) { - int error; + struct xfs_log_vec *lv = log_vector; + void *ptr; + int index; - if (*partial_copy) { + ASSERT(*log_offset + *len <= iclog->ic_size || + iclog->ic_state == XLOG_STATE_WANT_SYNC); + + ptr = iclog->ic_datap + *log_offset; + for (lv = log_vector; lv; lv = lv->lv_next) { /* - * This iclog has already been marked WANT_SYNC by - * xlog_state_get_iclog_space. 
+ * If the entire log vec does not fit in the iclog, punt it to + * the partial copy loop which can handle this case. */ - spin_lock(&log->l_icloglock); - xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); - *record_cnt = 0; - *data_cnt = 0; - goto release_iclog; - } + if (lv->lv_niovecs && + lv->lv_bytes > iclog->ic_size - *log_offset) + break; - *partial_copy = 0; - *partial_copy_len = 0; + /* + * Ordered log vectors have no regions to write so this + * loop will naturally skip them. + */ + for (index = 0; index < lv->lv_niovecs; index++) { + struct xfs_log_iovec *reg = &lv->lv_iovecp[index]; + struct xlog_op_header *ophdr = reg->i_addr; - if (iclog->ic_size - log_offset <= sizeof(xlog_op_header_t)) { - /* no more space in this iclog - push it. */ - spin_lock(&log->l_icloglock); - xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); - *record_cnt = 0; - *data_cnt = 0; + ASSERT(reg->i_len % sizeof(int32_t) == 0); + ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); - if (iclog->ic_state == XLOG_STATE_ACTIVE) - xlog_state_switch_iclogs(log, iclog, 0); - else - ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC || - iclog->ic_state == XLOG_STATE_IOERROR); - if (!commit_iclog) - goto release_iclog; - spin_unlock(&log->l_icloglock); - ASSERT(flags & XLOG_COMMIT_TRANS); - *commit_iclog = iclog; + ophdr->oh_tid = cpu_to_be32(ticket->t_tid); + ophdr->oh_len = cpu_to_be32(reg->i_len - + sizeof(struct xlog_op_header)); + memcpy(ptr, reg->i_addr, reg->i_len); + xlog_write_adv_cnt(&ptr, len, log_offset, reg->i_len); + (*record_cnt)++; + *data_cnt += reg->i_len; + } } + ASSERT(*len == 0 || lv); + return lv; +} - return 0; +static int +xlog_write_get_more_iclog_space( + struct xlog *log, + struct xlog_ticket *ticket, + struct xlog_in_core **iclogp, + uint32_t *log_offset, + uint32_t len, + uint32_t *record_cnt, + uint32_t *data_cnt, + int *contwr) +{ + struct xlog_in_core *iclog = *iclogp; + int error; -release_iclog: + spin_lock(&log->l_icloglock); + 
xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); + ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC || + iclog->ic_state == XLOG_STATE_IOERROR); error = xlog_state_release_iclog(log, iclog); spin_unlock(&log->l_icloglock); - return error; + if (error) + return error; + + error = xlog_state_get_iclog_space(log, len, &iclog, + ticket, contwr, log_offset); + if (error) + return error; + *record_cnt = 0; + *data_cnt = 0; + *iclogp = iclog; + return 0; } /* - * Write log vectors into a single iclog which is guaranteed by the caller - * to have enough space to write the entire log vector into. Return the number - * of log vectors written into the iclog. + * Write log vectors into a single iclog which is smaller than the current chain + * length. We write until we cannot fit a full record into the remaining space + * and then stop. We return the log vector that is to be written that cannot + * wholly fit in the iclog. */ -static int -xlog_write_single( +static struct xfs_log_vec * +xlog_write_partial( + struct xlog *log, struct xfs_log_vec *log_vector, struct xlog_ticket *ticket, - struct xlog_in_core *iclog, - uint32_t log_offset, - uint32_t len) + struct xlog_in_core **iclogp, + uint32_t *log_offset, + uint32_t *len, + uint32_t *record_cnt, + uint32_t *data_cnt, + int *contwr) { + struct xlog_in_core *iclog = *iclogp; struct xfs_log_vec *lv = log_vector; + struct xfs_log_iovec *reg; + struct xlog_op_header *ophdr; void *ptr; int index = 0; - int record_cnt = 0; + uint32_t rlen; + int error; - ASSERT(log_offset + len <= iclog->ic_size); + /* walk the logvec, copying until we run out of space in the iclog */ + ptr = iclog->ic_datap + *log_offset; + for (index = 0; index < lv->lv_niovecs; index++) { + uint32_t reg_offset = 0; + + reg = &lv->lv_iovecp[index]; + ASSERT(reg->i_len % sizeof(int32_t) == 0); - ptr = iclog->ic_datap + log_offset; - for (lv = log_vector; lv; lv = lv->lv_next) { /* - * Ordered log vectors have no regions to write so this - * loop will 
naturally skip them. + * The first region of a continuation must have a non-zero + * length otherwise log recovery will just skip over it and + * start recovering from the next opheader it finds. Because we + * mark the next opheader as a continuation, recovery will then + * incorrectly add the continuation to the previous region and + * that breaks stuff. + * + * Hence if there isn't space for region data after the + * opheader, then we need to start afresh with a new iclog. */ - for (index = 0; index < lv->lv_niovecs; index++) { - struct xfs_log_iovec *reg = &lv->lv_iovecp[index]; - struct xlog_op_header *ophdr = reg->i_addr; + if (iclog->ic_size - *log_offset <= + sizeof(struct xlog_op_header)) { + error = xlog_write_get_more_iclog_space(log, ticket, + &iclog, log_offset, *len, record_cnt, + data_cnt, contwr); + if (error) + return ERR_PTR(error); + ptr = iclog->ic_datap + *log_offset; + } - ASSERT(reg->i_len % sizeof(int32_t) == 0); - ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); + ophdr = reg->i_addr; + rlen = min_t(uint32_t, reg->i_len, iclog->ic_size - *log_offset); + + ophdr->oh_tid = cpu_to_be32(ticket->t_tid); + ophdr->oh_len = cpu_to_be32(rlen - sizeof(struct xlog_op_header)); + if (rlen != reg->i_len) + ophdr->oh_flags |= XLOG_CONTINUE_TRANS; + ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); + xlog_verify_dest_ptr(log, ptr); + memcpy(ptr, reg->i_addr, rlen); + xlog_write_adv_cnt(&ptr, len, log_offset, rlen); + (*record_cnt)++; + *data_cnt += rlen; + + if (rlen == reg->i_len) + continue; + + /* + * We now have a partially written iovec, but it can span + * multiple iclogs so we loop here. First we release the iclog + * we currently have, then we get a new iclog and add a new + * opheader. Then we continue copying from where we were until + * we either complete the iovec or fill the iclog. If we + * complete the iovec, then we increment the index and go right + * back to the top of the outer loop. 
if we fill the iclog, we + * run the inner loop again. + * + * This is complicated by the tail of a region using all the + * space in an iclog and hence requiring us to release the iclog + * and get a new one before returning to the outer loop. We must + * always guarantee that we exit this inner loop with at least + * space for log transaction opheaders left in the current + * iclog, hence we cannot just terminate the loop at the end + * of the of the continuation. So we loop while there is no + * space left in the current iclog, and check for the end of the + * continuation after getting a new iclog. + */ + do { + /* + * Account for the continuation opheader before we get + * a new iclog. This is necessary so that we reserve + * space in the iclog for it. + */ + if (ophdr->oh_flags & XLOG_CONTINUE_TRANS) { + *len += sizeof(struct xlog_op_header); + ticket->t_curr_res -= sizeof(struct xlog_op_header); + } + error = xlog_write_get_more_iclog_space(log, ticket, + &iclog, log_offset, *len, record_cnt, + data_cnt, contwr); + if (error) + return ERR_PTR(error); + ptr = iclog->ic_datap + *log_offset; + + ophdr = ptr; ophdr->oh_tid = cpu_to_be32(ticket->t_tid); - ophdr->oh_len = cpu_to_be32(reg->i_len - + ophdr->oh_clientid = XFS_TRANSACTION; + ophdr->oh_res2 = 0; + ophdr->oh_flags = XLOG_WAS_CONT_TRANS; + + xlog_write_adv_cnt(&ptr, len, log_offset, sizeof(struct xlog_op_header)); - memcpy(ptr, reg->i_addr, reg->i_len); - xlog_write_adv_cnt(&ptr, &len, &log_offset, reg->i_len); - record_cnt++; - } + *data_cnt += sizeof(struct xlog_op_header); + + /* + * If rlen fits in the iclog, then end the region + * continuation. Otherwise we're going around again. 
+ */ + reg_offset += rlen; + rlen = reg->i_len - reg_offset; + if (rlen <= iclog->ic_size - *log_offset) + ophdr->oh_flags |= XLOG_END_TRANS; + else + ophdr->oh_flags |= XLOG_CONTINUE_TRANS; + + rlen = min_t(uint32_t, rlen, iclog->ic_size - *log_offset); + ophdr->oh_len = cpu_to_be32(rlen); + + xlog_verify_dest_ptr(log, ptr); + memcpy(ptr, reg->i_addr + reg_offset, rlen); + xlog_write_adv_cnt(&ptr, len, log_offset, rlen); + (*record_cnt)++; + *data_cnt += rlen; + + } while (ophdr->oh_flags & XLOG_CONTINUE_TRANS); } - ASSERT(len == 0); - return record_cnt; -} + /* + * No more iovecs remain in this logvec so return the next log vec to + * the caller so it can go back to fast path copying. + */ + *iclogp = iclog; + return lv->lv_next; +} /* * Write some region out to in-core log @@ -2312,14 +2396,11 @@ xlog_write( { struct xlog_in_core *iclog = NULL; struct xfs_log_vec *lv = log_vector; - struct xfs_log_iovec *vecp = lv->lv_iovecp; - int index = 0; - int partial_copy = 0; - int partial_copy_len = 0; int contwr = 0; int record_cnt = 0; int data_cnt = 0; int error = 0; + int log_offset; if (ticket->t_curr_res < 0) { xfs_alert_tag(log->l_mp, XFS_PTAG_LOGRES, @@ -2328,157 +2409,52 @@ xlog_write( xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR); } - if (start_lsn) - *start_lsn = 0; - while (lv && (!lv->lv_niovecs || index < lv->lv_niovecs)) { - void *ptr; - int log_offset; - - error = xlog_state_get_iclog_space(log, len, &iclog, ticket, - &contwr, &log_offset); - if (error) - return error; - - ASSERT(log_offset <= iclog->ic_size - 1); + error = xlog_state_get_iclog_space(log, len, &iclog, ticket, + &contwr, &log_offset); + if (error) + return error; - /* Start_lsn is the first lsn written to. */ - if (start_lsn && !*start_lsn) - *start_lsn = be64_to_cpu(iclog->ic_header.h_lsn); + /* start_lsn is the LSN of the first iclog written to. 
*/ + if (start_lsn) + *start_lsn = be64_to_cpu(iclog->ic_header.h_lsn); - /* - * iclogs containing commit records or unmount records need - * to issue ordering cache flushes and commit immediately - * to stable storage to guarantee journal vs metadata ordering - * is correctly maintained in the storage media. - */ - if (optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS)) { - iclog->ic_flags |= (XLOG_ICL_NEED_FLUSH | - XLOG_ICL_NEED_FUA); - } + /* + * iclogs containing commit records or unmount records need + * to issue ordering cache flushes and commit immediately + * to stable storage to guarantee journal vs metadata ordering + * is correctly maintained in the storage media. This will always + * fit in the iclog we have already been passed. + */ + if (optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS)) { + iclog->ic_flags |= (XLOG_ICL_NEED_FLUSH | XLOG_ICL_NEED_FUA); + ASSERT(!contwr); + } - /* If this is a single iclog write, go fast... */ - if (!contwr && lv == log_vector) { - record_cnt = xlog_write_single(lv, ticket, iclog, - log_offset, len); - len = 0; - data_cnt = len; + while (lv) { + lv = xlog_write_single(lv, ticket, iclog, &log_offset, + &len, &record_cnt, &data_cnt); + if (!lv) break; - } - - /* - * This loop writes out as many regions as can fit in the amount - * of space which was allocated by xlog_state_get_iclog_space(). 
- */ - ptr = iclog->ic_datap + log_offset; - while (lv && (!lv->lv_niovecs || index < lv->lv_niovecs)) { - struct xfs_log_iovec *reg; - struct xlog_op_header *ophdr; - int copy_len; - int copy_off; - bool ordered = false; - bool added_ophdr = false; - - /* ordered log vectors have no regions to write */ - if (lv->lv_buf_len == XFS_LOG_VEC_ORDERED) { - ASSERT(lv->lv_niovecs == 0); - ordered = true; - goto next_lv; - } - - reg = &vecp[index]; - ASSERT(reg->i_len % sizeof(int32_t) == 0); - ASSERT((unsigned long)ptr % sizeof(int32_t) == 0); - - /* - * Regions always have their ophdr at the start of the - * region, except for: - * - a transaction start which has a start record ophdr - * before the first region ophdr; and - * - the previous region didn't fully fit into an iclog - * so needs a continuation ophdr to prepend the region - * in this new iclog. - */ - ophdr = reg->i_addr; - if (optype && index) { - optype &= ~XLOG_START_TRANS; - } else if (partial_copy) { - ophdr = xlog_write_setup_ophdr(ptr, ticket); - xlog_write_adv_cnt(&ptr, &len, &log_offset, - sizeof(struct xlog_op_header)); - added_ophdr = true; - } - ophdr->oh_tid = cpu_to_be32(ticket->t_tid); - - len += xlog_write_setup_copy(ticket, ophdr, - iclog->ic_size-log_offset, - reg->i_len, - &copy_off, &copy_len, - &partial_copy, - &partial_copy_len); - xlog_verify_dest_ptr(log, ptr); - - /* - * Wart: need to update length in embedded ophdr not - * to include it's own length. - */ - if (!added_ophdr) { - ophdr->oh_len = cpu_to_be32(copy_len - - sizeof(struct xlog_op_header)); - } - - ASSERT(copy_len > 0); - memcpy(ptr, reg->i_addr + copy_off, copy_len); - xlog_write_adv_cnt(&ptr, &len, &log_offset, copy_len); - - if (added_ophdr) - copy_len += sizeof(struct xlog_op_header); - record_cnt++; - data_cnt += contwr ? 
copy_len : 0; - - error = xlog_write_copy_finish(log, iclog, optype, - &record_cnt, &data_cnt, - &partial_copy, - &partial_copy_len, - log_offset, - commit_iclog); - if (error) - return error; - - /* - * if we had a partial copy, we need to get more iclog - * space but we don't want to increment the region - * index because there is still more is this region to - * write. - * - * If we completed writing this region, and we flushed - * the iclog (indicated by resetting of the record - * count), then we also need to get more log space. If - * this was the last record, though, we are done and - * can just return. - */ - if (partial_copy) - break; - - if (++index == lv->lv_niovecs) { -next_lv: - lv = lv->lv_next; - index = 0; - if (lv) - vecp = lv->lv_iovecp; - } - if (record_cnt == 0 && !ordered) { - if (!lv) - return 0; - break; - } + ASSERT(!(optype & (XLOG_COMMIT_TRANS | XLOG_UNMOUNT_TRANS))); + lv = xlog_write_partial(log, lv, ticket, &iclog, &log_offset, + &len, &record_cnt, &data_cnt, &contwr); + if (IS_ERR_OR_NULL(lv)) { + error = PTR_ERR_OR_ZERO(lv); + break; } } + ASSERT((len == 0 && !lv) || error); - ASSERT(len == 0); - + /* + * We've already been guaranteed that the last writes will fit inside + * the current iclog, and hence it will already have the space used by + * those writes accounted to it. Hence we do not need to update the + * iclog with the number of bytes written here. + */ + ASSERT(!contwr || XLOG_FORCED_SHUTDOWN(log)); spin_lock(&log->l_icloglock); - xlog_state_finish_copy(log, iclog, record_cnt, data_cnt); + xlog_state_finish_copy(log, iclog, record_cnt, 0); if (commit_iclog) { ASSERT(optype & XLOG_COMMIT_TRANS); *commit_iclog = iclog; @@ -2930,7 +2906,7 @@ xlog_state_get_iclog_space( * xlog_write() algorithm assumes that at least 2 xlog_op_header_t's * can fit into remaining data section. 
*/ - if (iclog->ic_size - iclog->ic_offset < 2*sizeof(xlog_op_header_t)) { + if (iclog->ic_size - iclog->ic_offset < 3*sizeof(xlog_op_header_t)) { int error = 0; xlog_state_switch_iclogs(log, iclog, iclog->ic_size); @@ -3633,11 +3609,12 @@ xlog_verify_iclog( iclog->ic_header.h_cycle_data[idx]); } } - if (clientid != XFS_TRANSACTION && clientid != XFS_LOG) + if (clientid != XFS_TRANSACTION && clientid != XFS_LOG) { xfs_warn(log->l_mp, - "%s: invalid clientid %d op "PTR_FMT" offset 0x%lx", - __func__, clientid, ophead, + "%s: op %d invalid clientid %d op "PTR_FMT" offset 0x%lx", + __func__, i, clientid, ophead, (unsigned long)field_offset); + } /* check length */ p = &ophead->oh_len;