@@ -590,6 +590,10 @@ ssize_t max_pages_per_rpc_show(struct kobject *kobj, struct attribute *attr,
char *buf);
ssize_t max_pages_per_rpc_store(struct kobject *kobj, struct attribute *attr,
const char *buffer, size_t count);
+ssize_t short_io_bytes_show(struct kobject *kobj, struct attribute *attr,
+ char *buf);
+ssize_t short_io_bytes_store(struct kobject *kobj, struct attribute *attr,
+ const char *buffer, size_t count);
struct root_squash_info;
int lprocfs_wr_root_squash(const char __user *buffer, unsigned long count,
@@ -245,6 +245,13 @@ static inline bool imp_connect_disp_stripe(struct obd_import *imp)
return ocd->ocd_connect_flags & OBD_CONNECT_DISP_STRIPE;
}
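+/* True when the server negotiated OBD_CONNECT_SHORTIO on this import, i.e.
+ * small i/o may be inlined in the RPC rather than sent via bulk.
+ */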
+static inline bool imp_connect_shortio(struct obd_import *imp)
+{
+ struct obd_connect_data *ocd = &imp->imp_connect_data;
+
+ return ocd->ocd_connect_flags & OBD_CONNECT_SHORTIO;
+}
+
static inline int exp_connect_lockahead_old(struct obd_export *exp)
{
return !!(exp_connect_flags(exp) & OBD_CONNECT_LOCKAHEAD_OLD);
@@ -273,9 +273,41 @@
#define MDS_MAXREQSIZE (5 * 1024) /* >= 4736 */
/**
+ * OST_IO_MAXREQSIZE ~=
+ * lustre_msg + ptlrpc_body + obdo + obd_ioobj +
+ * DT_MAX_BRW_PAGES * niobuf_remote
+ *
+ * - single object with 16 pages is 512 bytes
+ * - OST_IO_MAXREQSIZE must be at least 1 page of cookies plus some spillover
+ * - Must be a multiple of 1024
+ */
+#define _OST_MAXREQSIZE_BASE (sizeof(struct lustre_msg) + \
+ sizeof(struct ptlrpc_body) + \
+ sizeof(struct obdo) + \
+ sizeof(struct obd_ioobj) + \
+ sizeof(struct niobuf_remote))
+#define _OST_MAXREQSIZE_SUM (_OST_MAXREQSIZE_BASE + \
+ sizeof(struct niobuf_remote) * \
+ (DT_MAX_BRW_PAGES - 1))
+
+/**
* FIEMAP request can be 4K+ for now
*/
#define OST_MAXREQSIZE (16 * 1024)
+#define OST_IO_MAXREQSIZE max_t(int, OST_MAXREQSIZE, \
+ (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1))
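+/* ((sum - 1) | (1024 - 1)) + 1 rounds _OST_MAXREQSIZE_SUM up to the next
+ * multiple of 1024; OST_MAXREQSIZE still wins when it is the larger value.
+ */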
+
+/* Safe estimate of the free space in a standard RPC; an upper limit on the
+ * number of bytes of i/o that can be packed into the RPC itself (skipping
+ * the bulk transfer).
+ */
+#define OST_SHORT_IO_SPACE (OST_IO_MAXREQSIZE - _OST_MAXREQSIZE_BASE)
+
+/* Actual size used for the short i/o buffer.  The calculation means: at
+ * least one page (for large PAGE_SIZE), or 16 KiB, but not more than the
+ * available space rounded down to a page boundary.
+ */
+#define OBD_MAX_SHORT_IO_BYTES (min(max(PAGE_SIZE, 16UL * 1024UL), \
+ OST_SHORT_IO_SPACE & PAGE_MASK))
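+/* Illustrative, assuming 4 KiB pages: max(PAGE_SIZE, 16 KiB) is 16 KiB, so
+ * the buffer is 16 KiB unless OST_SHORT_IO_SPACE rounded down to a page
+ * boundary is smaller.
+ */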
/* Macro to hide a typecast. */
#define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args)
@@ -1758,13 +1790,12 @@ static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
int rc;
desc = req->rq_bulk;
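+ /* A short i/o request has no bulk descriptor, so it can never have an
+ * active bulk transfer.
+ */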
+ if (!desc)
+ return 0;
if (req->rq_bulk_deadline > ktime_get_real_seconds())
return 1;
- if (!desc)
- return 0;
-
spin_lock(&desc->bd_lock);
rc = desc->bd_md_count;
spin_unlock(&desc->bd_lock);
@@ -535,6 +535,18 @@ struct osc_page {
unsigned long ops_submit_time;
};
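+/* Bookkeeping for an in-flight BRW RPC, stored in rq_async_args.  With short
+ * i/o there is no bulk descriptor, so aa_ppga and aa_page_count are the only
+ * record of the pages in the transfer.
+ */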
+struct osc_brw_async_args {
+ struct obdo *aa_oa;
+ int aa_requested_nob;
+ int aa_nio_count;
+ u32 aa_page_count;
+ int aa_resends;
+ struct brw_page **aa_ppga;
+ struct client_obd *aa_cli;
+ struct list_head aa_oaps;
+ struct list_head aa_exts;
+};
+
extern struct kmem_cache *osc_lock_kmem;
extern struct kmem_cache *osc_object_kmem;
extern struct kmem_cache *osc_thread_kmem;
@@ -297,6 +297,7 @@ void req_capsule_shrink(struct req_capsule *pill,
extern struct req_msg_field RMF_FIEMAP_KEY;
extern struct req_msg_field RMF_FIEMAP_VAL;
extern struct req_msg_field RMF_OST_ID;
+extern struct req_msg_field RMF_SHORT_IO;
/* MGS config read message format */
extern struct req_msg_field RMF_MGS_CONFIG_BODY;
@@ -248,6 +248,7 @@ struct client_obd {
atomic_t cl_pending_r_pages;
u32 cl_max_pages_per_rpc;
u32 cl_max_rpcs_in_flight;
+ u32 cl_short_io_bytes; /* max i/o bytes to inline in an RPC */
struct obd_histogram cl_read_rpc_hist;
struct obd_histogram cl_write_rpc_hist;
struct obd_histogram cl_read_page_hist;
@@ -205,7 +205,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
OBD_CONNECT_BULK_MBITS | OBD_CONNECT_CKSUM |
OBD_CONNECT_SUBTREE |
OBD_CONNECT_MULTIMODRPCS |
- OBD_CONNECT_GRANT_PARAM | OBD_CONNECT_FLAGS2;
+ OBD_CONNECT_GRANT_PARAM |
+ OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2;
data->ocd_connect_flags2 = 0;
@@ -396,7 +397,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE |
OBD_CONNECT_LAYOUTLOCK |
OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK |
- OBD_CONNECT_BULK_MBITS | OBD_CONNECT_FLAGS2;
+ OBD_CONNECT_BULK_MBITS | OBD_CONNECT_SHORTIO |
+ OBD_CONNECT_FLAGS2;
/* The client currently advertises support for OBD_CONNECT_LOCKAHEAD_OLD
* so it can interoperate with an older version of lockahead which was
@@ -1868,3 +1868,58 @@ ssize_t max_pages_per_rpc_store(struct kobject *kobj, struct attribute *attr,
return count;
}
EXPORT_SYMBOL(max_pages_per_rpc_store);
+
+ssize_t short_io_bytes_show(struct kobject *kobj, struct attribute *attr,
+ char *buf)
+{
+ struct obd_device *dev = container_of(kobj, struct obd_device,
+ obd_kset.kobj);
+ struct client_obd *cli = &dev->u.cli;
+ int rc;
+
+ spin_lock(&cli->cl_loi_list_lock);
+ rc = sprintf(buf, "%u\n", cli->cl_short_io_bytes);
+ spin_unlock(&cli->cl_loi_list_lock);
+ return rc;
+}
+EXPORT_SYMBOL(short_io_bytes_show);
+
+/* Used to catch people who think they're specifying pages. */
+#define MIN_SHORT_IO_BYTES 64
+
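+/* Usage sketch (sysfs path illustrative):
+ *   echo 16384 > /sys/fs/lustre/osc/<obd_name>/short_io_bytes
+ * Values outside [MIN_SHORT_IO_BYTES, OBD_MAX_SHORT_IO_BYTES] return -ERANGE.
+ */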
+ssize_t short_io_bytes_store(struct kobject *kobj, struct attribute *attr,
+ const char *buffer, size_t count)
+{
+ struct obd_device *dev = container_of(kobj, struct obd_device,
+ obd_kset.kobj);
+ struct client_obd *cli = &dev->u.cli;
+ u32 val;
+ int rc;
+
+ rc = lprocfs_climp_check(dev);
+ if (rc)
+ return rc;
+
+ rc = kstrtouint(buffer, 0, &val);
+ if (rc)
+ goto out;
+
+ if (val > OBD_MAX_SHORT_IO_BYTES || val < MIN_SHORT_IO_BYTES) {
+ rc = -ERANGE;
+ goto out;
+ }
+
+ rc = count;
+
+ spin_lock(&cli->cl_loi_list_lock);
+ if (val > (cli->cl_max_pages_per_rpc << PAGE_SHIFT))
+ rc = -ERANGE;
+ else
+ cli->cl_short_io_bytes = val;
+ spin_unlock(&cli->cl_loi_list_lock);
+
+out:
+ up_read(&dev->u.cli.cl_sem);
+ return rc;
+}
+EXPORT_SYMBOL(short_io_bytes_store);
@@ -573,7 +573,9 @@ static ssize_t destroys_in_flight_show(struct kobject *kobj,
atomic_read(&obd->u.cli.cl_destroy_in_flight));
}
LUSTRE_RO_ATTR(destroys_in_flight);
+
LUSTRE_RW_ATTR(max_pages_per_rpc);
+LUSTRE_RW_ATTR(short_io_bytes);
static int osc_unstable_stats_seq_show(struct seq_file *m, void *v)
{
@@ -807,6 +809,7 @@ void lproc_osc_attach_seqstat(struct obd_device *dev)
&lustre_attr_max_dirty_mb.attr,
&lustre_attr_max_pages_per_rpc.attr,
&lustre_attr_max_rpcs_in_flight.attr,
+ &lustre_attr_short_io_bytes.attr,
&lustre_attr_resend_count.attr,
&lustre_attr_ost_conn_uuid.attr,
NULL,
@@ -858,17 +858,28 @@ void osc_lru_unreserve(struct client_obd *cli, unsigned long npages)
* are likely from the same page zone.
*/
static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa,
int factor)
{
- int page_count = desc->bd_iov_count;
+ int page_count;
pg_data_t *last = NULL;
int count = 0;
int i;
- LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
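+ /* Bulk path: pages live in the kiov descriptor.  Short i/o path: no
+ * descriptor, so walk the brw_page array from the async args instead.
+ */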
+ if (desc) {
+ LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
+ page_count = desc->bd_iov_count;
+ } else {
+ page_count = aa->aa_page_count;
+ }
for (i = 0; i < page_count; i++) {
- pg_data_t *pgdat = page_pgdat(BD_GET_KIOV(desc, i).bv_page);
+ pg_data_t *pgdat;
+
+ if (desc)
+ pgdat = page_pgdat(BD_GET_KIOV(desc, i).bv_page);
+ else
+ pgdat = page_pgdat(aa->aa_ppga[i]->pg);
if (likely(pgdat == last)) {
++count;
@@ -887,14 +898,16 @@ static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
mod_node_page_state(last, NR_UNSTABLE_NFS, factor * count);
}
-static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa)
{
- unstable_page_accounting(desc, 1);
+ unstable_page_accounting(desc, aa, 1);
}
-static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
+static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
+ struct osc_brw_async_args *aa)
{
- unstable_page_accounting(desc, -1);
+ unstable_page_accounting(desc, aa, -1);
}
/**
@@ -910,13 +923,20 @@ static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc)
*/
void osc_dec_unstable_pages(struct ptlrpc_request *req)
{
+ struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
- int page_count = desc->bd_iov_count;
+ int page_count;
long unstable_count;
+ if (desc)
+ page_count = desc->bd_iov_count;
+ else
+ page_count = aa->aa_page_count;
+
LASSERT(page_count >= 0);
- dec_unstable_page_accounting(desc);
+
+ dec_unstable_page_accounting(desc, aa);
unstable_count = atomic_long_sub_return(page_count,
&cli->cl_unstable_count);
@@ -937,15 +957,21 @@ void osc_dec_unstable_pages(struct ptlrpc_request *req)
*/
void osc_inc_unstable_pages(struct ptlrpc_request *req)
{
+ struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
- long page_count = desc->bd_iov_count;
+ long page_count;
/* No unstable page tracking */
if (!cli->cl_cache || !cli->cl_cache->ccc_unstable_check)
return;
- add_unstable_page_accounting(desc);
+ if (desc)
+ page_count = desc->bd_iov_count;
+ else
+ page_count = aa->aa_page_count;
+
+ add_unstable_page_accounting(desc, aa);
atomic_long_add(page_count, &cli->cl_unstable_count);
atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr);
@@ -62,18 +62,6 @@
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);
-struct osc_brw_async_args {
- struct obdo *aa_oa;
- int aa_requested_nob;
- int aa_nio_count;
- u32 aa_page_count;
- int aa_resends;
- struct brw_page **aa_ppga;
- struct client_obd *aa_cli;
- struct list_head aa_oaps;
- struct list_head aa_exts;
-};
-
struct osc_async_args {
struct obd_info *aa_oi;
};
@@ -1010,7 +998,8 @@ static int check_write_rcs(struct ptlrpc_request *req,
}
}
- if (req->rq_bulk->bd_nob_transferred != requested_nob) {
+ if (req->rq_bulk &&
+ req->rq_bulk->bd_nob_transferred != requested_nob) {
CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
req->rq_bulk->bd_nob_transferred, requested_nob);
return -EPROTO;
@@ -1111,10 +1100,11 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
struct ost_body *body;
struct obd_ioobj *ioobj;
struct niobuf_remote *niobuf;
- int niocount, i, requested_nob, opc, rc;
+ int niocount, i, requested_nob, opc, rc, short_io_size = 0;
struct osc_brw_async_args *aa;
struct req_capsule *pill;
struct brw_page *pg_prev;
+ void *short_io_buf;
if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
return -ENOMEM; /* Recoverable */
@@ -1144,6 +1134,20 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
niocount * sizeof(*niobuf));
+ for (i = 0; i < page_count; i++)
+ short_io_size += pga[i]->count;
+
+ /* Check if we can do a short io. */
+ if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
+ imp_connect_shortio(cli->cl_import)))
+ short_io_size = 0;
+
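+ /* A write carries the inline data in the request, so size the client
+ * buffer; a read gets the data back in the reply, so size the server
+ * side instead.
+ */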
+ req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
+ opc == OST_READ ? 0 : short_io_size);
+ if (opc == OST_READ)
+ req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
+ short_io_size);
+
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
if (rc) {
ptlrpc_request_free(req);
@@ -1152,11 +1156,18 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
osc_set_io_portal(req);
ptlrpc_at_set_req_timeout(req);
+
/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
* retry logic
*/
req->rq_no_retry_einprogress = 1;
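+ /* Short i/o inlines the data in the request itself; skip bulk
+ * descriptor setup entirely.
+ */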
+ if (short_io_size != 0) {
+ desc = NULL;
+ short_io_buf = NULL;
+ goto no_bulk;
+ }
+
desc = ptlrpc_prep_bulk_imp(
req, page_count,
cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
@@ -1169,7 +1180,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
goto out;
}
/* NB request now owns desc and will free it when it gets freed */
-
+no_bulk:
body = req_capsule_client_get(pill, &RMF_OST_BODY);
ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
@@ -1185,7 +1196,26 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
* "max - 1" for old client compatibility sending "0", and also so the
* the actual maximum is a power-of-two number, not one less. LU-1431
*/
- ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+ if (desc)
+ ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+ else /* short i/o */
+ ioobj_max_brw_set(ioobj, 0);
+
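+ /* Tell the server to expect inline data instead of starting a bulk
+ * transfer; OBD_FL_SHORT_IO is only meaningful once OBD_MD_FLFLAGS
+ * marks the flags field valid.
+ */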
+ if (short_io_size != 0) {
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+ body->oa.o_valid |= OBD_MD_FLFLAGS;
+ body->oa.o_flags = 0;
+ }
+ body->oa.o_flags |= OBD_FL_SHORT_IO;
+ CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
+ short_io_size);
+ if (opc == OST_WRITE) {
+ short_io_buf = req_capsule_client_get(pill,
+ &RMF_SHORT_IO);
+ LASSERT(short_io_buf);
+ }
+ }
+
LASSERT(page_count > 0);
pg_prev = pga[0];
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
@@ -1210,7 +1240,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK));
- desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
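+ /* Short i/o write: copy the page contents inline into the request
+ * buffer; the bulk path keeps adding kiov fragments as before.
+ */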
+ if (short_io_size != 0 && opc == OST_WRITE) {
+ unsigned char *ptr = kmap_atomic(pg->pg);
+
+ LASSERT(short_io_size >= requested_nob + pg->count);
+ memcpy(short_io_buf + requested_nob, ptr + poff,
+ pg->count);
+ kunmap_atomic(ptr);
+ } else if (short_io_size == 0) {
+ desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
+ pg->count);
+ }
requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
@@ -1477,7 +1517,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
}
- LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
+ if (req->rq_bulk)
+ LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
- if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+ if (req->rq_bulk &&
+ sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
return -EAGAIN;
if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
@@ -1493,8 +1534,14 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
/* The rest of this function executes only for OST_READs */
- /* if unwrap_bulk failed, return -EAGAIN to retry */
- rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+ if (!req->rq_bulk) {
+ rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
+ RCL_SERVER);
+ LASSERT(rc == req->rq_status);
+ } else {
+ /* if unwrap_bulk failed, return -EAGAIN to retry */
+ rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+ }
if (rc < 0) {
rc = -EAGAIN;
goto out;
@@ -1506,12 +1553,42 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
return -EPROTO;
}
- if (rc != req->rq_bulk->bd_nob_transferred) {
+ if (req->rq_bulk && rc != req->rq_bulk->bd_nob_transferred) {
CERROR("Unexpected rc %d (%d transferred)\n",
rc, req->rq_bulk->bd_nob_transferred);
return -EPROTO;
}
+ if (!req->rq_bulk) {
+ /* short io */
+ int nob, pg_count, i = 0;
+ unsigned char *buf;
+
+ CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
+ pg_count = aa->aa_page_count;
+ buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
+ rc);
+ nob = rc;
+
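+ /* Copy the inline reply data back into the destination pages, one
+ * page fragment at a time.
+ */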
+ while (nob > 0 && pg_count > 0) {
+ int count = aa->aa_ppga[i]->count > nob ?
+ nob : aa->aa_ppga[i]->count;
+ unsigned char *ptr;
+
+ CDEBUG(D_CACHE, "page %p count %d\n",
+ aa->aa_ppga[i]->pg, count);
+ ptr = kmap_atomic(aa->aa_ppga[i]->pg);
+ memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+ count);
+ kunmap_atomic(ptr);
+
+ buf += count;
+ nob -= count;
+ i++;
+ pg_count--;
+ }
+ }
+
if (rc < aa->aa_requested_nob)
handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
@@ -1529,7 +1606,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
aa->aa_ppga, OST_READ,
cksum_type);
- if (peer->nid != req->rq_bulk->bd_sender) {
+ if (req->rq_bulk &&
+ peer->nid != req->rq_bulk->bd_sender) {
via = " via ";
router = libcfs_nid2str(req->rq_bulk->bd_sender);
}
@@ -1705,6 +1783,7 @@ static int brw_interpret(const struct lu_env *env,
struct osc_extent *ext;
struct osc_extent *tmp;
struct client_obd *cli = aa->aa_cli;
+ unsigned long transferred = 0;
rc = osc_brw_fini_request(req, rc);
CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
@@ -1798,8 +1877,12 @@ static int brw_interpret(const struct lu_env *env,
LASSERT(list_empty(&aa->aa_exts));
LASSERT(list_empty(&aa->aa_oaps));
+ transferred = (!req->rq_bulk ? /* short io */
+ aa->aa_requested_nob :
+ req->rq_bulk->bd_nob_transferred);
+
osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
- ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+ ptlrpc_lprocfs_brw(req, transferred);
spin_lock(&cli->cl_loi_list_lock);
/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
@@ -574,12 +574,14 @@
&RMF_OST_BODY,
&RMF_OBD_IOOBJ,
&RMF_NIOBUF_REMOTE,
- &RMF_CAPA1
+ &RMF_CAPA1,
+ &RMF_SHORT_IO
};
static const struct req_msg_field *ost_brw_read_server[] = {
&RMF_PTLRPC_BODY,
- &RMF_OST_BODY
+ &RMF_OST_BODY,
+ &RMF_SHORT_IO
};
static const struct req_msg_field *ost_brw_write_server[] = {
@@ -1102,6 +1104,10 @@ struct req_msg_field RMF_FIEMAP_VAL =
DEFINE_MSGF("fiemap", 0, -1, lustre_swab_fiemap, NULL);
EXPORT_SYMBOL(RMF_FIEMAP_VAL);
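+/* The short i/o buffer is opaque, variable-length byte data (hence -1), so
+ * no swab handler is needed.
+ */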
+struct req_msg_field RMF_SHORT_IO =
+ DEFINE_MSGF("short_io", 0, -1, NULL, NULL);
+EXPORT_SYMBOL(RMF_SHORT_IO);
+
struct req_msg_field RMF_HSM_USER_STATE =
DEFINE_MSGF("hsm_user_state", 0, sizeof(struct hsm_user_state),
lustre_swab_hsm_user_state, NULL);