@@ -375,6 +375,15 @@ static inline int llog_next_block(const struct lu_env *env,
return rc;
}
+static inline int llog_max_idx(struct llog_log_hdr *lh)
+{
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
+ unlikely(lh->llh_flags & LLOG_F_IS_CAT))
+ return cfs_fail_val;
+ else
+ return LLOG_HDR_BITMAP_SIZE(lh) - 1;
+}
+
/* Determine if a llog plain of a catalog could be skiped based on record
* custom indexes.
* This assumes that indexes follow each other. The number of records to skip
@@ -391,6 +400,13 @@ static inline int llog_is_plain_skipable(struct llog_log_hdr *lh,
return (LLOG_HDR_BITMAP_SIZE(lh) - rec->lrh_index) < (start - curr);
}
+static inline bool llog_cat_is_wrapped(struct llog_handle *cat)
+{
+ struct llog_log_hdr *llh = cat->lgh_hdr;
+
+ return llh->llh_cat_idx >= cat->lgh_last_idx && llh->llh_count > 1;
+}
+
/* llog.c */
int lustre_process_log(struct super_block *sb, char *logname,
struct config_llog_instance *cfg);
@@ -458,6 +458,7 @@
/* was OBD_FAIL_LLOG_CATINFO_NET 0x1309 until 2.3 */
#define OBD_FAIL_MDS_SYNC_CAPA_SL 0x1310
#define OBD_FAIL_SEQ_ALLOC 0x1311
+#define OBD_FAIL_CAT_RECORDS 0x1312
#define OBD_FAIL_PLAIN_RECORDS 0x1319
#define OBD_FAIL_CATALOG_FULL_CHECK 0x131a
@@ -247,7 +247,7 @@ int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size)
LLOG_ERROR_REC(llh, rec, "bad record len, chunk size is %d",
chunk_size);
- else if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr))
+ else if (rec->lrh_index > llog_max_idx(llh->lgh_hdr))
LLOG_ERROR_REC(llh, rec, "index is too high");
else
return 0;
@@ -292,16 +292,21 @@ static int llog_process_thread(void *arg)
return 0;
}
+ last_index = llog_max_idx(llh);
if (cd) {
- last_called_index = cd->lpcd_first_idx;
+ if (cd->lpcd_first_idx >= llog_max_idx(llh)) {
+ /* End of the indexes -> Nothing to do */
+ rc = 0;
+ goto out;
+ }
index = cd->lpcd_first_idx + 1;
+ last_called_index = cd->lpcd_first_idx;
+ if (cd->lpcd_last_idx > 0 &&
+ cd->lpcd_last_idx <= llog_max_idx(llh))
+ last_index = cd->lpcd_last_idx;
+ else if (cd->lpcd_read_mode & LLOG_READ_MODE_RAW)
+ last_index = loghandle->lgh_last_idx;
}
- if (cd && cd->lpcd_last_idx)
- last_index = cd->lpcd_last_idx;
- else if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
- last_index = loghandle->lgh_last_idx;
- else
- last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
while (rc == 0) {
unsigned int buf_offset = 0;
@@ -174,19 +174,25 @@ static int llog_cat_process_cb(const struct lu_env *env,
struct llog_handle *llh = NULL;
int rc;
+ /* Skip processing of the logs until startcat */
+ if (rec->lrh_index < d->lpd_startcat)
+ return 0;
+
rc = llog_cat_process_common(env, cat_llh, rec, &llh);
if (rc)
goto out;
- if (rec->lrh_index < d->lpd_startcat)
- /* Skip processing of the logs until startcat */
- rc = 0;
- else if (d->lpd_startidx > 0) {
- struct llog_process_cat_data cd;
-
- cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
- cd.lpcd_first_idx = d->lpd_startidx;
- cd.lpcd_last_idx = 0;
+ if (d->lpd_startidx > 0) {
+ struct llog_process_cat_data cd = {
+ .lpcd_first_idx = 0,
+ .lpcd_last_idx = 0,
+ .lpcd_read_mode = LLOG_READ_MODE_NORMAL,
+ };
+
+ /* startidx is always associated with a catalog index */
+ if (d->lpd_startcat == rec->lrh_index)
+ cd.lpcd_first_idx = d->lpd_startidx;
+
rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
&cd, false);
/* Continue processing the next log from idx 0 */
@@ -208,57 +214,93 @@ static int llog_cat_process_or_fork(const struct lu_env *env,
void *data, int startcat,
int startidx, bool fork)
{
- struct llog_process_data d;
struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+ struct llog_process_data d;
+ struct llog_process_cat_data cd;
int rc;
LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
d.lpd_data = data;
d.lpd_cb = cb;
- d.lpd_startcat = (startcat == LLOG_CAT_FIRST ? 0 : startcat);
- d.lpd_startidx = startidx;
- if (llh->llh_cat_idx > cat_llh->lgh_last_idx) {
- struct llog_process_cat_data cd = {
- .lpcd_read_mode = LLOG_READ_MODE_NORMAL
- };
+ /* default: start from the oldest record */
+ d.lpd_startidx = 0;
+ d.lpd_startcat = llh->llh_cat_idx + 1;
+ cd.lpcd_first_idx = llh->llh_cat_idx;
+ cd.lpcd_last_idx = 0;
+ cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
+
+ if (startcat > 0 && startcat <= llog_max_idx(llh)) {
+ /* start from a custom catalog/llog plain indexes*/
+ d.lpd_startidx = startidx;
+ d.lpd_startcat = startcat;
+ cd.lpcd_first_idx = startcat - 1;
+ } else if (startcat != 0) {
+ CWARN("%s: startcat %d out of range for catlog "DFID"\n",
+ loghandle2name(cat_llh), startcat,
+ PLOGID(&cat_llh->lgh_id));
+ return -EINVAL;
+ }
+
+ startcat = d.lpd_startcat;
+
+ /* if startcat <= lgh_last_idx, we only need to process the first part
+ * of the catalog (from startcat).
+ */
+ if (llog_cat_is_wrapped(cat_llh) && startcat > cat_llh->lgh_last_idx) {
+ int cat_idx_origin = llh->llh_cat_idx;
CWARN("%s: catlog " DFID " crosses index zero\n",
loghandle2name(cat_llh),
PLOGID(&cat_llh->lgh_id));
- /*startcat = 0 is default value for general processing */
- if ((startcat != LLOG_CAT_FIRST &&
- startcat >= llh->llh_cat_idx) || !startcat) {
- /* processing the catalog part at the end */
- cd.lpcd_first_idx = (startcat ? startcat :
- llh->llh_cat_idx);
- cd.lpcd_last_idx = 0;
- rc = llog_process_or_fork(env, cat_llh, cat_cb,
- &d, &cd, fork);
- /* Reset the startcat because it has already reached
- * catalog bottom.
- */
- startcat = 0;
- d.lpd_startcat = 0;
- if (rc != 0)
- return rc;
- }
- /* processing the catalog part at the beginning */
- cd.lpcd_first_idx = (startcat == LLOG_CAT_FIRST) ? 0 : startcat;
- /* Note, the processing will stop at the lgh_last_idx value,
- * and it could be increased during processing. So records
- * between current lgh_last_idx and lgh_last_idx in future
- * would left unprocessed.
- */
- cd.lpcd_last_idx = cat_llh->lgh_last_idx;
+
+ /* processing the catalog part at the end */
rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork);
- } else {
- rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, NULL, fork);
+ if (rc)
+ return rc;
+
+ /* Reset the startcat because it has already reached catalog
+ * bottom.
+ * lgh_last_idx value could be increased during processing. So
+ * we process the remaining of catalog entries to be sure.
+ */
+ d.lpd_startcat = 1;
+ d.lpd_startidx = 0;
+ cd.lpcd_first_idx = 0;
+ cd.lpcd_last_idx = max(cat_idx_origin, cat_llh->lgh_last_idx);
+ } else if (llog_cat_is_wrapped(cat_llh)) {
+ /* only process 1st part -> stop before reaching 2sd part */
+ cd.lpcd_last_idx = llh->llh_cat_idx;
}
+ /* processing the catalog part at the beginning */
+ rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork);
+
return rc;
}
+/**
+ * Process catalog records with a callback
+ *
+ * @note
+ * If "starcat = 0", this is the default processing. "startidx" argument is
+ * ignored and processing begin from the oldest record.
+ * If "startcat > 0", this is a custom starting point. Processing begin with
+ * the llog plain defined in the catalog record at index "startcat". The first
+ * llog plain record to process is at index "startidx + 1".
+ *
+ * @env Lustre environnement
+ * @cat_llh Catalog llog handler
+ * @cb Callback executed for each records (in llog plain files)
+ * @data Callback data argument
+ * @startcat Catalog index of the llog plain to start with.
+ * @startidx Index of the llog plain to start processing. The first
+ * record to process is at startidx + 1.
+ *
+ * RETURN 0 processing successfully completed
+ * LLOG_PROC_BREAK processing was stopped by the callback.
+ * -errno on error.
+ */
int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
llog_cb_t cb, void *data, int startcat, int startidx)
{
@@ -2619,11 +2619,6 @@ enum llog_flag {
LLOG_F_EXT_X_OMODE | LLOG_F_EXT_X_XATTR,
};
-/* means first record of catalog */
-enum {
- LLOG_CAT_FIRST = -1,
-};
-
/* On-disk header structure of each log object, stored in little endian order */
#define LLOG_MIN_CHUNK_SIZE 8192
#define LLOG_HEADER_SIZE (96) /* sizeof (llog_log_hdr) +