@@ -545,6 +545,7 @@ sfw_test_rpc_fini(struct srpc_client_rpc *rpc)
/* Called with hold of tsi->tsi_lock */
LASSERT(list_empty(&rpc->crpc_list));
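+ /* flag the workitem as done so a still-queued execution of it is ignored */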
+ rpc->crpc_wi.swi_state = SWI_STATE_DONE;
list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
}
@@ -651,6 +652,7 @@ sfw_destroy_test_instance(struct sfw_test_instance *tsi)
struct srpc_client_rpc,
crpc_list)) != NULL) {
list_del(&rpc->crpc_list);
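+ /* cancel any pending workitem for this rpc before it is freed */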
+ swi_cancel_workitem(&rpc->crpc_wi);
kfree(rpc);
}
@@ -937,6 +939,7 @@ sfw_create_test_rpc(struct sfw_test_unit *tsu, struct lnet_process_id peer,
blklen, sfw_test_rpc_done,
sfw_test_rpc_fini, tsu);
} else {
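+ /* reusing an rpc taken from tsi_free_rpcs: cancel any leftover workitem first */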
+ swi_cancel_workitem(&rpc->crpc_wi);
srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
blklen, sfw_test_rpc_done,
sfw_test_rpc_fini, tsu);
@@ -962,14 +965,20 @@ sfw_run_test(struct swi_workitem *wi)
if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc)) {
LASSERT(!rpc);
+ wi->swi_state = SWI_STATE_DONE;
goto test_done;
}
LASSERT(rpc);
spin_lock(&tsi->tsi_lock);
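+ /* this test unit already finished (state set to DONE); ignore the stale run */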
+ if (wi->swi_state == SWI_STATE_DONE) {
+ spin_unlock(&tsi->tsi_lock);
+ return;
+ }
if (tsi->tsi_stopping) {
+ wi->swi_state = SWI_STATE_DONE;
list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
spin_unlock(&tsi->tsi_lock);
goto test_done;
@@ -979,6 +988,7 @@ sfw_run_test(struct swi_workitem *wi)
tsu->tsu_loop--;
list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
+ wi->swi_state = SWI_STATE_RUNNING;
spin_unlock(&tsi->tsi_lock);
spin_lock(&rpc->crpc_lock);
@@ -1021,12 +1031,14 @@ sfw_run_batch(struct sfw_batch *tsb)
atomic_inc(&tsb->bat_nactive);
list_for_each_entry(tsu, &tsi->tsi_units, tsu_list) {
+ int cpt;
+
atomic_inc(&tsi->tsi_nactive);
tsu->tsu_loop = tsi->tsi_loop;
wi = &tsu->tsu_worker;
- swi_init_workitem(wi, sfw_run_test,
- lst_test_wq[lnet_cpt_of_nid(tsu->tsu_dest.nid,
- NULL)]);
+
+ cpt = lnet_cpt_of_nid(tsu->tsu_dest.nid, NULL);
+ swi_init_workitem(wi, sfw_run_test, lst_test_wq[cpt]);
swi_schedule_workitem(wi);
}
}
@@ -1406,14 +1418,15 @@ sfw_create_rpc(struct lnet_process_id peer, int service,
rpc = list_first_entry(&sfw_data.fw_zombie_rpcs,
struct srpc_client_rpc, crpc_list);
list_del(&rpc->crpc_list);
-
- srpc_init_client_rpc(rpc, peer, service, 0, 0,
- done, sfw_client_rpc_fini, priv);
}
-
spin_unlock(&sfw_data.fw_lock);
- if (!rpc) {
+ if (rpc) {
+ /* ensure any workitem left from the rpc's previous use has finished */
+ swi_cancel_workitem(&rpc->crpc_wi);
+ srpc_init_client_rpc(rpc, peer, service, 0, 0,
+ done, sfw_client_rpc_fini, priv);
+ } else {
rpc = srpc_create_client_rpc(peer, service,
nbulkiov, bulklen, done,
nbulkiov ? NULL :
@@ -88,7 +88,7 @@ static int
lnet_selftest_init(void)
{
int nscheds;
- int rc;
+ int rc = -ENOMEM;
int i;
rc = libcfs_setup();
@@ -118,11 +118,14 @@ lnet_selftest_init(void)
/* reserve at least one CPU for LND */
nthrs = max(nthrs - 1, 1);
- lst_test_wq[i] = alloc_workqueue("lst_t", WQ_UNBOUND, nthrs);
- if (!lst_test_wq[i]) {
- CWARN("Failed to create CPU partition affinity WI scheduler %d for LST\n",
- i);
- rc = -ENOMEM;
+ lst_test_wq[i] = cfs_cpt_bind_workqueue("lst_t",
+ lnet_cpt_table(), 0,
+ i, nthrs);
+ if (IS_ERR(lst_test_wq[i])) {
+ rc = PTR_ERR(lst_test_wq[i]);
+ CERROR("Failed to create CPU partition affinity WI scheduler %d for LST: rc = %d\n",
+ i, rc);
+ lst_test_wq[i] = NULL;
goto error;
}
@@ -93,8 +93,7 @@ srpc_serv_portal(int svc_id)
}
/* forward ref's */
-void srpc_handle_rpc(struct swi_workitem *wi);
-
+static void srpc_handle_rpc(struct swi_workitem *wi);
void srpc_get_counters(struct srpc_counters *cnt)
{
@@ -295,8 +294,7 @@ srpc_service_init(struct srpc_service *svc)
scd->scd_ev.ev_data = scd;
scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;
- /*
- * NB: don't use lst_serial_wq for adding buffer,
+ /* NB: don't use lst_serial_wq for adding buffer,
* see details in srpc_service_add_buffers()
*/
swi_init_workitem(&scd->scd_buf_wi,
@@ -601,6 +599,7 @@ srpc_add_buffer(struct swi_workitem *wi)
scd->scd_buf_posting--;
}
+ wi->swi_state = SWI_STATE_RUNNING;
spin_unlock(&scd->scd_lock);
}
@@ -933,8 +932,6 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
struct srpc_service *sv = scd->scd_svc;
struct srpc_buffer *buffer;
- LASSERT(status || rpc->srpc_wi.swi_state == SWI_STATE_DONE);
-
rpc->srpc_status = status;
CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
@@ -969,6 +966,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
* Cancel pending schedules and prevent future schedule attempts:
*/
LASSERT(rpc->srpc_ev.ev_fired);
+ rpc->srpc_wi.swi_state = SWI_STATE_DONE;
if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
buffer = list_first_entry(&scd->scd_buf_blocked,
@@ -986,8 +984,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
}
/* handles an incoming RPC */
-void
-srpc_handle_rpc(struct swi_workitem *wi)
+static void srpc_handle_rpc(struct swi_workitem *wi)
{
struct srpc_server_rpc *rpc = container_of(wi, struct srpc_server_rpc,
srpc_wi);
@@ -996,20 +993,22 @@ srpc_handle_rpc(struct swi_workitem *wi)
struct srpc_event *ev = &rpc->srpc_ev;
int rc = 0;
- LASSERT(wi == &rpc->srpc_wi);
-
spin_lock(&scd->scd_lock);
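+ /* rpc already completed via srpc_server_rpc_done(); nothing left to do */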
+ if (wi->swi_state == SWI_STATE_DONE) {
+ spin_unlock(&scd->scd_lock);
+ return;
+ }
if (sv->sv_shuttingdown || rpc->srpc_aborted) {
+ wi->swi_state = SWI_STATE_DONE;
spin_unlock(&scd->scd_lock);
if (rpc->srpc_bulk)
LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
LNetMDUnlink(rpc->srpc_replymdh);
- if (ev->ev_fired) { /* no more event, OK to finish */
+ if (ev->ev_fired) /* no more event, OK to finish */
srpc_server_rpc_done(rpc, -ESHUTDOWN);
- }
return;
}
@@ -1069,7 +1068,6 @@ srpc_handle_rpc(struct swi_workitem *wi)
if (sv->sv_bulk_ready)
rc = (*sv->sv_bulk_ready) (rpc, rc);
-
if (rc) {
srpc_server_rpc_done(rpc, rc);
return;
@@ -1164,8 +1162,6 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
{
struct swi_workitem *wi = &rpc->crpc_wi;
- LASSERT(status || wi->swi_state == SWI_STATE_DONE);
-
spin_lock(&rpc->crpc_lock);
rpc->crpc_closed = 1;
@@ -1188,6 +1184,7 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
* Cancel pending schedules and prevent future schedule attempts:
*/
LASSERT(!srpc_event_pending(rpc));
+ wi->swi_state = SWI_STATE_DONE;
spin_unlock(&rpc->crpc_lock);
@@ -1214,6 +1211,10 @@ srpc_send_rpc(struct swi_workitem *wi)
do_bulk = rpc->crpc_bulk.bk_niov > 0;
spin_lock(&rpc->crpc_lock);
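+ /* rpc already completed via srpc_client_rpc_done(); nothing left to do */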
+ if (wi->swi_state == SWI_STATE_DONE) {
+ spin_unlock(&rpc->crpc_lock);
+ return;
+ }
if (rpc->crpc_aborted) {
spin_unlock(&rpc->crpc_lock);
@@ -126,14 +126,18 @@ enum lnet_selftest_group_nodelist_prop_attrs {
#define LNET_SELFTEST_GROUP_NODELIST_PROP_MAX (__LNET_SELFTEST_GROUP_NODELIST_PROP_MAX_PLUS_ONE - 1)
-#define SWI_STATE_NEWBORN 0
-#define SWI_STATE_REPLY_SUBMITTED 1
-#define SWI_STATE_REPLY_SENT 2
-#define SWI_STATE_REQUEST_SUBMITTED 3
-#define SWI_STATE_REQUEST_SENT 4
-#define SWI_STATE_REPLY_RECEIVED 5
-#define SWI_STATE_BULK_STARTED 6
-#define SWI_STATE_DONE 10
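+/* lifecycle states of a struct swi_workitem */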
+enum lsr_swi_state {
+ SWI_STATE_DONE = 0,
+ SWI_STATE_NEWBORN,
+ SWI_STATE_REPLY_SUBMITTED,
+ SWI_STATE_REPLY_SENT,
+ SWI_STATE_REQUEST_SUBMITTED,
+ SWI_STATE_REQUEST_SENT,
+ SWI_STATE_REPLY_RECEIVED,
+ SWI_STATE_BULK_STARTED,
+ SWI_STATE_RUNNING,
+ SWI_STATE_PAUSE,
+};
/* forward refs */
struct srpc_service;
@@ -248,9 +252,9 @@ typedef void (*swi_action_t) (struct swi_workitem *);
struct swi_workitem {
struct workqueue_struct *swi_wq;
- struct work_struct swi_work;
- swi_action_t swi_action;
- int swi_state;
+ struct work_struct swi_work;
+ swi_action_t swi_action;
+ enum lsr_swi_state swi_state;
};
/* server-side state of a RPC */
@@ -562,7 +566,6 @@ swi_wi_action(struct work_struct *wi)
struct swi_workitem *swi;
swi = container_of(wi, struct swi_workitem, swi_work);
-
swi->swi_action(swi);
}