@@ -1871,7 +1871,20 @@ struct cl_io {
*/
ci_noatime:1,
/* Tell sublayers not to expand LDLM locks requested for this IO */
- ci_lock_no_expand:1;
+ ci_lock_no_expand:1,
+ /**
+ * Set if non-delay RPC should be used for this IO.
+ *
+ * If this file has multiple mirrors and the OSTs of the current
+ * mirror are inaccessible, a non-delay RPC will error out quickly
+ * so that the upper layer can try the next mirror.
+ */
+ ci_ndelay:1;
+ /**
+ * How many times the read has been retried before this attempt.
+ * Set by the top level and consumed by the LOV.
+ */
+ unsigned int ci_ndelay_tried;
/**
* Number of pages owned by this IO. For invariant checking.
*/
@@ -2336,9 +2349,8 @@ static inline int cl_io_is_trunc(const struct cl_io *io)
do { \
typeof(foo_io) __foo_io = (foo_io); \
\
- BUILD_BUG_ON(offsetof(typeof(*__foo_io), base) != 0); \
- memset(&__foo_io->base + 1, 0, \
- sizeof(*__foo_io) - sizeof(__foo_io->base)); \
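+	/* Zero from @base to the end of the slice so that fields placed \
+	 * before @base (e.g. lov_io's FLR retry state) are preserved    \
+	 * across cl_io re-initialization.                                \
+	 */                                                               \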
+ memset(&__foo_io->base, 0, \
+ sizeof(*__foo_io) - offsetof(typeof(*__foo_io), base)); \
} while (0)
/** @} cl_io */
@@ -2385,6 +2397,8 @@ void cl_page_list_del(const struct lu_env *env, struct cl_page_list *plist,
struct cl_page *page);
void cl_page_list_disown(const struct lu_env *env,
struct cl_io *io, struct cl_page_list *plist);
+void cl_page_list_discard(const struct lu_env *env,
+ struct cl_io *io, struct cl_page_list *plist);
void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist);
void cl_2queue_init(struct cl_2queue *queue);
@@ -391,6 +391,13 @@
#define ldlm_set_excl(_l) LDLM_SET_FLAG((_l), 1ULL << 55)
#define ldlm_clear_excl(_l) LDLM_CLEAR_FLAG((_l), 1ULL << 55)
+/**
+ * This flag means a non-delay RPC should be used to send the DLM enqueue request.
+ */
+#define LDLM_FL_NDELAY 0x0400000000000000ULL /* bit 58 */
+#define ldlm_is_ndelay(_l) LDLM_TEST_FLAG((_l), 1ULL << 58)
+#define ldlm_set_ndelay(_l) LDLM_SET_FLAG((_l), 1ULL << 58)
+
/** l_flags bits marked as "ast" bits */
#define LDLM_FL_AST_MASK (LDLM_FL_FLOCK_DEADLOCK |\
LDLM_FL_DISCARD_DATA)
@@ -587,7 +587,7 @@ int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj,
int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops);
int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
- struct list_head *list, int cmd, int brw_flags);
+ struct list_head *list, int brw_flags);
int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
u64 size, struct osc_extent **extp);
void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext);
@@ -927,7 +927,9 @@ struct osc_extent {
/* this extent should be written back asap. set if one of pages is
* called by page WB daemon, or sync write or reading requests.
*/
- oe_urgent:1;
+ oe_urgent:1,
+ /* Non-delay RPC should be used for this extent. */
+ oe_ndelay:1;
/* how many grants allocated for this extent.
* Grant allocated for this extent. There is no grant allocated
* for reading extents and sync write extents.
@@ -474,6 +474,9 @@
/* LMV */
#define OBD_FAIL_UNKNOWN_LMV_STRIPE 0x1901
+/* FLR */
+#define OBD_FAIL_FLR_GLIMPSE_IMMUTABLE 0x1A00
+
/* Assign references to moved code to reduce code changes */
#define OBD_FAIL_PRECHECK(id) CFS_FAIL_PRECHECK(id)
#define OBD_FAIL_CHECK(id) CFS_FAIL_CHECK(id)
@@ -748,6 +748,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
}
+ if (*flags & LDLM_FL_NDELAY) {
+ DEBUG_REQ(D_DLMTRACE, req, "enque lock with no delay\n");
+ req->rq_no_resend = req->rq_no_delay = 1;
+		/* TODO: probably set a shorter timeout value and handle
+		 * ETIMEDOUT in osc_lock_upcall() correctly
+		 */
+ /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
+ }
+
/* Dump lock data into the request buffer */
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
ldlm_lock2desc(lock, &body->lock_desc);
@@ -1155,6 +1155,11 @@ static void ll_io_init(struct cl_io *io, const struct file *file, int write)
}
io->ci_noatime = file_is_noatime(file);
+
+ /* FLR: only use non-delay I/O for read as there is only one
+ * available mirror for write.
+ */
+ io->ci_ndelay = !write;
}
static ssize_t
@@ -1169,6 +1174,7 @@ static void ll_io_init(struct cl_io *io, const struct file *file, int write)
struct cl_io *io;
ssize_t result = 0;
int rc = 0;
+ unsigned int retried = 0;
CDEBUG(D_VFSTRACE, "file: %pD, type: %d ppos: %llu, count: %zu\n",
file, iot, *ppos, count);
@@ -1176,6 +1182,7 @@ static void ll_io_init(struct cl_io *io, const struct file *file, int write)
restart:
io = vvp_env_thread_io(env);
ll_io_init(io, file, iot == CIT_WRITE);
+ io->ci_ndelay_tried = retried;
if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
struct vvp_io *vio = vvp_env_io(env);
@@ -1232,12 +1239,19 @@ static void ll_io_init(struct cl_io *io, const struct file *file, int write)
out:
cl_io_fini(env, io);
+ CDEBUG(D_VFSTRACE,
+ "%s: %d io complete with rc: %d, result: %zd, restart: %d\n",
+	       file_dentry(file)->d_name.name,
+ iot, rc, result, io->ci_need_restart);
+
if ((!rc || rc == -ENODATA) && count > 0 && io->ci_need_restart) {
CDEBUG(D_VFSTRACE,
"%s: restart %s from %lld, count:%zu, result: %zd\n",
file_dentry(file)->d_name.name,
iot == CIT_READ ? "read" : "write",
*ppos, count, result);
+ /* preserve the tried count for FLR */
+ retried = io->ci_ndelay_tried;
goto restart;
}
@@ -187,24 +187,28 @@ int __cl_glimpse_size(struct inode *inode, int agl)
struct cl_io *io = NULL;
int result;
u16 refcheck;
+ int retried = 0;
result = cl_io_get(inode, &env, &io, &refcheck);
if (result <= 0)
return result;
do {
- io->ci_need_restart = 0;
- io->ci_verify_layout = 1;
+ io->ci_ndelay_tried = retried++;
+ io->ci_ndelay = io->ci_verify_layout = 1;
result = cl_io_init(env, io, CIT_GLIMPSE, io->ci_obj);
- if (result > 0)
+ if (result > 0) {
/*
* nothing to do for this io. This currently happens
* when stripe sub-object's are not yet created.
*/
result = io->ci_result;
- else if (result == 0)
+ } else if (result == 0) {
result = cl_glimpse_lock(env, io, inode, io->ci_obj,
agl);
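+			/* A non-delay glimpse failed with -EWOULDBLOCK
+			 * because the current mirror is inaccessible;
+			 * restart the glimpse so the next mirror is tried.
+			 */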
+ if (!agl && result == -EWOULDBLOCK)
+ io->ci_need_restart = 1;
+ }
OBD_FAIL_TIMEOUT(OBD_FAIL_GLIMPSE_DELAY, 2);
cl_io_fini(env, io);
@@ -1100,8 +1100,9 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
struct inode *inode = vvp_object_inode(page->cp_obj);
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
struct ll_readahead_state *ras = &fd->fd_ras;
- struct cl_2queue *queue = &io->ci_queue;
+ struct cl_2queue *queue = &io->ci_queue;
struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct cl_sync_io *anchor = NULL;
struct vvp_page *vpg;
bool uptodate;
int rc = 0;
@@ -1128,6 +1129,10 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
cl_page_export(env, page, 1);
cl_page_disown(env, io, page);
} else {
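+		/* A sync I/O anchor lets us wait below for the read to
+		 * complete so that a failed mirror read can be detected
+		 * and the page discarded before another mirror is tried.
+		 */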
+ anchor = &vvp_env_info(env)->vti_anchor;
+ cl_sync_io_init(anchor, 1);
+ page->cp_sync_io = anchor;
+
cl_page_list_add(&queue->c2_qin, page);
}
@@ -1148,6 +1153,27 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
if (!rc)
task_io_account_read(PAGE_SIZE * count);
}
+ if (anchor && !cl_page_is_owned(page, io)) { /* have sent */
+ rc = cl_sync_io_wait(env, anchor, 0);
+
+ cl_page_assume(env, io, page);
+ cl_page_list_del(env, &queue->c2_qout, page);
+
+ if (!PageUptodate(cl_page_vmpage(page))) {
+ /* Failed to read a mirror, discard this page so that
+ * new page can be created with new mirror.
+ *
+ * TODO: this is not needed after page reinit
+ * route is implemented
+ */
+ cl_page_discard(env, io, page);
+ }
+ cl_page_disown(env, io, page);
+ }
+
+ /* TODO: discard all pages until page reinit route is implemented */
+ cl_page_list_discard(env, io, &queue->c2_qin);
+
/*
* Unlock unsent pages in case of error.
*/
@@ -127,6 +127,7 @@ struct vvp_thread_info {
struct cl_lock_descr vti_descr;
struct cl_io vti_io;
struct cl_attr vti_attr;
+ struct cl_sync_io vti_anchor;
};
static inline struct vvp_thread_info *vvp_env_info(const struct lu_env *env)
@@ -281,6 +281,7 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
struct cl_object *obj = io->ci_obj;
struct vvp_io *vio = cl2vvp_io(env, ios);
struct inode *inode = vvp_object_inode(obj);
+ u32 gen = 0;
int rc;
CLOBINVRNT(env, obj, vvp_object_invariant(obj));
@@ -304,18 +305,41 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
* block on layout lock hold by the MDT
* as MDT will not send new layout in lvb (see LU-3124)
* we have to explicitly fetch it, all this will be done
- * by ll_layout_refresh()
+ * by ll_layout_refresh().
+	 * Even if ll_layout_restore() returns zero, it doesn't mean
+	 * that the restore has succeeded; the layout is therefore
+	 * refreshed and verified below.
*/
- if (rc == 0) {
- io->ci_restore_needed = 0;
- io->ci_need_restart = 1;
- io->ci_verify_layout = 1;
- } else {
+ if (rc) {
io->ci_restore_needed = 1;
io->ci_need_restart = 0;
io->ci_verify_layout = 0;
io->ci_result = rc;
+ goto out;
+ }
+
+ io->ci_restore_needed = 0;
+
+ /* Even if ll_layout_restore() returns zero, it doesn't mean
+	 * that the restore has succeeded. Therefore it verifies whether
+	 * the layout has changed and restarts the I/O accordingly.
+ */
+ ll_layout_refresh(inode, &gen);
+ io->ci_need_restart = vio->vui_layout_gen != gen;
+ if (io->ci_need_restart) {
+ CDEBUG(D_VFSTRACE,
+ DFID" layout changed from %d to %d.\n",
+ PFID(lu_object_fid(&obj->co_lu)),
+ vio->vui_layout_gen, gen);
+			/* Today a successful restore is the only possible
+			 * case; the restore is done, so clear the
+			 * restoring state.
+			 */
+ clear_bit(LLIF_FILE_RESTORING,
+ &ll_i2info(vvp_object_inode(obj))->lli_flags);
}
+ goto out;
}
/**
@@ -352,11 +376,11 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
io->ci_result = rc;
if (!rc)
io->ci_need_restart = 1;
+ goto out;
}
- if (!io->ci_ignore_layout && io->ci_verify_layout) {
- u32 gen = 0;
-
+ if (!io->ci_need_restart &&
+ !io->ci_ignore_layout && io->ci_verify_layout) {
/* check layout version */
ll_layout_refresh(inode, &gen);
io->ci_need_restart = vio->vui_layout_gen != gen;
@@ -365,12 +389,11 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
DFID " layout changed from %d to %d.\n",
PFID(lu_object_fid(&obj->co_lu)),
vio->vui_layout_gen, gen);
- /* today successful restore is the only possible case */
- /* restore was done, clear restoring state */
- clear_bit(LLIF_FILE_RESTORING,
- &ll_i2info(inode)->lli_flags);
}
+ goto out;
}
+out:
+ ;
}
static void vvp_io_fault_fini(const struct lu_env *env,
@@ -740,7 +763,10 @@ static int vvp_io_read_start(const struct lu_env *env,
if (!can_populate_pages(env, io, inode))
return 0;
- result = vvp_prep_size(env, obj, io, pos, tot, &exceed);
+	/* Unless we are reading a sparse file, the lock has already been
+	 * acquired, so vvp_prep_size() is effectively a no-op here.
+	 */
+ result = vvp_prep_size(env, obj, io, pos, cnt, &exceed);
if (result != 0)
return result;
if (exceed != 0)
@@ -765,6 +791,7 @@ static int vvp_io_read_start(const struct lu_env *env,
file_accessed(file);
LASSERT(vio->vui_iocb->ki_pos == pos);
result = generic_file_read_iter(vio->vui_iocb, vio->vui_iter);
+ goto out;
out:
if (result >= 0) {
@@ -267,8 +267,15 @@ static void vvp_page_completion_read(const struct lu_env *env,
if (ioret == 0) {
if (!vpg->vpg_defer_uptodate)
cl_page_export(env, page, 1);
- } else {
+ } else if (vpg->vpg_defer_uptodate) {
vpg->vpg_defer_uptodate = 0;
+ if (ioret == -EWOULDBLOCK) {
+ /* mirror read failed, it needs to destroy the page
+ * because subpage would be from wrong osc when trying
+ * to read from a new mirror
+ */
+ generic_error_remove_page(vmpage->mapping, vmpage);
+ }
}
if (!page->cp_sync_io)
@@ -178,7 +178,7 @@ struct lov_layout_raid0 {
* object. This field is reset to 0 when attributes of
* any sub-object change.
*/
- int lo_attr_valid;
+ bool lo_attr_valid;
/**
* Array of sub-objects. Allocated when top-object is
* created (lov_init_raid0()).
@@ -217,7 +217,9 @@ struct lov_layout_dom {
struct lov_layout_entry {
u32 lle_type;
- struct lu_extent lle_extent;
+ unsigned int lle_valid:1;
+ struct lu_extent *lle_extent;
+ struct lov_stripe_md_entry *lle_lsme;
struct lov_comp_layout_entry_ops *lle_comp_ops;
union {
struct lov_layout_raid0 lle_raid0;
@@ -225,6 +227,18 @@ struct lov_layout_entry {
};
};
+struct lov_mirror_entry {
+ unsigned short lre_mirror_id;
+ unsigned short lre_preferred:1,
+			lre_valid:1;	/* set if at least one of the
+					 * components in this mirror is valid
+ */
+ unsigned short lre_start; /* index to lo_entries, start index of
+ * this mirror
+ */
+ unsigned short lre_end; /* end index of this mirror */
+};
+
/**
* lov-specific file state.
*
@@ -280,9 +294,36 @@ struct lov_object {
} released;
struct lov_layout_composite {
/**
- * Current valid entry count of entries.
+ * flags of lov_comp_md_v1::lcm_flags. Mainly used
+ * by FLR.
+ */
+ u32 lo_flags;
+ /**
+			 * For FLR: index of the preferred mirror to read
+			 * from. The preferred mirror is initialized from the
+			 * preferred bit of the lsme; it can be changed when
+			 * the preferred mirror becomes inaccessible.
+			 * In order to make lov_lsm_entry() return the same
+			 * mirror within the same IO context, the preferred
+			 * mirror can only be changed when lo_active_ios
+			 * reaches zero.
+ */
+ int lo_preferred_mirror;
+ /**
+ * For FLR: the lock to protect access to
+ * lo_preferred_mirror.
*/
- unsigned int lo_entry_count;
+ spinlock_t lo_write_lock;
+ /**
+ * For FLR: Number of (valid) mirrors.
+ */
+ unsigned int lo_mirror_count;
+ struct lov_mirror_entry *lo_mirrors;
+ /**
+			 * Current entry count of lo_entries, including
+			 * invalid entries.
+ */
+ unsigned int lo_entry_count;
struct lov_layout_entry *lo_entries;
} composite;
} u;
@@ -293,10 +334,80 @@ struct lov_object {
struct task_struct *lo_owner;
};
+static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i)
+{
+ LASSERT(lov->lo_type == LLT_COMP);
+ LASSERTF(i < lov->u.composite.lo_entry_count,
+ "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
+
+ return &lov->u.composite.lo_entries[i].lle_raid0;
+}
+
+static inline struct lov_stripe_md_entry *lov_lse(struct lov_object *lov, int i)
+{
+ LASSERT(lov->lo_lsm);
+ LASSERT(i < lov->lo_lsm->lsm_entry_count);
+
+ return lov->lo_lsm->lsm_entries[i];
+}
+
+static inline unsigned int lov_flr_state(const struct lov_object *lov)
+{
+ if (lov->lo_type != LLT_COMP)
+ return LCM_FL_NOT_FLR;
+
+ return lov->u.composite.lo_flags & LCM_FL_FLR_MASK;
+}
+
+static inline bool lov_is_flr(const struct lov_object *lov)
+{
+ return lov_flr_state(lov) != LCM_FL_NOT_FLR;
+}
+
+static inline struct lov_layout_entry *lov_entry(struct lov_object *lov, int i)
+{
+ LASSERT(lov->lo_type == LLT_COMP);
+ LASSERTF(i < lov->u.composite.lo_entry_count,
+ "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
+
+ return &lov->u.composite.lo_entries[i];
+}
+
+#define lov_for_layout_entry(lov, entry, start, end) \
+ for (entry = lov_entry(lov, start); \
+ entry <= lov_entry(lov, end); entry++)
+
#define lov_foreach_layout_entry(lov, entry) \
- for (entry = &lov->u.composite.lo_entries[0]; \
- entry < &lov->u.composite.lo_entries[lov->u.composite.lo_entry_count];\
- entry++)
+ lov_for_layout_entry(lov, entry, 0, \
+ (lov)->u.composite.lo_entry_count - 1)
+
+#define lov_foreach_mirror_layout_entry(lov, entry, lre) \
+ lov_for_layout_entry(lov, entry, (lre)->lre_start, (lre)->lre_end)
+
+static inline struct lov_mirror_entry *
+lov_mirror_entry(struct lov_object *lov, int i)
+{
+ LASSERT(i < lov->u.composite.lo_mirror_count);
+ return &lov->u.composite.lo_mirrors[i];
+}
+
+#define lov_foreach_mirror_entry(lov, lre) \
+ for (lre = lov_mirror_entry(lov, 0); \
+ lre <= lov_mirror_entry(lov, \
+ lov->u.composite.lo_mirror_count - 1);\
+ lre++)
+
+static inline unsigned int
+lov_layout_entry_index(struct lov_object *lov, struct lov_layout_entry *entry)
+{
+ struct lov_layout_entry *first = &lov->u.composite.lo_entries[0];
+ unsigned int index = (unsigned int)(entry - first);
+
+ LASSERT(entry >= first);
+ LASSERT(index < lov->u.composite.lo_entry_count);
+
+ return index;
+}
/**
* State lov_lock keeps for each sub-lock.
@@ -412,6 +523,26 @@ struct lov_io_sub {
struct lov_io {
/** super-class */
struct cl_io_slice lis_cl;
+
+ /**
+ * FLR: index to lo_mirrors. Valid only if lov_is_flr() returns true.
+ *
+ * The mirror index of this io. Preserved over cl_io_init()
+ * if io->ci_ndelay_tried is greater than zero.
+ */
+ int lis_mirror_index;
+ /**
+ * FLR: the layout gen when lis_mirror_index was cached. The
+ * mirror index makes sense only when the layout gen doesn't
+ * change.
+ */
+ int lis_mirror_layout_gen;
+
+ /**
+	 * Fields below this one are cleared by lov_io_init() via
+	 * CL_IO_SLICE_CLEAN(); fields above it are preserved across restarts.
+ */
+ unsigned int lis_preserved;
+
/**
* Pointer to the object slice. This is a duplicate of
* lov_io::lis_cl::cis_object.
@@ -518,10 +649,25 @@ struct lu_object *lovsub_object_alloc(const struct lu_env *env,
struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov);
int lov_lsm_entry(const struct lov_stripe_md *lsm, u64 offset);
+int lov_io_layout_at(struct lov_io *lio, u64 offset);
#define lov_foreach_target(lov, var) \
for (var = 0; var < lov_targets_nr(lov); ++var)
+static inline struct lu_extent *lov_io_extent(struct lov_io *io, int i)
+{
+ return &lov_lse(io->lis_object, i)->lsme_extent;
+}
+
+/**
+ * Iterate over the layout entries that intersect the extent @ext.
+ */
+#define lov_foreach_io_layout(ind, lio, ext) \
+ for (ind = lov_io_layout_at(lio, (ext)->e_start); \
+ ind >= 0 && \
+ lu_extent_is_overlapped(lov_io_extent(lio, ind), ext); \
+ ind = lov_io_layout_at(lio, lov_io_extent(lio, ind)->e_end))
+
/*****************************************************************************
*
* Type conversions.
@@ -690,32 +836,6 @@ static inline struct lov_thread_info *lov_env_info(const struct lu_env *env)
return info;
}
-static inline struct lov_layout_entry *lov_entry(struct lov_object *lov, int i)
-{
- LASSERT(lov->lo_type == LLT_COMP);
- LASSERTF(i < lov->u.composite.lo_entry_count,
- "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
-
- return &lov->u.composite.lo_entries[i];
-}
-
-static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i)
-{
- LASSERT(lov->lo_type == LLT_COMP);
- LASSERTF(i < lov->u.composite.lo_entry_count,
- "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
-
- return &lov->u.composite.lo_entries[i].lle_raid0;
-}
-
-static inline struct lov_stripe_md_entry *lov_lse(struct lov_object *lov, int i)
-{
- LASSERT(lov->lo_lsm);
- LASSERT(i < lov->lo_lsm->lsm_entry_count);
-
- return lov->lo_lsm->lsm_entries[i];
-}
-
/* lov_pack.c */
int lov_getstripe(const struct lu_env *env, struct lov_object *obj,
struct lov_stripe_md *lsm, struct lov_user_md __user *lump,
@@ -87,6 +87,15 @@ static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
}
}
+static inline bool
+is_index_within_mirror(struct lov_object *lov, int index, int mirror_index)
+{
+ struct lov_layout_composite *comp = &lov->u.composite;
+ struct lov_mirror_entry *lre = &comp->lo_mirrors[mirror_index];
+
+ return (index >= lre->lre_start && index <= lre->lre_end);
+}
+
static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
struct lov_io_sub *sub)
{
@@ -104,6 +113,11 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
!lov_r0(lov, index)->lo_sub[stripe]))
return -EIO;
+ LASSERTF(is_index_within_mirror(lov, index, lio->lis_mirror_index),
+ DFID "iot = %d, index = %d, mirror = %d\n",
+ PFID(lu_object_fid(lov2lu(lov))), io->ci_type, index,
+ lio->lis_mirror_index);
+
/* obtain new environment */
sub->sub_env = cl_env_get(&sub->sub_refcheck);
if (IS_ERR(sub->sub_env))
@@ -121,6 +135,7 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
sub_io->ci_no_srvlock = io->ci_no_srvlock;
sub_io->ci_noatime = io->ci_noatime;
sub_io->ci_lock_no_expand = io->ci_lock_no_expand;
+ sub_io->ci_ndelay = io->ci_ndelay;
rc = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
if (rc < 0)
@@ -193,9 +208,102 @@ static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
return 0;
}
+static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
+ struct cl_io *io)
+{
+ struct lov_layout_composite *comp = &obj->u.composite;
+ int index;
+ int i;
+
+ if (!lov_is_flr(obj)) {
+ LASSERT(comp->lo_preferred_mirror == 0);
+ lio->lis_mirror_index = comp->lo_preferred_mirror;
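+		/* Fast-failing RPCs make no sense without another mirror
+		 * to fall back to, so disable non-delay for plain layouts.
+		 */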
+ io->ci_ndelay = 0;
+ return 0;
+ }
+
+ if (io->ci_ndelay_tried == 0 || /* first time to try */
+ /* reset the mirror index if layout has changed */
+ lio->lis_mirror_layout_gen != obj->lo_lsm->lsm_layout_gen) {
+ lio->lis_mirror_layout_gen = obj->lo_lsm->lsm_layout_gen;
+ index = lio->lis_mirror_index = comp->lo_preferred_mirror;
+ } else {
+ index = lio->lis_mirror_index;
+ LASSERT(index >= 0);
+
+ /* move mirror index to the next one */
+ index = (index + 1) % comp->lo_mirror_count;
+ }
+
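+	/* Probe the mirrors in round-robin order, starting from the chosen
+	 * index, until one is found that is valid and has a non-stale
+	 * component covering the current I/O position.
+	 */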
+ for (i = 0; i < comp->lo_mirror_count; i++) {
+ struct lu_extent ext = { .e_start = lio->lis_pos,
+ .e_end = lio->lis_pos + 1 };
+ struct lov_mirror_entry *lre;
+ struct lov_layout_entry *lle;
+ bool found = false;
+
+ lre = &comp->lo_mirrors[(index + i) % comp->lo_mirror_count];
+ if (!lre->lre_valid)
+ continue;
+
+ lov_foreach_mirror_layout_entry(obj, lle, lre) {
+ if (!lle->lle_valid)
+ continue;
+
+ if (lu_extent_is_overlapped(&ext, lle->lle_extent)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (found) {
+ index = (index + i) % comp->lo_mirror_count;
+ break;
+ }
+ }
+ if (i == comp->lo_mirror_count) {
+ CERROR(DFID ": failed to find a component covering I/O region at %llu\n",
+ PFID(lu_object_fid(lov2lu(obj))), lio->lis_pos);
+
+ dump_lsm(D_ERROR, obj->lo_lsm);
+
+ return -EIO;
+ }
+
+ CDEBUG(D_VFSTRACE,
+ DFID ": flr state: %d, move mirror from %d to %d, have retried: %d, mirror count: %d\n",
+ PFID(lu_object_fid(lov2lu(obj))), lov_flr_state(obj),
+ lio->lis_mirror_index, index, io->ci_ndelay_tried,
+ comp->lo_mirror_count);
+
+ lio->lis_mirror_index = index;
+
+ /* FLR: if all mirrors have been tried once, most likely the network
+ * of this client has been partitioned. We should relinquish CPU for
+ * a while before trying again.
+ */
+ ++io->ci_ndelay_tried;
+ if (io->ci_ndelay && io->ci_ndelay_tried >= comp->lo_mirror_count) {
+ set_current_state(TASK_INTERRUPTIBLE);
+		schedule_timeout(msecs_to_jiffies(MSEC_PER_SEC / 100)); /* 10ms */
+ if (signal_pending(current))
+ return -EINTR;
+
+ /* reset retry counter */
+ io->ci_ndelay_tried = 1;
+ }
+
+ CDEBUG(D_VFSTRACE, "use %sdelayed RPC state for this IO\n",
+ io->ci_ndelay ? "non-" : "");
+
+ return 0;
+}
+
static int lov_io_slice_init(struct lov_io *lio, struct lov_object *obj,
struct cl_io *io)
{
+ int result = 0;
+
io->ci_result = 0;
lio->lis_object = obj;
@@ -260,7 +368,8 @@ static int lov_io_slice_init(struct lov_io *lio, struct lov_object *obj,
lio->lis_pos = 0;
lio->lis_endpos = OBD_OBJECT_EOF;
- if ((obj->lo_lsm->lsm_flags & LCM_FL_FLR_MASK) == LCM_FL_RDONLY)
+ if (lov_flr_state(obj) == LCM_FL_RDONLY &&
+ !OBD_FAIL_CHECK(OBD_FAIL_FLR_GLIMPSE_IMMUTABLE))
return 1; /* SoM is accurate, no need glimpse */
break;
@@ -272,7 +381,8 @@ static int lov_io_slice_init(struct lov_io *lio, struct lov_object *obj,
default:
LBUG();
}
- return 0;
+ result = lov_io_mirror_init(lio, obj, io);
+ return result;
}
static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
@@ -390,7 +500,6 @@ static int lov_io_iter_init(const struct lu_env *env,
struct lov_io *lio = cl2lov_io(env, ios);
struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
struct cl_io *io = ios->cis_io;
- struct lov_layout_entry *le;
struct lov_io_sub *sub;
struct lu_extent ext;
int rc = 0;
@@ -399,20 +508,15 @@ static int lov_io_iter_init(const struct lu_env *env,
ext.e_start = lio->lis_pos;
ext.e_end = lio->lis_endpos;
- index = 0;
- lov_foreach_layout_entry(lio->lis_object, le) {
- struct lov_layout_raid0 *r0 = &le->lle_raid0;
+ lov_foreach_io_layout(index, lio, &ext) {
+ struct lov_layout_raid0 *r0 = lov_r0(lio->lis_object, index);
int stripe;
u64 start;
u64 end;
- index++;
- if (!lu_extent_is_overlapped(&ext, &le->lle_extent))
- continue;
-
CDEBUG(D_VFSTRACE, "component[%d] flags %#x\n",
- index - 1, lsm->lsm_entries[index - 1]->lsme_flags);
- if (!lsm_entry_inited(lsm, index - 1)) {
+ index, lsm->lsm_entries[index]->lsme_flags);
+ if (!lsm_entry_inited(lsm, index)) {
/* truncate IO will trigger write intent as well, and
* it's handled in lov_io_setattr_iter_init()
*/
@@ -429,7 +533,7 @@ static int lov_io_iter_init(const struct lu_env *env,
}
for (stripe = 0; stripe < r0->lo_nr; stripe++) {
- if (!lov_stripe_intersects(lsm, index - 1, stripe,
+ if (!lov_stripe_intersects(lsm, index, stripe,
&ext, &start, &end))
continue;
@@ -444,7 +548,7 @@ static int lov_io_iter_init(const struct lu_env *env,
end = lov_offset_mod(end, 1);
sub = lov_sub_get(env, lio,
- lov_comp_index(index - 1, stripe));
+ lov_comp_index(index, stripe));
if (IS_ERR(sub)) {
rc = PTR_ERR(sub);
break;
@@ -472,7 +576,6 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
const struct cl_io_slice *ios)
{
struct lov_io *lio = cl2lov_io(env, ios);
- struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
struct cl_io *io = ios->cis_io;
u64 start = io->u.ci_rw.crw_pos;
struct lov_stripe_md_entry *lse;
@@ -484,7 +587,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
if (cl_io_is_append(io))
return lov_io_iter_init(env, ios);
- index = lov_lsm_entry(lsm, io->u.ci_rw.crw_pos);
+ index = lov_io_layout_at(lio, io->u.ci_rw.crw_pos);
if (index < 0) { /* non-existing layout component */
if (io->ci_type == CIT_READ) {
/* TODO: it needs to detect the next component and
@@ -542,7 +645,7 @@ static int lov_io_setattr_iter_init(const struct lu_env *env,
int index;
if (cl_io_is_trunc(io) && lio->lis_pos > 0) {
- index = lov_lsm_entry(lsm, lio->lis_pos - 1);
+ index = lov_io_layout_at(lio, lio->lis_pos - 1);
/* no entry found for such offset */
if (index < 0) {
io->ci_result = -ENODATA;
@@ -676,7 +779,7 @@ static int lov_io_read_ahead(const struct lu_env *env,
int rc;
offset = cl_offset(obj, start);
- index = lov_lsm_entry(loo->lo_lsm, offset);
+ index = lov_io_layout_at(lio, offset);
if (index < 0 || !lsm_entry_inited(loo->lo_lsm, index))
return -ENODATA;
@@ -715,7 +818,7 @@ static int lov_io_read_ahead(const struct lu_env *env,
ra_end, stripe);
/* boundary of current component */
- ra_end = cl_index(obj, (loff_t)lov_lse(loo, index)->lsme_extent.e_end);
+ ra_end = cl_index(obj, (loff_t)lov_io_extent(lio, index)->e_end);
if (ra_end != CL_PAGE_EOF && ra->cra_end >= ra_end)
ra->cra_end = ra_end - 1;
@@ -1148,8 +1251,8 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
LASSERTF(0, "invalid type %d\n", io->ci_type);
result = -EOPNOTSUPP;
break;
- case CIT_MISC:
case CIT_GLIMPSE:
+ case CIT_MISC:
case CIT_FSYNC:
case CIT_LADVISE:
case CIT_DATA_VERSION:
@@ -1184,4 +1287,42 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
return result;
}
-/** @} lov */
+/**
+ * Return the index in composite:lo_entries by the file offset
+ */
+int lov_io_layout_at(struct lov_io *lio, u64 offset)
+{
+ struct lov_object *lov = lio->lis_object;
+ struct lov_layout_composite *comp = &lov->u.composite;
+ int start_index = 0;
+ int end_index = comp->lo_entry_count - 1;
+ int i;
+
+ LASSERT(lov->lo_type == LLT_COMP);
+
+ /* This is actual file offset so nothing can cover eof. */
+ if (offset == LUSTRE_EOF)
+ return -1;
+
+ if (lov_is_flr(lov)) {
+ struct lov_mirror_entry *lre;
+
+ LASSERT(lio->lis_mirror_index >= 0);
+
+ lre = &comp->lo_mirrors[lio->lis_mirror_index];
+ start_index = lre->lre_start;
+ end_index = lre->lre_end;
+ }
+
+ for (i = start_index; i <= end_index; i++) {
+ struct lov_layout_entry *lle = lov_entry(lov, i);
+
+ if ((offset >= lle->lle_extent->e_start &&
+ offset < lle->lle_extent->e_end) ||
+ (offset == OBD_OBJECT_EOF &&
+ lle->lle_extent->e_end == OBD_OBJECT_EOF))
+ return i;
+ }
+
+ return -1;
+}
@@ -131,15 +131,9 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
ext.e_end = cl_offset(obj, lock->cll_descr.cld_end + 1);
nr = 0;
- for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
- index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) {
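+	/* Locks are created in the context of an active I/O, so iterating
+	 * via lov_env_io(env) confines the sub-lock search to the mirror
+	 * selected by that I/O.
+	 */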
+ lov_foreach_io_layout(index, lov_env_io(env), &ext) {
struct lov_layout_raid0 *r0 = lov_r0(lov, index);
- /* assume lsm entries are sorted. */
- if (!lu_extent_is_overlapped(&ext,
- &lov_lse(lov, index)->lsme_extent))
- break;
-
for (i = 0; i < r0->lo_nr; i++) {
if (likely(r0->lo_sub[i]) && /* spare layout */
lov_stripe_intersects(lov->lo_lsm, index, i,
@@ -160,14 +154,9 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
lovlck->lls_nr = nr;
nr = 0;
- for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
- index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) {
+ lov_foreach_io_layout(index, lov_env_io(env), &ext) {
struct lov_layout_raid0 *r0 = lov_r0(lov, index);
- /* assume lsm entries are sorted. */
- if (!lu_extent_is_overlapped(&ext,
- &lov_lse(lov, index)->lsme_extent))
- break;
for (i = 0; i < r0->lo_nr; ++i) {
struct lov_lock_sub *lls = &lovlck->lls_sub[nr];
struct cl_lock_descr *descr = &lls->sub_lock.cll_descr;
@@ -437,8 +437,8 @@ static int lov_attr_get_dom(const struct lu_env *env, struct lov_object *lov,
* component end. Alternatively, check that limit on server
* and do not allow size overflow there.
*/
- if (attr->cat_size > lle->lle_extent.e_end)
- attr->cat_size = lle->lle_extent.e_end;
+ if (attr->cat_size > lle->lle_extent->e_end)
+ attr->cat_size = lle->lle_extent->e_end;
attr->cat_kms = attr->cat_size;
@@ -604,19 +604,38 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
union lov_layout_state *state)
{
struct lov_layout_composite *comp = &state->composite;
+ int flr_state = lsm->lsm_flags & LCM_FL_FLR_MASK;
struct lov_layout_entry *lle;
+ struct lov_mirror_entry *lre;
unsigned int entry_count;
unsigned int psz = 0;
+ unsigned int mirror_count;
int result = 0;
- int i;
+ int i, j;
LASSERT(lsm->lsm_entry_count > 0);
LASSERT(!lov->lo_lsm);
lov->lo_lsm = lsm_addref(lsm);
lov->lo_layout_invalid = true;
+ dump_lsm(D_INODE, lsm);
+
entry_count = lsm->lsm_entry_count;
- comp->lo_entry_count = entry_count;
+
+ spin_lock_init(&comp->lo_write_lock);
+ comp->lo_flags = lsm->lsm_flags;
+ comp->lo_mirror_count = lsm->lsm_mirror_count + 1;
+ comp->lo_entry_count = lsm->lsm_entry_count;
+ comp->lo_preferred_mirror = -1;
+
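+	/* A layout is FLR if and only if it carries more than one mirror;
+	 * reject any layout that violates this equivalence.
+	 */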
+ if (equi(flr_state == LCM_FL_NOT_FLR, comp->lo_mirror_count > 1))
+ return -EINVAL;
+
+ comp->lo_mirrors = kcalloc(comp->lo_mirror_count,
+ sizeof(*comp->lo_mirrors),
+ GFP_NOFS);
+ if (!comp->lo_mirrors)
+ return -ENOMEM;
comp->lo_entries = kcalloc(entry_count, sizeof(*comp->lo_entries),
GFP_NOFS);
@@ -624,10 +643,13 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
return -ENOMEM;
/* Initiate all entry types and extents data at first */
- for (i = 0; i < entry_count; i++) {
+ for (i = 0, j = 0, mirror_count = 1; i < entry_count; i++) {
+ int mirror_id = 0;
+
lle = &comp->lo_entries[i];
- lle->lle_type = lov_entry_type(lsm->lsm_entries[i]);
+ lle->lle_lsme = lsm->lsm_entries[i];
+ lle->lle_type = lov_entry_type(lle->lle_lsme);
switch (lle->lle_type) {
case LOV_PATTERN_RAID0:
lle->lle_comp_ops = &raid0_ops;
@@ -642,30 +664,99 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
dump_lsm(D_ERROR, lsm);
return -EIO;
}
- lle->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+
+ lle->lle_extent = &lle->lle_lsme->lsme_extent;
+ lle->lle_valid = !(lle->lle_lsme->lsme_flags & LCME_FL_STALE);
+
+ if (flr_state != LCM_FL_NOT_FLR)
+ mirror_id = mirror_id_of(lle->lle_lsme->lsme_id);
+
+ lre = &comp->lo_mirrors[j];
+ if (i > 0) {
+ if (mirror_id == lre->lre_mirror_id) {
+ lre->lre_valid |= lle->lle_valid;
+ lre->lre_end = i;
+ continue;
+ }
+
+ /* new mirror detected, assume that the mirrors
+			 * are sorted in the layout
+ */
+ ++mirror_count;
+ ++j;
+ if (j >= comp->lo_mirror_count)
+ break;
+
+ lre = &comp->lo_mirrors[j];
+ }
+
+ /* entries must be sorted by mirrors */
+ lre->lre_mirror_id = mirror_id;
+ lre->lre_start = lre->lre_end = i;
+ lre->lre_preferred = (lle->lle_lsme->lsme_flags &
+ LCME_FL_PREFERRED);
+ lre->lre_valid = lle->lle_valid;
+ }
+
+ /* sanity check for FLR */
+ if (mirror_count != comp->lo_mirror_count) {
+ CDEBUG(D_INODE, DFID
+ " doesn't have the # of mirrors it claims, %u/%u\n",
+ PFID(lu_object_fid(lov2lu(lov))), mirror_count,
+		       comp->lo_mirror_count);
+
+ result = -EINVAL;
+ goto out;
}
- i = 0;
lov_foreach_layout_entry(lov, lle) {
+ int index = lov_layout_entry_index(lov, lle);
+
/**
* If the component has not been init-ed on MDS side, for
* PFL layout, we'd know that the components beyond this one
* will be dynamically init-ed later on file write/trunc ops.
*/
- if (lsm_entry_inited(lsm, i)) {
- result = lle->lle_comp_ops->lco_init(env, dev, lov, i,
- conf, lle);
- if (result < 0)
- break;
+ if (!lsme_inited(lle->lle_lsme))
+ continue;
- LASSERT(ergo(psz > 0, psz == result));
- psz = result;
- }
- i++;
+ result = lle->lle_comp_ops->lco_init(env, dev, lov, index,
+ conf, lle);
+ if (result < 0)
+ break;
+
+ LASSERT(ergo(psz > 0, psz == result));
+ psz = result;
}
+
if (psz > 0)
cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
+ /* decide the preferred mirror */
+ mirror_count = 0;
+ i = 0;
+ lov_foreach_mirror_entry(lov, lre) {
+ i++;
+ if (!lre->lre_valid)
+ continue;
+
+ mirror_count++; /* valid mirror */
+
+ if (lre->lre_preferred || comp->lo_preferred_mirror < 0)
+ comp->lo_preferred_mirror = i - 1;
+ }
+ if (mirror_count == 0) {
+ CDEBUG(D_INODE, DFID
+ " doesn't have any valid mirrors\n",
+ PFID(lu_object_fid(lov2lu(lov))));
+
+ result = -EINVAL;
+ goto out;
+ }
+
+ LASSERT(comp->lo_preferred_mirror >= 0);
+
+out:
return result > 0 ? 0 : result;
}
@@ -739,6 +830,10 @@ static void lov_fini_composite(const struct lu_env *env,
comp->lo_entries = NULL;
}
+ kfree(comp->lo_mirrors);
+
+ memset(comp, 0, sizeof(*comp));
+
dump_lsm(D_INODE, lov->lo_lsm);
lov_free_memmd(&lov->lo_lsm);
}
@@ -821,24 +916,25 @@ static int lov_attr_get_composite(const struct lu_env *env,
struct lov_object *lov = cl2lov(obj);
struct lov_layout_entry *entry;
int result = 0;
- int index = 0;
attr->cat_size = 0;
attr->cat_blocks = 0;
lov_foreach_layout_entry(lov, entry) {
+ int index = lov_layout_entry_index(lov, entry);
struct cl_attr *lov_attr = NULL;
+ if (!entry->lle_valid)
+ continue;
+
/* PFL: This component has not been init-ed. */
if (!lsm_entry_inited(lov->lo_lsm, index))
- break;
+ continue;
result = entry->lle_comp_ops->lco_getattr(env, lov, index,
entry, &lov_attr);
if (result < 0)
return result;
- index++;
-
if (!lov_attr)
continue;
@@ -861,6 +957,7 @@ static int lov_attr_get_composite(const struct lu_env *env,
if (attr->cat_mtime < lov_attr->cat_mtime)
attr->cat_mtime = lov_attr->cat_mtime;
}
+
return 0;
}
@@ -1051,12 +1148,11 @@ static int lov_layout_change(const struct lu_env *unused,
CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
PFID(lu_object_fid(lov2lu(lov))), lov, llt);
- lov->lo_type = LLT_EMPTY;
-
/* page bufsize fixup */
cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
lov_page_slice_fixup(lov, NULL);
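+	/* Install the new layout type before llo_init() so that helpers
+	 * asserting on lo_type (e.g. lov_entry()) see the type being set
+	 * up; it is reverted to LLT_EMPTY below on failure.
+	 */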
+ lov->lo_type = llt;
rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
if (rc) {
struct obd_device *obd = lov2obd(lov_dev->ld_lov);
@@ -1066,10 +1162,10 @@ static int lov_layout_change(const struct lu_env *unused,
new_ops->llo_delete(env, lov, state);
new_ops->llo_fini(env, lov, state);
/* this file becomes an EMPTY file. */
+ lov->lo_type = LLT_EMPTY;
goto out;
}
- lov->lo_type = llt;
out:
cl_env_put(env, &refcheck);
return rc;
@@ -1218,7 +1314,7 @@ int lov_page_init(const struct lu_env *env, struct cl_object *obj,
int lov_io_init(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io)
{
- CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
+ CL_IO_SLICE_CLEAN(lov_env_io(env), lis_preserved);
CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
@@ -1767,6 +1863,7 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
goto out_fm_local;
}
+ /* TODO: rewrite it with lov_foreach_io_layout() */
for (entry = start_entry; entry <= end_entry; entry++) {
lsme = lsm->lsm_entries[entry];
@@ -82,7 +82,7 @@ int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj,
int rc;
offset = cl_offset(obj, index);
- entry = lov_lsm_entry(loo->lo_lsm, offset);
+ entry = lov_io_layout_at(lio, offset);
if (entry < 0 || !lsm_entry_inited(loo->lo_lsm, entry)) {
/* non-existing layout component */
lov_page_init_empty(env, obj, page, index);
@@ -191,6 +191,9 @@ int cl_io_init(const struct lu_env *env, struct cl_io *io,
{
LASSERT(obj == cl_object_top(obj));
+ /* clear I/O restart from previous instance */
+ io->ci_need_restart = 0;
+
return __cl_io_init(env, io, iot, obj);
}
EXPORT_SYMBOL(cl_io_init);
@@ -722,6 +725,12 @@ int cl_io_loop(const struct lu_env *env, struct cl_io *io)
}
cl_io_iter_fini(env, io);
} while (result == 0 && io->ci_continue);
+
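+	/* A non-delay RPC failed fast, most likely because the current
+	 * mirror's OSTs are unreachable; ask for a restart so that the
+	 * I/O can be retried against another mirror.
+	 */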
+ if (result == -EWOULDBLOCK && io->ci_ndelay) {
+ io->ci_need_restart = 1;
+ result = 0;
+ }
+
if (result == 0)
result = io->ci_result;
return result < 0 ? result : 0;
@@ -917,8 +926,8 @@ static void cl_page_list_assume(const struct lu_env *env,
/**
* Discards all pages in a queue.
*/
-static void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *plist)
+void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
+ struct cl_page_list *plist)
{
struct cl_page *page;
@@ -926,6 +935,7 @@ static void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
cl_page_list_for_each(page, plist)
cl_page_discard(env, io, page);
}
+EXPORT_SYMBOL(cl_page_list_discard);
/**
* Initialize dual page queue.
@@ -1916,6 +1916,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
if (tmp->oe_srvlock != ext->oe_srvlock ||
!tmp->oe_grants != !ext->oe_grants ||
+ tmp->oe_ndelay != ext->oe_ndelay ||
tmp->oe_no_merge || ext->oe_no_merge)
return 0;
@@ -2604,7 +2605,7 @@ int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
}
int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
- struct list_head *list, int cmd, int brw_flags)
+ struct list_head *list, int brw_flags)
{
struct client_obd *cli = osc_cli(obj);
struct osc_extent *ext;
@@ -2642,7 +2643,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
return -ENOMEM;
}
- ext->oe_rw = !!(cmd & OBD_BRW_READ);
+ ext->oe_rw = !!(brw_flags & OBD_BRW_READ);
ext->oe_sync = 1;
ext->oe_no_merge = !can_merge;
ext->oe_urgent = 1;
@@ -2651,6 +2652,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
ext->oe_max_end = end;
ext->oe_obj = obj;
ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
+ ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
ext->oe_nr_pages = page_count;
ext->oe_mppr = mppr;
list_splice_init(list, &ext->oe_pages);
@@ -2658,7 +2660,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
osc_object_lock(obj);
/* Reuse the initial refcount for RPC, don't drop it */
osc_extent_state_set(ext, OES_LOCK_DONE);
- if (cmd & OBD_BRW_WRITE) {
+ if (!ext->oe_rw) { /* write */
list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
osc_update_pending(obj, OBD_BRW_WRITE, page_count);
} else {
@@ -120,7 +120,6 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
struct cl_page_list *qout = &queue->c2_qout;
unsigned int queued = 0;
int result = 0;
- int cmd;
int brw_flags;
unsigned int max_pages;
@@ -132,8 +131,10 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
cli = osc_cli(osc);
max_pages = cli->cl_max_pages_per_rpc;
- cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;
+ brw_flags |= crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
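+	/* Non-delay applies to reads only: writes go to the sole writable
+	 * mirror, so failing fast would leave no alternative to retry.
+	 */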
+ if (crt == CRT_READ && ios->cis_io->ci_ndelay)
+ brw_flags |= OBD_BRW_NDELAY;
/*
* NOTE: here @page is a top-level page. This is done to avoid
@@ -187,7 +188,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
if (++queued == max_pages) {
queued = 0;
- result = osc_queue_sync_pages(env, osc, &list, cmd,
+ result = osc_queue_sync_pages(env, osc, &list,
brw_flags);
if (result < 0)
break;
@@ -195,7 +196,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
}
if (queued > 0)
- result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags);
+ result = osc_queue_sync_pages(env, osc, &list, brw_flags);
/* Update c/mtime for sync write. LU-7310 */
if (crt == CRT_WRITE && qout->pl_nr > 0 && !result) {
@@ -301,6 +301,8 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
NULL, &oscl->ols_lvb);
/* Hide the error. */
rc = 0;
+ } else if (rc < 0 && oscl->ols_flags & LDLM_FL_NDELAY) {
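+		/* The enqueue was sent with no_resend/no_delay set; map the
+		 * failure to -EWOULDBLOCK so upper layers switch mirrors.
+		 */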
+ rc = -EWOULDBLOCK;
}
if (oscl->ols_owner)
@@ -1167,6 +1169,8 @@ int osc_lock_init(const struct lu_env *env,
oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
oscl->ols_glimpse = 1;
}
+ if (io->ci_ndelay && cl_object_same(io->ci_obj, obj))
+ oscl->ols_flags |= LDLM_FL_NDELAY;
osc_lock_build_einfo(env, lock, cl2osc(obj), &oscl->ols_einfo);
cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);
@@ -1790,7 +1790,7 @@ static int brw_interpret(const struct lu_env *env,
/* When server return -EINPROGRESS, client should always retry
* regardless of the number of times the bulk was resent already.
*/
- if (osc_recoverable_error(rc)) {
+ if (osc_recoverable_error(rc) && !req->rq_no_delay) {
if (req->rq_import_generation !=
req->rq_import->imp_generation) {
CDEBUG(D_HA,
@@ -1872,7 +1872,8 @@ static int brw_interpret(const struct lu_env *env,
list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
list_del_init(&ext->oe_link);
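+		/* For a non-delay RPC, normalize any failure to -EWOULDBLOCK
+		 * so that the FLR read path recognizes it and retries the
+		 * I/O from another mirror.
+		 */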
- osc_extent_finish(env, ext, 1, rc);
+ osc_extent_finish(env, ext, 1,
+ rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
}
LASSERT(list_empty(&aa->aa_exts));
LASSERT(list_empty(&aa->aa_oaps));
@@ -1942,6 +1943,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
int page_count = 0;
bool soft_sync = false;
int grant = 0;
+ bool ndelay = false;
int i;
int rc;
struct ost_body *body;
@@ -1999,6 +2001,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
LASSERT(oap->oap_page_off + oap->oap_count ==
PAGE_SIZE);
}
+ if (ext->oe_ndelay)
+ ndelay = true;
}
/* first page in the list */
@@ -2027,6 +2031,13 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
req->rq_memalloc = mem_tight != 0;
oap->oap_request = ptlrpc_request_addref(req);
+ if (ndelay) {
+ req->rq_no_resend = req->rq_no_delay = 1;
+		/* TODO: probably set a shorter timeout value and handle
+		 * ETIMEDOUT in brw_interpret() correctly.
+		 */
+ /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
+ }
/* Need to update the timestamps after the request is built in case
* we race with setattr (locally or in queue at OST). If OST gets
@@ -1181,6 +1181,10 @@ struct hsm_state_set {
#define OBD_BRW_READ 0x01
#define OBD_BRW_WRITE 0x02
#define OBD_BRW_RWMASK (OBD_BRW_READ | OBD_BRW_WRITE)
+#define OBD_BRW_NDELAY 0x04 /* Non-delay RPC should be issued for
+ * this page. Non-delay RPCs have bit
+ * rq_no_delay set.
+ */
#define OBD_BRW_SYNC 0x08 /* this page is a part of synchronous
* transfer and is not accounted in
* the grant.