[PATCH 19/19] staging/lustre/ptlrpc: make ptlrpcd threads cpt-aware

From: Olaf Weber <olaf@xxxxxxx>

On NUMA systems, the placement of worker threads relative to the
memory they use greatly affects performance. The CPT mechanism can be
used to constrain a number of Lustre thread types, and this change
makes it possible to configure the placement of ptlrpcd threads in a
similar manner.

To simplify the code changes, the global structures used to manage
ptlrpcd threads are changed to one per CPT. The ptlrpcd recovery
thread is the exception: a single global recovery thread is kept.

To prevent ptlrpcd threads from wandering all over the system, all
ptlrpcd threads are bound to a CPT. Note that some CPT configuration
is always created, but the defaults are not likely to be correct for
a NUMA system. After discussing the options with Liang Zhen, we
decided not to bind ptlrpcd threads to specific CPUs, and instead to
trust the kernel scheduler to migrate ptlrpcd threads within their CPT.

With all ptlrpcd threads bound to a CPT, but not to specific CPUs,
the load policy mechanism can be radically simplified:

- PDL_POLICY_LOCAL and PDL_POLICY_ROUND are currently identical.
- PDL_POLICY_ROUND, if fully implemented, would cost us the locality
  we are trying to achieve, so most or all calls using this policy
  would have to be changed to PDL_POLICY_LOCAL.
- PDL_POLICY_PREFERRED is not used, and cannot be implemented without
  binding ptlrpcd threads to individual CPUs.
- PDL_POLICY_SAME is rarely used, and cannot be implemented without
  binding ptlrpcd threads to individual CPUs.
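
With the other policies gone, selecting a thread reduces to a
round-robin walk over the threads of the caller's CPT. A minimal
userspace sketch of that cursor logic, modeled on the new
ptlrpcd_select_pc(); struct pd_sketch and pd_next_thread() are
hypothetical names used only for illustration, and the lookup of the
caller's CPT is left out:

```c
/* Hypothetical stand-in for the per-CPT bookkeeping (illustration only). */
struct pd_sketch {
	int pd_nthreads;	/* number of ptlrpcd threads in this CPT */
	int pd_cursor;		/* index of the thread picked last time */
};

/*
 * Pick the next thread in this CPT, wrapping at pd_nthreads.
 * Like the patch, this takes no lock: strict load balance is
 * not required.
 */
static int pd_next_thread(struct pd_sketch *pd)
{
	int idx = pd->pd_cursor;

	if (++idx == pd->pd_nthreads)
		idx = 0;
	pd->pd_cursor = idx;

	return idx;
}
```

Because the cursor is advanced without locking, two concurrent callers
may occasionally pick the same thread. That only skews the
distribution slightly, which is acceptable here.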

The partner mechanism is also updated, because now all ptlrpcd
threads are "bound" threads. The only difference between the various
bind policies, PDB_POLICY_NONE, PDB_POLICY_FULL, PDB_POLICY_PAIR, and
PDB_POLICY_NEIGHBOR, is the number of partner threads. The bind
policy is replaced with a tunable that directly specifies the size of
the groups of ptlrpcd partner threads.
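
As a rough illustration of how a group size could map onto the
threads of one CPT, here is a sketch under stated assumptions. The
helpers partner_group_size() and partner_group_first() are
hypothetical and not part of the patch. The sketch treats an unset
value of 0 as the default of 2, and assumes the thread count of the
CPT is a positive multiple of the resulting group size:

```c
/*
 * Hypothetical helper: clamp the requested group size to what a CPT
 * with nthreads threads (nthreads > 0) can provide. A negative
 * request means "all threads in the CPT are partners". Treating 0
 * as "use the default of 2" is an assumption of this sketch.
 */
static int partner_group_size(int requested, int nthreads)
{
	if (requested == 0)
		requested = 2;
	if (requested < 0 || requested > nthreads)
		requested = nthreads;
	return requested;
}

/*
 * Hypothetical helper: index of the first thread in the partner
 * group that thread idx belongs to. The group then covers indices
 * first .. first + groupsize - 1.
 */
static int partner_group_first(int idx, int groupsize)
{
	return idx - (idx % groupsize);
}
```

In this sketch each thread ends up with groupsize - 1 partners, so a
group size of 2 reproduces the pairing of the old PDB_POLICY_PAIR.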

Ensure that the ptlrpc_request_set for a ptlrpcd thread is created on
the same CPT that the thread will work on. When threads are bound to
specific nodes and/or CPUs in a NUMA system, it pays to ensure that
the data structures used by these threads are also on the same node.

Visible changes:

* ptlrpcd thread names include the CPT number, for example
  "ptlrpcd_02_07". In this case the "07" is relative to the CPT, and
  not a CPU number.
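
The naming scheme can be sketched in userspace as follows. The
function ptlrpcd_name_sketch() is a hypothetical name, and the
two-digit, zero-padded fields are inferred from the example name
above rather than taken from the patch:

```c
#include <stdio.h>

/*
 * Format a thread name from a CPT number and a per-CPT thread index.
 * The "%02d" field widths are an assumption based on the example
 * "ptlrpcd_02_07". Returns the snprintf() result.
 */
static int ptlrpcd_name_sketch(char *buf, size_t len, int cpt, int index)
{
	return snprintf(buf, len, "ptlrpcd_%02d_%02d", cpt, index);
}
```

Called with cpt 2 and index 7, this produces "ptlrpcd_02_07": thread
7 within CPT 2, regardless of which CPU it happens to run on.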

Tunables added:

* ptlrpcd_cpts (string): A CPT string describing the CPU partitions
  that ptlrpcd threads should run on. Used to make ptlrpcd threads
  run on a subset of all CPTs.

* ptlrpcd_per_cpt_max (int): The maximum number of ptlrpcd threads
  to run in a CPT.

* ptlrpcd_partner_group_size (int): The desired number of threads
  in each ptlrpcd partner thread group. Default is 2, corresponding
  to the old PDB_POLICY_PAIR. A negative value makes all ptlrpcd
  threads in a CPT partners of each other.

Tunables obsoleted:

* max_ptlrpcds: The new ptlrpcd_per_cpt_max can be used to obtain the
  same effect.

* ptlrpcd_bind_policy: The new ptlrpcd_partner_group_size can be used
  to obtain the same effect.

Internal interface changes:

* pdb_policy_t and related code have been removed. Groups of partner
  ptlrpcd threads are still created, and all threads in a partner
  group are bound on the same CPT. The ptlrpcd threads bound to a
  CPT are typically divided into several partner groups. The partner
  groups on a CPT all have an equal number of ptlrpcd threads.

* pdl_policy_t and related code have been removed. Since ptlrpcd
  threads are not bound to a specific CPU, all the code that avoids
  scheduling on the current CPU (or attempts to do so) has been
  removed as non-functional. A simplified form of PDL_POLICY_LOCAL
  is kept as the only load policy.

* LIOD_BIND and related code have been removed. All ptlrpcd threads
  are now bound to a CPT, and no additional binding policy is
  implemented.

* ptlrpc_prep_set(): Changed to allocate a ptlrpc_request_set
  on the current CPT.

* ptlrpcd(): If an error is encountered before entering the main loop,
  store the error in pc_error before exiting.

* ptlrpcd_start(): Check pc_error to verify that the ptlrpcd thread
  has successfully entered its main loop.

* ptlrpcd_init(): Initialize the struct ptlrpcd_ctl for all threads
  for a CPT before starting any of them. This closes a race during
  startup where a partner thread could reference a non-initialized
  struct ptlrpcd_ctl.

Signed-off-by: Olaf Weber <olaf@xxxxxxx>
Reviewed-on: http://review.whamcloud.com/13972
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-6325
Reviewed-by: Grégoire Pichon <gregoire.pichon@xxxxxxxx>
Reviewed-by: Stephen Champion <schamp@xxxxxxx>
Reviewed-by: James Simmons <uja.ornl@xxxxxxxxx>
Reviewed-by: Jinshan Xiong <jinshan.xiong@xxxxxxxxx>
Signed-off-by: Oleg Drokin <oleg.drokin@xxxxxxxxx>
---
 drivers/staging/lustre/lustre/include/lustre_net.h |  54 +-
 drivers/staging/lustre/lustre/ldlm/ldlm_request.c  |   8 +-
 drivers/staging/lustre/lustre/mdc/mdc_locks.c      |   2 +-
 drivers/staging/lustre/lustre/mdc/mdc_request.c    |   2 +-
 drivers/staging/lustre/lustre/osc/osc_cache.c      |  28 +-
 .../staging/lustre/lustre/osc/osc_cl_internal.h    |   2 +-
 drivers/staging/lustre/lustre/osc/osc_internal.h   |   2 +-
 drivers/staging/lustre/lustre/osc/osc_request.c    |  41 +-
 drivers/staging/lustre/lustre/ptlrpc/client.c      |  11 +-
 drivers/staging/lustre/lustre/ptlrpc/import.c      |   7 +-
 drivers/staging/lustre/lustre/ptlrpc/pinger.c      |   2 +-
 .../staging/lustre/lustre/ptlrpc/ptlrpc_internal.h |   2 +-
 drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c     | 702 +++++++++++++--------
 13 files changed, 486 insertions(+), 377 deletions(-)

diff --git a/drivers/staging/lustre/lustre/include/lustre_net.h b/drivers/staging/lustre/lustre/include/lustre_net.h
index 313a56c..3b6a2d7 100644
--- a/drivers/staging/lustre/lustre/include/lustre_net.h
+++ b/drivers/staging/lustre/lustre/include/lustre_net.h
@@ -2191,21 +2191,29 @@ struct ptlrpcd_ctl {
 	 */
 	struct lu_env	       pc_env;
 	/**
-	 * Index of ptlrpcd thread in the array.
+	 * CPT the thread is bound on.
 	 */
-	int			 pc_index;
+	int				pc_cpt;
 	/**
-	 * Number of the ptlrpcd's partners.
+	 * Index of ptlrpcd thread in the array.
 	 */
-	int			 pc_npartners;
+	int				pc_index;
 	/**
 	 * Pointer to the array of partners' ptlrpcd_ctl structure.
 	 */
 	struct ptlrpcd_ctl	**pc_partners;
 	/**
+	 * Number of the ptlrpcd's partners.
+	 */
+	int				pc_npartners;
+	/**
 	 * Record the partner index to be processed next.
 	 */
 	int			 pc_cursor;
+	/**
+	 * Error code if the thread failed to fully start.
+	 */
+	int				pc_error;
 };
 
 /* Bits for pc_flags */
@@ -2228,10 +2236,6 @@ enum ptlrpcd_ctl_flags {
 	 * This is a recovery ptlrpc thread.
 	 */
 	LIOD_RECOVERY    = 1 << 3,
-	/**
-	 * The ptlrpcd is bound to some CPU core.
-	 */
-	LIOD_BIND	= 1 << 4,
 };
 
 /**
@@ -2903,43 +2907,11 @@ void ptlrpc_pinger_ir_down(void);
 /** @} */
 int ptlrpc_pinger_suppress_pings(void);
 
-/* ptlrpc daemon bind policy */
-typedef enum {
-	/* all ptlrpcd threads are free mode */
-	PDB_POLICY_NONE	  = 1,
-	/* all ptlrpcd threads are bound mode */
-	PDB_POLICY_FULL	  = 2,
-	/* <free1 bound1> <free2 bound2> ... <freeN boundN> */
-	PDB_POLICY_PAIR	  = 3,
-	/* <free1 bound1> <bound1 free2> ... <freeN boundN> <boundN free1>,
-	 * means each ptlrpcd[X] has two partners: thread[X-1] and thread[X+1].
-	 * If kernel supports NUMA, pthrpcd threads are binded and
-	 * grouped by NUMA node */
-	PDB_POLICY_NEIGHBOR      = 4,
-} pdb_policy_t;
-
-/* ptlrpc daemon load policy
- * It is caller's duty to specify how to push the async RPC into some ptlrpcd
- * queue, but it is not enforced, affected by "ptlrpcd_bind_policy". If it is
- * "PDB_POLICY_FULL", then the RPC will be processed by the selected ptlrpcd,
- * Otherwise, the RPC may be processed by the selected ptlrpcd or its partner,
- * depends on which is scheduled firstly, to accelerate the RPC processing. */
-typedef enum {
-	/* on the same CPU core as the caller */
-	PDL_POLICY_SAME	 = 1,
-	/* within the same CPU partition, but not the same core as the caller */
-	PDL_POLICY_LOCAL	= 2,
-	/* round-robin on all CPU cores, but not the same core as the caller */
-	PDL_POLICY_ROUND	= 3,
-	/* the specified CPU core is preferred, but not enforced */
-	PDL_POLICY_PREFERRED    = 4,
-} pdl_policy_t;
-
 /* ptlrpc/ptlrpcd.c */
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
 void ptlrpcd_free(struct ptlrpcd_ctl *pc);
 void ptlrpcd_wake(struct ptlrpc_request *req);
-void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx);
+void ptlrpcd_add_req(struct ptlrpc_request *req);
 void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
diff --git a/drivers/staging/lustre/lustre/ldlm/ldlm_request.c b/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
index 6245a2c..b5ee9bd 100644
--- a/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
+++ b/drivers/staging/lustre/lustre/ldlm/ldlm_request.c
@@ -1212,12 +1212,12 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
 
 		ptlrpc_request_set_replen(req);
 		if (flags & LCF_ASYNC) {
-			ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+			ptlrpcd_add_req(req);
 			sent = count;
 			goto out;
-		} else {
-			rc = ptlrpc_queue_wait(req);
 		}
+
+		rc = ptlrpc_queue_wait(req);
 		if (rc == LUSTRE_ESTALE) {
 			CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync -- not fatal\n",
 			       libcfs_nid2str(req->rq_import->
@@ -2223,7 +2223,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 	aa = ptlrpc_req_async_args(req);
 	aa->lock_handle = body->lock_handle[0];
 	req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
-	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+	ptlrpcd_add_req(req);
 
 	return 0;
 }
diff --git a/drivers/staging/lustre/lustre/mdc/mdc_locks.c b/drivers/staging/lustre/lustre/mdc/mdc_locks.c
index e6b3bf9..20da064 100644
--- a/drivers/staging/lustre/lustre/mdc/mdc_locks.c
+++ b/drivers/staging/lustre/lustre/mdc/mdc_locks.c
@@ -1307,7 +1307,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
 	ga->ga_einfo = einfo;
 
 	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
-	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+	ptlrpcd_add_req(req);
 
 	return 0;
 }
diff --git a/drivers/staging/lustre/lustre/mdc/mdc_request.c b/drivers/staging/lustre/lustre/mdc/mdc_request.c
index 204d512..d32ae761 100644
--- a/drivers/staging/lustre/lustre/mdc/mdc_request.c
+++ b/drivers/staging/lustre/lustre/mdc/mdc_request.c
@@ -2639,7 +2639,7 @@ static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
 	ra->ra_oc = oc;
 	ra->ra_cb = cb;
 	req->rq_interpret_reply = mdc_interpret_renew_capa;
-	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+	ptlrpcd_add_req(req);
 	return 0;
 }
 
diff --git a/drivers/staging/lustre/lustre/osc/osc_cache.c b/drivers/staging/lustre/lustre/osc/osc_cache.c
index c72035e..62da061 100644
--- a/drivers/staging/lustre/lustre/osc/osc_cache.c
+++ b/drivers/staging/lustre/lustre/osc/osc_cache.c
@@ -1934,7 +1934,7 @@ static int get_write_extents(struct osc_object *obj, struct list_head *rpclist)
 
 static int
 osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
-		   struct osc_object *osc, pdl_policy_t pol)
+		   struct osc_object *osc)
 {
 	LIST_HEAD(rpclist);
 	struct osc_extent *ext;
@@ -1986,7 +1986,7 @@ osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
 
 	if (!list_empty(&rpclist)) {
 		LASSERT(page_count > 0);
-		rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE, pol);
+		rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE);
 		LASSERT(list_empty(&rpclist));
 	}
 
@@ -2006,7 +2006,7 @@ osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
  */
 static int
 osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
-		  struct osc_object *osc, pdl_policy_t pol)
+		  struct osc_object *osc)
 {
 	struct osc_extent *ext;
 	struct osc_extent *next;
@@ -2033,7 +2033,7 @@ osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
 		osc_object_unlock(osc);
 
 		LASSERT(page_count > 0);
-		rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ, pol);
+		rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ);
 		LASSERT(list_empty(&rpclist));
 
 		osc_object_lock(osc);
@@ -2079,8 +2079,7 @@ static struct osc_object *osc_next_obj(struct client_obd *cli)
 }
 
 /* called with the loi list lock held */
-static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
-			   pdl_policy_t pol)
+static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 {
 	struct osc_object *osc;
 	int rc = 0;
@@ -2109,7 +2108,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 		 * do io on writes while there are cache waiters */
 		osc_object_lock(osc);
 		if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
-			rc = osc_send_write_rpc(env, cli, osc, pol);
+			rc = osc_send_write_rpc(env, cli, osc);
 			if (rc < 0) {
 				CERROR("Write request failed with %d\n", rc);
 
@@ -2133,7 +2132,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 			}
 		}
 		if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
-			rc = osc_send_read_rpc(env, cli, osc, pol);
+			rc = osc_send_read_rpc(env, cli, osc);
 			if (rc < 0)
 				CERROR("Read request failed with %d\n", rc);
 		}
@@ -2149,7 +2148,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli,
 }
 
 static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
-			  struct osc_object *osc, pdl_policy_t pol, int async)
+			  struct osc_object *osc, int async)
 {
 	int rc = 0;
 
@@ -2161,7 +2160,7 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
 		 * potential stack overrun problem. LU-2859 */
 		atomic_inc(&cli->cl_lru_shrinkers);
 		client_obd_list_lock(&cli->cl_loi_list_lock);
-		osc_check_rpcs(env, cli, pol);
+		osc_check_rpcs(env, cli);
 		client_obd_list_unlock(&cli->cl_loi_list_lock);
 		atomic_dec(&cli->cl_lru_shrinkers);
 	} else {
@@ -2175,14 +2174,13 @@ static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
 static int osc_io_unplug_async(const struct lu_env *env,
 			       struct client_obd *cli, struct osc_object *osc)
 {
-	/* XXX: policy is no use actually. */
-	return osc_io_unplug0(env, cli, osc, PDL_POLICY_ROUND, 1);
+	return osc_io_unplug0(env, cli, osc, 1);
 }
 
 void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
-		   struct osc_object *osc, pdl_policy_t pol)
+		   struct osc_object *osc)
 {
-	(void)osc_io_unplug0(env, cli, osc, pol, 0);
+	(void)osc_io_unplug0(env, cli, osc, 0);
 }
 
 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
@@ -2922,7 +2920,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
 	}
 
 	if (unplug)
-		osc_io_unplug(env, osc_cli(obj), obj, PDL_POLICY_ROUND);
+		osc_io_unplug(env, osc_cli(obj), obj);
 
 	if (hp || discard) {
 		int rc;
diff --git a/drivers/staging/lustre/lustre/osc/osc_cl_internal.h b/drivers/staging/lustre/lustre/osc/osc_cl_internal.h
index 365b278..75bfda6 100644
--- a/drivers/staging/lustre/lustre/osc/osc_cl_internal.h
+++ b/drivers/staging/lustre/lustre/osc/osc_cl_internal.h
@@ -454,7 +454,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
 int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
 			 pgoff_t start, pgoff_t end);
 void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
-		   struct osc_object *osc, pdl_policy_t pol);
+		   struct osc_object *osc);
 
 void osc_object_set_contended  (struct osc_object *obj);
 void osc_object_clear_contended(struct osc_object *obj);
diff --git a/drivers/staging/lustre/lustre/osc/osc_internal.h b/drivers/staging/lustre/lustre/osc/osc_internal.h
index 7d0a3e2..448fdf4 100644
--- a/drivers/staging/lustre/lustre/osc/osc_internal.h
+++ b/drivers/staging/lustre/lustre/osc/osc_internal.h
@@ -132,7 +132,7 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
 
 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *cfg);
 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
-		  struct list_head *ext_list, int cmd, pdl_policy_t p);
+		  struct list_head *ext_list, int cmd);
 int osc_lru_shrink(struct client_obd *cli, int target);
 
 extern spinlock_t osc_ast_guard;
diff --git a/drivers/staging/lustre/lustre/osc/osc_request.c b/drivers/staging/lustre/lustre/osc/osc_request.c
index f41f762..9f53627 100644
--- a/drivers/staging/lustre/lustre/osc/osc_request.c
+++ b/drivers/staging/lustre/lustre/osc/osc_request.c
@@ -437,7 +437,7 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
 	/* do mds to ost setattr asynchronously */
 	if (!rqset) {
 		/* Do not wait for response. */
-		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+		ptlrpcd_add_req(req);
 	} else {
 		req->rq_interpret_reply =
 			(ptlrpc_interpterer_t)osc_setattr_interpret;
@@ -449,7 +449,7 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
 		sa->sa_cookie = cookie;
 
 		if (rqset == PTLRPCD_SET)
-			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+			ptlrpcd_add_req(req);
 		else
 			ptlrpc_set_add_req(rqset, req);
 	}
@@ -590,7 +590,7 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
 	sa->sa_upcall = upcall;
 	sa->sa_cookie = cookie;
 	if (rqset == PTLRPCD_SET)
-		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+		ptlrpcd_add_req(req);
 	else
 		ptlrpc_set_add_req(rqset, req);
 
@@ -657,7 +657,7 @@ int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
 	fa->fa_cookie = cookie;
 
 	if (rqset == PTLRPCD_SET)
-		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+		ptlrpcd_add_req(req);
 	else
 		ptlrpc_set_add_req(rqset, req);
 
@@ -826,7 +826,7 @@ static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
 	}
 
 	/* Do not wait for response */
-	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+	ptlrpcd_add_req(req);
 	return 0;
 }
 
@@ -1718,7 +1718,7 @@ static int osc_brw_redo_request(struct ptlrpc_request *request,
 	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
 	 * and wait for all of them to be finished. We should inherit request
 	 * set from old request. */
-	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
+	ptlrpcd_add_req(new_req);
 
 	DEBUG_REQ(D_INFO, new_req, "new request");
 	return 0;
@@ -1859,7 +1859,7 @@ static int brw_interpret(const struct lu_env *env,
 	osc_wake_cache_waiters(cli);
 	client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+	osc_io_unplug(env, cli, NULL);
 	return rc;
 }
 
@@ -1869,7 +1869,7 @@ static int brw_interpret(const struct lu_env *env,
  * Extents in the list must be in OES_RPC state.
  */
 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
-		  struct list_head *ext_list, int cmd, pdl_policy_t pol)
+		  struct list_head *ext_list, int cmd)
 {
 	struct ptlrpc_request *req = NULL;
 	struct osc_extent *ext;
@@ -2043,19 +2043,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 		  page_count, aa, cli->cl_r_in_flight,
 		  cli->cl_w_in_flight);
 
-	/* XXX: Maybe the caller can check the RPC bulk descriptor to
-	 * see which CPU/NUMA node the majority of pages were allocated
-	 * on, and try to assign the async RPC to the CPU core
-	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
-	 *
-	 * But on the other hand, we expect that multiple ptlrpcd
-	 * threads and the initial write sponsor can run in parallel,
-	 * especially when data checksum is enabled, which is CPU-bound
-	 * operation and single ptlrpcd thread cannot process in time.
-	 * So more ptlrpcd threads sharing BRW load
-	 * (with PDL_POLICY_ROUND) seems better.
-	 */
-	ptlrpcd_add_req(req, pol, -1);
+	ptlrpcd_add_req(req);
 	rc = 0;
 
 out:
@@ -2382,7 +2370,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
 			req->rq_interpret_reply =
 				(ptlrpc_interpterer_t)osc_enqueue_interpret;
 			if (rqset == PTLRPCD_SET)
-				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+				ptlrpcd_add_req(req);
 			else
 				ptlrpc_set_add_req(rqset, req);
 		} else if (intent) {
@@ -2997,8 +2985,9 @@ static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
 		LASSERT(set != NULL);
 		ptlrpc_set_add_req(set, req);
 		ptlrpc_check_set(NULL, set);
-	} else
-		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+	} else {
+		ptlrpcd_add_req(req);
+	}
 
 	return 0;
 }
@@ -3090,7 +3079,7 @@ static int osc_import_event(struct obd_device *obd,
 			cli = &obd->u.cli;
 			/* all pages go to failing rpcs due to the invalid
 			 * import */
-			osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
+			osc_io_unplug(env, cli, NULL);
 
 			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
 			cl_env_put(env, &refcheck);
@@ -3162,7 +3151,7 @@ static int brw_queue_work(const struct lu_env *env, void *data)
 
 	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
 
-	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+	osc_io_unplug(env, cli, NULL);
 	return 0;
 }
 
diff --git a/drivers/staging/lustre/lustre/ptlrpc/client.c b/drivers/staging/lustre/lustre/ptlrpc/client.c
index 90b24fc..e1830fe 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/client.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/client.c
@@ -844,14 +844,17 @@ ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
 EXPORT_SYMBOL(ptlrpc_prep_req);
 
 /**
- * Allocate and initialize new request set structure.
+ * Allocate and initialize new request set structure on the current CPT.
  * Returns a pointer to the newly allocated set structure or NULL on error.
  */
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
 	struct ptlrpc_request_set *set;
+	int cpt;
 
-	set = kzalloc(sizeof(*set), GFP_NOFS);
+	cpt = cfs_cpt_current(cfs_cpt_table, 0);
+	set = kzalloc_node(sizeof(*set), GFP_NOFS,
+			   cfs_cpt_spread_node(cfs_cpt_table, cpt));
 	if (!set)
 		return NULL;
 	atomic_set(&set->set_refcount, 1);
@@ -2827,7 +2830,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
 	atomic_inc(&req->rq_import->imp_replay_inflight);
 	ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
 
-	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+	ptlrpcd_add_req(req);
 	return 0;
 }
 EXPORT_SYMBOL(ptlrpc_replay_req);
@@ -3033,7 +3036,7 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
 	req->rq_xid		= ptlrpc_next_xid();
 	req->rq_import_generation = req->rq_import->imp_generation;
 
-	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+	ptlrpcd_add_req(req);
 }
 
 static int work_interpreter(const struct lu_env *env,
diff --git a/drivers/staging/lustre/lustre/ptlrpc/import.c b/drivers/staging/lustre/lustre/ptlrpc/import.c
index f5b3245..c52ceef 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/import.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/import.c
@@ -742,12 +742,11 @@ int ptlrpc_connect_import(struct obd_import *imp)
 
 	DEBUG_REQ(D_RPCTRACE, request, "(re)connect request (timeout %d)",
 		  request->rq_timeout);
-	ptlrpcd_add_req(request, PDL_POLICY_ROUND, -1);
+	ptlrpcd_add_req(request);
 	rc = 0;
 out:
-	if (rc != 0) {
+	if (rc != 0)
 		IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
-	}
 
 	return rc;
 }
@@ -1257,7 +1256,7 @@ static int signal_completed_replay(struct obd_import *imp)
 		req->rq_timeout *= 3;
 	req->rq_interpret_reply = completed_replay_interpret;
 
-	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+	ptlrpcd_add_req(req);
 	return 0;
 }
 
diff --git a/drivers/staging/lustre/lustre/ptlrpc/pinger.c b/drivers/staging/lustre/lustre/ptlrpc/pinger.c
index f8edb79..d3aea4a 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/pinger.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/pinger.c
@@ -105,7 +105,7 @@ static int ptlrpc_ping(struct obd_import *imp)
 
 	DEBUG_REQ(D_INFO, req, "pinging %s->%s",
 		  imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
-	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+	ptlrpcd_add_req(req);
 
 	return 0;
 }
diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h
index 6dc3998..1d64ca7 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h
+++ b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h
@@ -50,7 +50,7 @@ extern struct mutex ptlrpc_all_services_mutex;
 
 int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
 /* ptlrpcd.c */
-int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc);
+int ptlrpcd_start(struct ptlrpcd_ctl *pc);
 
 /* client.c */
 struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c b/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
index 17cc81d..00efdbf 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
@@ -67,22 +67,94 @@
 
 #include "ptlrpc_internal.h"
 
+/* One of these per CPT. */
 struct ptlrpcd {
 	int pd_size;
 	int pd_index;
+	int pd_cpt;
+	int pd_cursor;
 	int pd_nthreads;
-	struct ptlrpcd_ctl pd_thread_rcv;
+	int pd_groupsize;
 	struct ptlrpcd_ctl pd_threads[0];
 };
 
+/*
+ * max_ptlrpcds is obsolete, but retained to ensure that the kernel
+ * module will load on a system where it has been tuned.
+ * A value other than 0 implies it was tuned, in which case the value
+ * is used to derive a setting for ptlrpcd_per_cpt_max.
+ */
 static int max_ptlrpcds;
 module_param(max_ptlrpcds, int, 0644);
 MODULE_PARM_DESC(max_ptlrpcds, "Max ptlrpcd thread count to be started.");
 
-static int ptlrpcd_bind_policy = PDB_POLICY_PAIR;
+/*
+ * ptlrpcd_bind_policy is obsolete, but retained to ensure that
+ * the kernel module will load on a system where it has been tuned.
+ * A value other than 0 implies it was tuned, in which case the value
+ * is used to derive a setting for ptlrpcd_partner_group_size.
+ */
+static int ptlrpcd_bind_policy;
 module_param(ptlrpcd_bind_policy, int, 0644);
-MODULE_PARM_DESC(ptlrpcd_bind_policy, "Ptlrpcd threads binding mode.");
-static struct ptlrpcd *ptlrpcds;
+MODULE_PARM_DESC(ptlrpcd_bind_policy,
+		 "Ptlrpcd threads binding mode (obsolete).");
+
+/*
+ * ptlrpcd_per_cpt_max: The maximum number of ptlrpcd threads to run
+ * in a CPT.
+ */
+static int ptlrpcd_per_cpt_max;
+module_param(ptlrpcd_per_cpt_max, int, 0644);
+MODULE_PARM_DESC(ptlrpcd_per_cpt_max,
+		 "Max ptlrpcd thread count to be started per cpt.");
+
+/*
+ * ptlrpcd_partner_group_size: The desired number of threads in each
+ * ptlrpcd partner thread group. Default is 2, corresponding to the
+ * old PDB_POLICY_PAIR. A negative value makes all ptlrpcd threads in
+ * a CPT partners of each other.
+ */
+static int ptlrpcd_partner_group_size;
+module_param(ptlrpcd_partner_group_size, int, 0644);
+MODULE_PARM_DESC(ptlrpcd_partner_group_size,
+		 "Number of ptlrpcd threads in a partner group.");
+
+/*
+ * ptlrpcd_cpts: A CPT string describing the CPU partitions that
+ * ptlrpcd threads should run on. Used to make ptlrpcd threads run on
+ * a subset of all CPTs.
+ *
+ * ptlrpcd_cpts=2
+ * ptlrpcd_cpts=[2]
+ *   run ptlrpcd threads only on CPT 2.
+ *
+ * ptlrpcd_cpts=0-3
+ * ptlrpcd_cpts=[0-3]
+ *   run ptlrpcd threads on CPTs 0, 1, 2, and 3.
+ *
+ * ptlrpcd_cpts=[0-3,5,7]
+ *   run ptlrpcd threads on CPTS 0, 1, 2, 3, 5, and 7.
+ */
+static char *ptlrpcd_cpts;
+module_param(ptlrpcd_cpts, charp, 0644);
+MODULE_PARM_DESC(ptlrpcd_cpts,
+		 "CPU partitions ptlrpcd threads should run in");
+
+/* ptlrpcds_cpt_idx maps cpt numbers to an index in the ptlrpcds array. */
+static int		*ptlrpcds_cpt_idx;
+
+/* ptlrpcds_num is the number of entries in the ptlrpcds array. */
+static int		ptlrpcds_num;
+static struct ptlrpcd	**ptlrpcds;
+
+/*
+ * In addition to the regular thread pool above, there is a single
+ * global recovery thread. Recovery isn't critical for performance,
+ * and doesn't block, but must always be able to proceed, and it is
+ * possible that all normal ptlrpcd threads are blocked. Hence the
+ * need for a dedicated thread.
+ */
+static struct ptlrpcd_ctl ptlrpcd_rcv;
 
 struct mutex ptlrpcd_mutex;
 static int ptlrpcd_users;
@@ -98,45 +170,29 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
 EXPORT_SYMBOL(ptlrpcd_wake);
 
 static struct ptlrpcd_ctl *
-ptlrpcd_select_pc(struct ptlrpc_request *req, pdl_policy_t policy, int index)
+ptlrpcd_select_pc(struct ptlrpc_request *req)
 {
-	int idx = 0;
+	struct ptlrpcd	*pd;
+	int		cpt;
+	int		idx;
 
 	if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL)
-		return &ptlrpcds->pd_thread_rcv;
-
-	switch (policy) {
-	case PDL_POLICY_SAME:
-		idx = smp_processor_id() % ptlrpcds->pd_nthreads;
-		break;
-	case PDL_POLICY_LOCAL:
-		/* Before CPU partition patches available, process it the same
-		 * as "PDL_POLICY_ROUND". */
-# ifdef CFS_CPU_MODE_NUMA
-# warning "fix this code to use new CPU partition APIs"
-# endif
-		/* Fall through to PDL_POLICY_ROUND until the CPU
-		 * CPU partition patches are available. */
-		index = -1;
-	case PDL_POLICY_PREFERRED:
-		if (index >= 0 && index < num_online_cpus()) {
-			idx = index % ptlrpcds->pd_nthreads;
-			break;
-		}
-		/* Fall through to PDL_POLICY_ROUND for bad index. */
-	default:
-		/* Fall through to PDL_POLICY_ROUND for unknown policy. */
-	case PDL_POLICY_ROUND:
+		return &ptlrpcd_rcv;
+
+	cpt = cfs_cpt_current(cfs_cpt_table, 1);
+	if (!ptlrpcds_cpt_idx)
+		idx = cpt;
+	else
+		idx = ptlrpcds_cpt_idx[cpt];
+	pd = ptlrpcds[idx];
+
 		/* We do not care whether it is strict load balance. */
-		idx = ptlrpcds->pd_index + 1;
-		if (idx == smp_processor_id())
-			idx++;
-		idx %= ptlrpcds->pd_nthreads;
-		ptlrpcds->pd_index = idx;
-		break;
-	}
+	idx = pd->pd_cursor;
+	if (++idx == pd->pd_nthreads)
+		idx = 0;
+	pd->pd_cursor = idx;
 
-	return &ptlrpcds->pd_threads[idx];
+	return &pd->pd_threads[idx];
 }
 
 /**
@@ -150,7 +206,7 @@ void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
 	struct ptlrpc_request_set *new;
 	int count, i;
 
-	pc = ptlrpcd_select_pc(NULL, PDL_POLICY_LOCAL, -1);
+	pc = ptlrpcd_select_pc(NULL);
 	new = pc->pc_set;
 
 	list_for_each_safe(pos, tmp, &set->set_requests) {
@@ -212,7 +268,7 @@ static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
  * Requests that are added to the ptlrpcd queue are sent via
  * ptlrpcd_check->ptlrpc_check_set().
  */
-void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
+void ptlrpcd_add_req(struct ptlrpc_request *req)
 {
 	struct ptlrpcd_ctl *pc;
 
@@ -242,7 +298,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
 		spin_unlock(&req->rq_lock);
 	}
 
-	pc = ptlrpcd_select_pc(req, policy, idx);
+	pc = ptlrpcd_select_pc(req);
 
 	DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
 		  req, pc->pc_name, pc->pc_index);
@@ -372,25 +428,29 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
 static int ptlrpcd(void *arg)
 {
 	struct ptlrpcd_ctl *pc = arg;
-	struct ptlrpc_request_set *set = pc->pc_set;
+	struct ptlrpc_request_set *set;
 	struct lu_env env = { .le_ses = NULL };
-	int rc, exit = 0;
+	int rc = 0;
+	int exit = 0;
 
 	unshare_fs_struct();
-#if defined(CONFIG_SMP)
-	if (test_bit(LIOD_BIND, &pc->pc_flags)) {
-		int index = pc->pc_index;
-
-		if (index >= 0 && index < num_possible_cpus()) {
-			while (!cpu_online(index)) {
-				if (++index >= num_possible_cpus())
-					index = 0;
-			}
-			set_cpus_allowed_ptr(current,
-					cpumask_of_node(cpu_to_node(index)));
-		}
+	if (cfs_cpt_bind(cfs_cpt_table, pc->pc_cpt) != 0)
+		CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt);
+
+	/*
+	 * Allocate the request set after the thread has been bound
+	 * above. This is safe because no requests will be queued
+	 * until all ptlrpcd threads have confirmed that they have
+	 * successfully started.
+	 */
+	set = ptlrpc_prep_set();
+	if (!set) {
+		rc = -ENOMEM;
+		goto failed;
 	}
-#endif
+	spin_lock(&pc->pc_lock);
+	pc->pc_set = set;
+	spin_unlock(&pc->pc_lock);
 	/*
 	 * XXX So far only "client" ptlrpcd uses an environment. In
 	 * the future, ptlrpcd thread (or a thread-set) has to given
@@ -398,10 +458,10 @@ static int ptlrpcd(void *arg)
 	 */
 	rc = lu_context_init(&env.le_ctx,
 			     LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
-	complete(&pc->pc_starting);
-
 	if (rc != 0)
-		return rc;
+		goto failed;
+
+	complete(&pc->pc_starting);
 
 	/*
 	 * This mainloop strongly resembles ptlrpc_set_wait() except that our
@@ -447,174 +507,97 @@ static int ptlrpcd(void *arg)
 	complete(&pc->pc_finishing);
 
 	return 0;
+failed:
+	pc->pc_error = rc;
+	complete(&pc->pc_starting);
+	return rc;
 }
 
-/* XXX: We want multiple CPU cores to share the async RPC load. So we start many
- *      ptlrpcd threads. We also want to reduce the ptlrpcd overhead caused by
- *      data transfer cross-CPU cores. So we bind ptlrpcd thread to specified
- *      CPU core. But binding all ptlrpcd threads maybe cause response delay
- *      because of some CPU core(s) busy with other loads.
- *
- *      For example: "ls -l", some async RPCs for statahead are assigned to
- *      ptlrpcd_0, and ptlrpcd_0 is bound to CPU_0, but CPU_0 may be quite busy
- *      with other non-ptlrpcd, like "ls -l" itself (we want to the "ls -l"
- *      thread, statahead thread, and ptlrpcd thread can run in parallel), under
- *      such case, the statahead async RPCs can not be processed in time, it is
- *      unexpected. If ptlrpcd_0 can be re-scheduled on other CPU core, it may
- *      be better. But it breaks former data transfer policy.
- *
- *      So we shouldn't be blind for avoiding the data transfer. We make some
- *      compromise: divide the ptlrpcd threads pool into two parts. One part is
- *      for bound mode, each ptlrpcd thread in this part is bound to some CPU
- *      core. The other part is for free mode, all the ptlrpcd threads in the
- *      part can be scheduled on any CPU core. We specify some partnership
- *      between bound mode ptlrpcd thread(s) and free mode ptlrpcd thread(s),
- *      and the async RPC load within the partners are shared.
+static void ptlrpcd_ctl_init(struct ptlrpcd_ctl *pc, int index, int cpt)
+{
+	pc->pc_index = index;
+	pc->pc_cpt = cpt;
+	init_completion(&pc->pc_starting);
+	init_completion(&pc->pc_finishing);
+	spin_lock_init(&pc->pc_lock);
+
+	if (index < 0) {
+		/* Recovery thread. */
+		snprintf(pc->pc_name, sizeof(pc->pc_name), "ptlrpcd_rcv");
+	} else {
+		/* Regular thread. */
+		snprintf(pc->pc_name, sizeof(pc->pc_name),
+			 "ptlrpcd_%02d_%02d", cpt, index);
+	}
+}
+
+/* XXX: We want multiple CPU cores to share the async RPC load. So we
+ *	start many ptlrpcd threads. We also want to reduce the ptlrpcd
+ *	overhead caused by data transfer across CPU cores. So we bind
+ *	all ptlrpcd threads to a CPT, in the expectation that CPTs
+ *	will be defined in a way that matches these boundaries. Within
+ *	a CPT a ptlrpcd thread can be scheduled on any available core.
  *
- *      It can partly avoid data transfer cross-CPU (if the bound mode ptlrpcd
- *      thread can be scheduled in time), and try to guarantee the async RPC
- *      processed ASAP (as long as the free mode ptlrpcd thread can be scheduled
- *      on any CPU core).
+ *	Each ptlrpcd thread has its own request queue. This can cause
+ *	response delay if the thread is already busy. To help with
+ *	this we define partner threads: these are other threads bound
+ *	to the same CPT which will check for work in each other's
+ *	request queues if they have no work to do.
  *
- *      As for how to specify the partnership between bound mode ptlrpcd
- *      thread(s) and free mode ptlrpcd thread(s), the simplest way is to use
- *      <free bound> pair. In future, we can specify some more complex
- *      partnership based on the patches for CPU partition. But before such
- *      patches are available, we prefer to use the simplest one.
+ *	The desired number of partner threads can be tuned by setting
+ *	ptlrpcd_partner_group_size. The default is to create pairs of
+ *	partner threads.
  */
-# ifdef CFS_CPU_MODE_NUMA
-# warning "fix ptlrpcd_bind() to use new CPU partition APIs"
-# endif
-static int ptlrpcd_bind(int index, int max)
+static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
 {
 	struct ptlrpcd_ctl *pc;
+	struct ptlrpcd_ctl **ppc;
+	int first;
+	int i;
 	int rc = 0;
-#if defined(CONFIG_NUMA)
-	cpumask_t mask;
-#endif
+	int size;
+
+	LASSERT(index >= 0 && index < pd->pd_nthreads);
+	pc = &pd->pd_threads[index];
+	pc->pc_npartners = pd->pd_groupsize - 1;
+
+	if (pc->pc_npartners <= 0)
+		goto out;
 
-	LASSERT(index <= max - 1);
-	pc = &ptlrpcds->pd_threads[index];
-	switch (ptlrpcd_bind_policy) {
-	case PDB_POLICY_NONE:
-		pc->pc_npartners = -1;
-		break;
-	case PDB_POLICY_FULL:
+	size = sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners;
+	pc->pc_partners = kzalloc_node(size, GFP_NOFS,
+				       cfs_cpt_spread_node(cfs_cpt_table,
+							   pc->pc_cpt));
+	if (!pc->pc_partners) {
 		pc->pc_npartners = 0;
-		set_bit(LIOD_BIND, &pc->pc_flags);
-		break;
-	case PDB_POLICY_PAIR:
-		LASSERT(max % 2 == 0);
-		pc->pc_npartners = 1;
-		break;
-	case PDB_POLICY_NEIGHBOR:
-#if defined(CONFIG_NUMA)
-	{
-		int i;
-		cpumask_copy(&mask, cpumask_of_node(cpu_to_node(index)));
-		for (i = max; i < num_online_cpus(); i++)
-			cpumask_clear_cpu(i, &mask);
-		pc->pc_npartners = cpumask_weight(&mask) - 1;
-		set_bit(LIOD_BIND, &pc->pc_flags);
-	}
-#else
-		LASSERT(max >= 3);
-		pc->pc_npartners = 2;
-#endif
-		break;
-	default:
-		CERROR("unknown ptlrpcd bind policy %d\n", ptlrpcd_bind_policy);
-		rc = -EINVAL;
+		rc = -ENOMEM;
+		goto out;
 	}
 
-	if (rc == 0 && pc->pc_npartners > 0) {
-		pc->pc_partners = kcalloc(pc->pc_npartners,
-					  sizeof(struct ptlrpcd_ctl *),
-					  GFP_NOFS);
-		if (pc->pc_partners == NULL) {
-			pc->pc_npartners = 0;
-			rc = -ENOMEM;
-		} else {
-			switch (ptlrpcd_bind_policy) {
-			case PDB_POLICY_PAIR:
-				if (index & 0x1) {
-					set_bit(LIOD_BIND, &pc->pc_flags);
-					pc->pc_partners[0] = &ptlrpcds->
-						pd_threads[index - 1];
-					ptlrpcds->pd_threads[index - 1].
-						pc_partners[0] = pc;
-				}
-				break;
-			case PDB_POLICY_NEIGHBOR:
-#if defined(CONFIG_NUMA)
-			{
-				struct ptlrpcd_ctl *ppc;
-				int i, pidx;
-				/* partners are cores in the same NUMA node.
-				 * setup partnership only with ptlrpcd threads
-				 * that are already initialized
-				 */
-				for (pidx = 0, i = 0; i < index; i++) {
-					if (cpumask_test_cpu(i, &mask)) {
-						ppc = &ptlrpcds->pd_threads[i];
-						pc->pc_partners[pidx++] = ppc;
-						ppc->pc_partners[ppc->
-							  pc_npartners++] = pc;
-					}
-				}
-				/* adjust number of partners to the number
-				 * of partnership really setup */
-				pc->pc_npartners = pidx;
-			}
-#else
-				if (index & 0x1)
-					set_bit(LIOD_BIND, &pc->pc_flags);
-				if (index > 0) {
-					pc->pc_partners[0] = &ptlrpcds->
-						pd_threads[index - 1];
-					ptlrpcds->pd_threads[index - 1].
-						pc_partners[1] = pc;
-					if (index == max - 1) {
-						pc->pc_partners[1] =
-						&ptlrpcds->pd_threads[0];
-						ptlrpcds->pd_threads[0].
-						pc_partners[0] = pc;
-					}
-				}
-#endif
-				break;
-			}
-		}
+	first = index - index % pd->pd_groupsize;
+	ppc = pc->pc_partners;
+	for (i = first; i < first + pd->pd_groupsize; i++) {
+		if (i != index)
+			*ppc++ = &pd->pd_threads[i];
 	}
-
+out:
 	return rc;
 }
 
-
-int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
+int ptlrpcd_start(struct ptlrpcd_ctl *pc)
 {
-	int rc;
+	struct task_struct *task;
+	int rc = 0;
 
 	/*
 	 * Do not allow start second thread for one pc.
 	 */
 	if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
 		CWARN("Starting second thread (%s) for same pc %p\n",
-		      name, pc);
+		      pc->pc_name, pc);
 		return 0;
 	}
 
-	pc->pc_index = index;
-	init_completion(&pc->pc_starting);
-	init_completion(&pc->pc_finishing);
-	spin_lock_init(&pc->pc_lock);
-	strlcpy(pc->pc_name, name, sizeof(pc->pc_name));
-	pc->pc_set = ptlrpc_prep_set();
-	if (pc->pc_set == NULL) {
-		rc = -ENOMEM;
-		goto out;
-	}
-
 	/*
 	 * So far only "client" ptlrpcd uses an environment. In the future,
 	 * ptlrpcd thread (or a thread-set) has to be given an argument,
@@ -622,29 +605,21 @@ int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
 	 */
 	rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
 	if (rc != 0)
-		goto out_set;
+		goto out;
 
-	{
-		struct task_struct *task;
-		if (index >= 0) {
-			rc = ptlrpcd_bind(index, max);
-			if (rc < 0)
-				goto out_env;
-		}
+	task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
+	if (IS_ERR(task)) {
+		rc = PTR_ERR(task);
+		goto out_set;
+	}
 
-		task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
-		if (IS_ERR(task)) {
-			rc = PTR_ERR(task);
-			goto out_env;
-		}
+	wait_for_completion(&pc->pc_starting);
+	rc = pc->pc_error;
+	if (rc != 0)
+		goto out_set;
 
-		wait_for_completion(&pc->pc_starting);
-	}
 	return 0;
 
-out_env:
-	lu_context_fini(&pc->pc_env.le_ctx);
-
 out_set:
 	if (pc->pc_set != NULL) {
 		struct ptlrpc_request_set *set = pc->pc_set;
@@ -654,7 +629,7 @@ out_set:
 		spin_unlock(&pc->pc_lock);
 		ptlrpc_set_destroy(set);
 	}
-	clear_bit(LIOD_BIND, &pc->pc_flags);
+	lu_context_fini(&pc->pc_env.le_ctx);
 
 out:
 	clear_bit(LIOD_START, &pc->pc_flags);
@@ -694,7 +669,6 @@ void ptlrpcd_free(struct ptlrpcd_ctl *pc)
 	clear_bit(LIOD_START, &pc->pc_flags);
 	clear_bit(LIOD_STOP, &pc->pc_flags);
 	clear_bit(LIOD_FORCE, &pc->pc_flags);
-	clear_bit(LIOD_BIND, &pc->pc_flags);
 
 out:
 	if (pc->pc_npartners > 0) {
@@ -704,88 +678,262 @@ out:
 		pc->pc_partners = NULL;
 	}
 	pc->pc_npartners = 0;
+	pc->pc_error = 0;
 }
 
 static void ptlrpcd_fini(void)
 {
 	int i;
+	int j;
 
 	if (ptlrpcds != NULL) {
-		for (i = 0; i < ptlrpcds->pd_nthreads; i++)
-			ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
-		for (i = 0; i < ptlrpcds->pd_nthreads; i++)
-			ptlrpcd_free(&ptlrpcds->pd_threads[i]);
-		ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
-		ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
+		for (i = 0; i < ptlrpcds_num; i++) {
+			if (!ptlrpcds[i])
+				break;
+			for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
+				ptlrpcd_stop(&ptlrpcds[i]->pd_threads[j], 0);
+			for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
+				ptlrpcd_free(&ptlrpcds[i]->pd_threads[j]);
+			kfree(ptlrpcds[i]);
+			ptlrpcds[i] = NULL;
+		}
 		kfree(ptlrpcds);
-		ptlrpcds = NULL;
 	}
+	ptlrpcds_num = 0;
+
+	ptlrpcd_stop(&ptlrpcd_rcv, 0);
+	ptlrpcd_free(&ptlrpcd_rcv);
+
+	kfree(ptlrpcds_cpt_idx);
+	ptlrpcds_cpt_idx = NULL;
 }
 
 static int ptlrpcd_init(void)
 {
-	int nthreads = num_online_cpus();
-	char name[16];
-	int size, i = -1, j, rc = 0;
-
-	if (max_ptlrpcds > 0 && max_ptlrpcds < nthreads)
-		nthreads = max_ptlrpcds;
-	if (nthreads < 2)
-		nthreads = 2;
-	if (nthreads < 3 && ptlrpcd_bind_policy == PDB_POLICY_NEIGHBOR)
-		ptlrpcd_bind_policy = PDB_POLICY_PAIR;
-	else if (nthreads % 2 != 0 && ptlrpcd_bind_policy == PDB_POLICY_PAIR)
-		nthreads &= ~1; /* make sure it is even */
-
-	size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
-	ptlrpcds = kzalloc(size, GFP_NOFS);
+	int nthreads;
+	int groupsize;
+	int size;
+	int i;
+	int j;
+	int rc = 0;
+	struct cfs_cpt_table *cptable;
+	__u32 *cpts = NULL;
+	int ncpts;
+	int cpt;
+	struct ptlrpcd *pd;
+
+	/*
+	 * Determine the CPTs that ptlrpcd threads will run on.
+	 */
+	cptable = cfs_cpt_table;
+	ncpts = cfs_cpt_number(cptable);
+	if (ptlrpcd_cpts) {
+		struct cfs_expr_list *el;
+
+		size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
+		ptlrpcds_cpt_idx = kzalloc(size, GFP_KERNEL);
+		if (!ptlrpcds_cpt_idx) {
+			rc = -ENOMEM;
+			goto out;
+		}
+
+		rc = cfs_expr_list_parse(ptlrpcd_cpts,
+					 strlen(ptlrpcd_cpts),
+					 0, ncpts - 1, &el);
+
+		if (rc != 0) {
+			CERROR("ptlrpcd_cpts: invalid CPT pattern string: %s\n",
+			       ptlrpcd_cpts);
+			rc = -EINVAL;
+			goto out;
+		}
+
+		rc = cfs_expr_list_values(el, ncpts, &cpts);
+		cfs_expr_list_free(el);
+		if (rc <= 0) {
+			CERROR("ptlrpcd_cpts: failed to parse CPT array %s: %d\n",
+			       ptlrpcd_cpts, rc);
+			if (rc == 0)
+				rc = -EINVAL;
+			goto out;
+		}
+
+		/*
+		 * Create the cpt-to-index map. When there is no match
+		 * in the cpt table, fall back to cpt modulo the number
+		 * of listed cpts. This could be changed to take the
+		 * topology of the system into account.
+		 */
+		for (cpt = 0; cpt < ncpts; cpt++) {
+			for (i = 0; i < rc; i++)
+				if (cpts[i] == cpt)
+					break;
+			if (i >= rc)
+				i = cpt % rc;
+			ptlrpcds_cpt_idx[cpt] = i;
+		}
+
+		cfs_expr_list_values_free(cpts, rc);
+		ncpts = rc;
+	}
+	ptlrpcds_num = ncpts;
+
+	size = ncpts * sizeof(ptlrpcds[0]);
+	ptlrpcds = kzalloc(size, GFP_KERNEL);
 	if (!ptlrpcds) {
 		rc = -ENOMEM;
 		goto out;
 	}
 
-	snprintf(name, sizeof(name), "ptlrpcd_rcv");
-	set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
-	rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv);
+	/*
+	 * The max_ptlrpcds parameter is obsolete, but do something
+	 * sane if it has been tuned, and complain if
+	 * ptlrpcd_per_cpt_max has also been tuned.
+	 */
+	if (max_ptlrpcds != 0) {
+		CWARN("max_ptlrpcds is obsolete.\n");
+		if (ptlrpcd_per_cpt_max == 0) {
+			ptlrpcd_per_cpt_max = max_ptlrpcds / ncpts;
+			/* Round up if there is a remainder. */
+			if (max_ptlrpcds % ncpts != 0)
+				ptlrpcd_per_cpt_max++;
+			CWARN("Setting ptlrpcd_per_cpt_max = %d\n",
+			      ptlrpcd_per_cpt_max);
+		} else {
+			CWARN("ptlrpcd_per_cpt_max is also set!\n");
+		}
+	}
+
+	/*
+	 * The ptlrpcd_bind_policy parameter is obsolete, but do
+	 * something sane if it has been tuned, and complain if
+	 * ptlrpcd_partner_group_size is also tuned.
+	 */
+	if (ptlrpcd_bind_policy != 0) {
+		CWARN("ptlrpcd_bind_policy is obsolete.\n");
+		if (ptlrpcd_partner_group_size == 0) {
+			switch (ptlrpcd_bind_policy) {
+			case 1: /* PDB_POLICY_NONE */
+			case 2: /* PDB_POLICY_FULL */
+				ptlrpcd_partner_group_size = 1;
+				break;
+			case 3: /* PDB_POLICY_PAIR */
+				ptlrpcd_partner_group_size = 2;
+				break;
+			case 4: /* PDB_POLICY_NEIGHBOR */
+#ifdef CONFIG_NUMA
+				ptlrpcd_partner_group_size = -1; /* CPT */
+#else
+				ptlrpcd_partner_group_size = 3; /* Triplets */
+#endif
+				break;
+			default: /* Illegal value, use the default. */
+				ptlrpcd_partner_group_size = 2;
+				break;
+			}
+			CWARN("Setting ptlrpcd_partner_group_size = %d\n",
+			      ptlrpcd_partner_group_size);
+		} else {
+			CWARN("ptlrpcd_partner_group_size is also set!\n");
+		}
+	}
+
+	if (ptlrpcd_partner_group_size == 0)
+		ptlrpcd_partner_group_size = 2;
+	else if (ptlrpcd_partner_group_size < 0)
+		ptlrpcd_partner_group_size = -1;
+	else if (ptlrpcd_per_cpt_max > 0 &&
+		 ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max)
+		ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max;
+
+	/*
+	 * Start the recovery thread first.
+	 */
+	set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags);
+	ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY);
+	rc = ptlrpcd_start(&ptlrpcd_rcv);
 	if (rc < 0)
 		goto out;
 
-	/* XXX: We start nthreads ptlrpc daemons. Each of them can process any
-	 *      non-recovery async RPC to improve overall async RPC efficiency.
-	 *
-	 *      But there are some issues with async I/O RPCs and async non-I/O
-	 *      RPCs processed in the same set under some cases. The ptlrpcd may
-	 *      be blocked by some async I/O RPC(s), then will cause other async
-	 *      non-I/O RPC(s) can not be processed in time.
-	 *
-	 *      Maybe we should distinguish blocked async RPCs from non-blocked
-	 *      async RPCs, and process them in different ptlrpcd sets to avoid
-	 *      unnecessary dependency. But how to distribute async RPCs load
-	 *      among all the ptlrpc daemons becomes another trouble. */
-	for (i = 0; i < nthreads; i++) {
-		snprintf(name, sizeof(name), "ptlrpcd_%d", i);
-		rc = ptlrpcd_start(i, nthreads, name, &ptlrpcds->pd_threads[i]);
-		if (rc < 0)
+	for (i = 0; i < ncpts; i++) {
+		if (!cpts)
+			cpt = i;
+		else
+			cpt = cpts[i];
+
+		nthreads = cfs_cpt_weight(cptable, cpt);
+		if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads)
+			nthreads = ptlrpcd_per_cpt_max;
+		if (nthreads < 2)
+			nthreads = 2;
+
+		if (ptlrpcd_partner_group_size <= 0) {
+			groupsize = nthreads;
+		} else if (nthreads <= ptlrpcd_partner_group_size) {
+			groupsize = nthreads;
+		} else {
+			groupsize = ptlrpcd_partner_group_size;
+			if (nthreads % groupsize != 0)
+				nthreads += groupsize - (nthreads % groupsize);
+		}
+
+		size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
+		pd = kzalloc_node(size, GFP_NOFS,
+				  cfs_cpt_spread_node(cfs_cpt_table, cpt));
+		if (!pd) {
+			rc = -ENOMEM;
 			goto out;
-	}
+		}
+		pd->pd_size = size;
+		pd->pd_index = i;
+		pd->pd_cpt = cpt;
+		pd->pd_cursor = 0;
+		pd->pd_nthreads = nthreads;
+		pd->pd_groupsize = groupsize;
+		ptlrpcds[i] = pd;
 
-	ptlrpcds->pd_size = size;
-	ptlrpcds->pd_index = 0;
-	ptlrpcds->pd_nthreads = nthreads;
+		/*
+		 * The ptlrpcd threads in a partner group can access
+		 * each other's struct ptlrpcd_ctl, so these must be
+		 * initialized before any thread is started.
+		 */
+		for (j = 0; j < nthreads; j++) {
+			ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt);
+			rc = ptlrpcd_partners(pd, j);
+			if (rc < 0)
+				goto out;
+		}
 
-out:
-	if (rc != 0 && ptlrpcds != NULL) {
-		for (j = 0; j <= i; j++)
-			ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0);
-		for (j = 0; j <= i; j++)
-			ptlrpcd_free(&ptlrpcds->pd_threads[j]);
-		ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
-		ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
-		kfree(ptlrpcds);
-		ptlrpcds = NULL;
+		/* XXX: We start nthreads ptlrpc daemons.
+		 *	Each of them can process any non-recovery
+		 *	async RPC to improve overall async RPC
+		 *	efficiency.
+		 *
+		 *	But there are some issues when async I/O RPCs
+		 *	and async non-I/O RPCs are processed in the
+		 *	same set. The ptlrpcd may be blocked by some
+		 *	async I/O RPC(s), which prevents other async
+		 *	non-I/O RPC(s) from being processed in
+		 *	time.
+		 *
+		 *	Maybe we should distinguish blocked async RPCs
+		 *	from non-blocked async RPCs, and process them
+		 *	in different ptlrpcd sets to avoid unnecessary
+		 *	dependencies. But how to distribute the async
+		 *	RPC load among all the ptlrpc daemons then
+		 *	becomes another problem.
+		 */
+		for (j = 0; j < nthreads; j++) {
+			rc = ptlrpcd_start(&pd->pd_threads[j]);
+			if (rc < 0)
+				goto out;
+		}
 	}
+out:
+	if (rc != 0)
+		ptlrpcd_fini();
 
-	return 0;
+	return rc;
 }
 
 int ptlrpcd_addref(void)
-- 
2.1.0
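
For readers following the sizing logic in ptlrpcd_init() and the group
lookup in ptlrpcd_partners(), here is a minimal userspace sketch of the
arithmetic. It is not part of the patch. The names pd_layout,
pd_layout_compute() and pd_partners() are hypothetical, and the sketch
assumes the tunables have already been normalized the way ptlrpcd_init()
does before its per-CPT loop (a group size <= 0 means "whole CPT").

```c
#include <assert.h>

/* Hypothetical result type: thread count and partner group size per CPT. */
struct pd_layout {
	int nthreads;
	int groupsize;
};

/*
 * Mirrors the per-CPT sizing in ptlrpcd_init(): cap by per_cpt_max,
 * enforce a minimum of two threads, then round the thread count up to
 * a whole number of partner groups.
 */
static struct pd_layout pd_layout_compute(int weight, int per_cpt_max,
					  int partner_group_size)
{
	struct pd_layout l;
	int nthreads = weight;

	if (per_cpt_max > 0 && per_cpt_max < nthreads)
		nthreads = per_cpt_max;
	if (nthreads < 2)
		nthreads = 2;

	if (partner_group_size <= 0 || nthreads <= partner_group_size) {
		/* One group spanning every thread of the CPT. */
		l.groupsize = nthreads;
	} else {
		l.groupsize = partner_group_size;
		if (nthreads % l.groupsize != 0)
			nthreads += l.groupsize - nthreads % l.groupsize;
	}
	l.nthreads = nthreads;
	return l;
}

/*
 * Mirrors the loop in ptlrpcd_partners(): store the indices of the
 * other threads in index's group and return how many were stored.
 * The caller provides room for groupsize - 1 entries.
 */
static int pd_partners(int index, int groupsize, int *partners)
{
	int first;
	int n = 0;
	int i;

	assert(groupsize > 0 && index >= 0);
	first = index - index % groupsize;
	for (i = first; i < first + groupsize; i++) {
		if (i != index)
			partners[n++] = i;
	}
	return n;
}
```

With a CPT weight of 7 and the default group size of 2, this gives 8
threads in 4 pairs, and thread 5 is partnered with thread 4. Because the
rounding happens after the cap, the thread count can end up above
ptlrpcd_per_cpt_max: a weight of 8 with a cap of 3 and pairs gives 4.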

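The cpt-to-index map that ptlrpcd_init() builds when ptlrpcd_cpts is set
can be sketched in userspace as follows. It is not part of the patch.
The function name cpt_idx_map_build() and its parameters are
hypothetical; only the loop body follows the code above, where a CPT
that is not listed falls back to cpt modulo the number of listed CPTs.

```c
#include <assert.h>

/*
 * Hypothetical helper: for every CPT in [0, ncpts), record the index
 * of the entry in cpts[] that serves it. cpts[] holds the nsel CPT
 * numbers parsed from the ptlrpcd_cpts string; nsel must be > 0.
 */
static void cpt_idx_map_build(const unsigned int *cpts, int nsel,
			      int ncpts, int *map)
{
	int cpt;
	int i;

	assert(nsel > 0);
	for (cpt = 0; cpt < ncpts; cpt++) {
		/* Look for an exact match in the listed CPTs. */
		for (i = 0; i < nsel; i++) {
			if (cpts[i] == (unsigned int)cpt)
				break;
		}
		/* No match: deterministic fallback, as in the patch. */
		if (i >= nsel)
			i = cpt % nsel;
		map[cpt] = i;
	}
}
```

For a system with 4 CPTs and ptlrpcd threads restricted to CPTs 1 and 3,
the map is {0, 0, 0, 1}: CPTs 0, 1 and 2 all resolve to the entry for
CPT 1, and CPT 3 resolves to its own entry.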
_______________________________________________
devel mailing list
devel@xxxxxxxxxxxxxxxxxxxxxx
http://driverdev.linuxdriverproject.org/mailman/listinfo/driverdev-devel