[PATCH sched_ext/for-6.11 2/2] sched_ext: Implement scx_bpf_consume_task()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Implement scx_bpf_consume_task() which allows consuming arbitrary tasks on
the DSQ in any order while iterating in the dispatch path.

scx_qmap is updated to implement periodic dumping of the shared DSQ and a
rather silly prioritization mechanism to demonstrate the use of DSQ
iteration and selective consumption.

Note that it does a bit of a nasty dance to pass in the pointer to the
iterator to __scx_bpf_consume_task(). This is to work around the current
limitation in the BPF verifier where it doesn't allow the memory area used
for an iterator to be passed into kfuncs. This may be too nasty and might
require a different approach.

Signed-off-by: Tejun Heo <tj@xxxxxxxxxx>
Reviewed-by: David Vernet <dvernet@xxxxxxxx>
Cc: Alexei Starovoitov <ast@xxxxxxxxxx>
Cc: bpf@xxxxxxxxxxxxxxx
---
Hello, again.

(continuing from the previous patch) so, the problem is that I need to
distinguish the tasks which have left a queue and then get requeued while an
iteration is in progress. The iterator itself already does this - it
remembers a sequence number when iteration starts and ignores tasks which
are queued afterwards.

As a task can get removed and requeued anytime, I need
scx_bpf_consume_task() to do the same testing, so I want to pass in the
iterator pointer into scx_bpf_consume_task() so that it can read the
sequence number stored in the iterator. However, BPF doesn't allow this, so
I'm doing the weird self pointer probe read thing, to obtain it, which is
quite nasty.

What do you think?

Thanks.

 kernel/sched/ext.c                       |   89 +++++++++++++++++++++++++++++--
 tools/sched_ext/include/scx/common.bpf.h |   16 +++++
 tools/sched_ext/scx_qmap.bpf.c           |   34 ++++++++++-
 tools/sched_ext/scx_qmap.c               |   14 +++-
 4 files changed, 142 insertions(+), 11 deletions(-)

--- a/kernel/sched/ext.c
+++ b/kernel/sched/ext.c
@@ -1122,6 +1122,12 @@ enum scx_dsq_iter_flags {
 };
 
 struct bpf_iter_scx_dsq_kern {
+	/*
+	 * Must be the first field. Used to work around BPF restriction and pass
+	 * in the iterator pointer to scx_bpf_consume_task().
+	 */
+	struct bpf_iter_scx_dsq_kern	*self;
+
 	struct scx_dsq_node		cursor;
 	struct scx_dispatch_q		*dsq;
 	u32				dsq_seq;
@@ -1518,7 +1524,7 @@ static void dispatch_enqueue(struct scx_
 	p->scx.dsq_seq = dsq->seq;
 
 	dsq_mod_nr(dsq, 1);
-	p->scx.dsq = dsq;
+	WRITE_ONCE(p->scx.dsq, dsq);
 
 	/*
 	 * scx.ddsp_dsq_id and scx.ddsp_enq_flags are only relevant on the
@@ -1611,7 +1617,7 @@ static void dispatch_dequeue(struct rq *
 		WARN_ON_ONCE(task_linked_on_dsq(p));
 		p->scx.holding_cpu = -1;
 	}
-	p->scx.dsq = NULL;
+	WRITE_ONCE(p->scx.dsq, NULL);
 
 	if (!is_local)
 		raw_spin_unlock(&dsq->lock);
@@ -2107,7 +2113,7 @@ static void consume_local_task(struct rq
 	list_add_tail(&p->scx.dsq_node.list, &rq->scx.local_dsq.list);
 	dsq_mod_nr(dsq, -1);
 	dsq_mod_nr(&rq->scx.local_dsq, 1);
-	p->scx.dsq = &rq->scx.local_dsq;
+	WRITE_ONCE(p->scx.dsq, &rq->scx.local_dsq);
 	raw_spin_unlock(&dsq->lock);
 }
 
@@ -5585,12 +5591,88 @@ __bpf_kfunc bool scx_bpf_consume(u64 dsq
 	}
 }
 
+/**
+ * __scx_bpf_consume_task - Transfer a task from DSQ iteration to the local DSQ
+ * @it: address of the DSQ iterator in progress, passed as a plain integer
+ * @p: task to consume
+ *
+ * Transfer @p which is on the DSQ currently iterated by @it to the current
+ * CPU's local DSQ. For the transfer to be successful, @p must still be on the
+ * DSQ and have been queued before the DSQ iteration started. Whether @p was
+ * actually obtained from the DSQ iteration doesn't matter.
+ *
+ * Returns %true if @p has been consumed, %false otherwise, e.g. when @p had
+ * already been consumed or dequeued.
+ */
+__bpf_kfunc bool __scx_bpf_consume_task(unsigned long it, struct task_struct *p)
+{
+	struct bpf_iter_scx_dsq_kern *kit = (void *)it;
+	struct scx_dispatch_q *dsq, *kit_dsq;
+	struct scx_dsp_ctx *dspc = this_cpu_ptr(scx_dsp_ctx);
+	struct rq *task_rq;
+	u32 kit_dsq_seq;
+
+	/* can't trust @kit, carefully fetch the values we need */
+	if (get_kernel_nofault(kit_dsq, &kit->dsq) ||
+	    get_kernel_nofault(kit_dsq_seq, &kit->dsq_seq)) {
+		scx_ops_error("invalid @it 0x%lx", it);
+		return false;
+	}
+
+	/*
+	 * @kit can't be trusted and we can only get the DSQ from @p. As we
+	 * don't know whether @p's rq is locked, use READ_ONCE() to access the
+	 * field. Derefing is safe as DSQs are RCU protected.
+	 */
+	dsq = READ_ONCE(p->scx.dsq);
+
+	if (unlikely(!dsq || dsq != kit_dsq))
+		return false;
+
+	if (unlikely(dsq->id == SCX_DSQ_LOCAL)) {
+		scx_ops_error("local DSQ not allowed");
+		return false;
+	}
+
+	if (!scx_kf_allowed(SCX_KF_DISPATCH))
+		return false;
+
+	flush_dispatch_buf(dspc->rq, dspc->rf);
+
+	raw_spin_lock(&dsq->lock);
+
+	/*
+	 * Did someone else get to it? @p could have already left @dsq, got
+	 * re-enqueued, or be in the process of being consumed by someone else.
+	 * The seq test is a wrap-safe 32bit compare to match u32 kit->dsq_seq.
+	 */
+	if (unlikely(p->scx.dsq != dsq ||
+		     (s32)(kit_dsq_seq - p->scx.dsq_seq) < 0 ||
+		     p->scx.holding_cpu >= 0))
+		goto out_unlock;
+
+	task_rq = task_rq(p);
+
+	if (dspc->rq == task_rq) {
+		consume_local_task(dspc->rq, dsq, p);
+		return true;
+	}
+
+	if (task_can_run_on_remote_rq(p, dspc->rq))
+		return consume_remote_task(dspc->rq, dspc->rf, dsq, p, task_rq);
+
+out_unlock:
+	raw_spin_unlock(&dsq->lock);
+	return false;
+}
+
 __bpf_kfunc_end_defs();
 
 BTF_KFUNCS_START(scx_kfunc_ids_dispatch)
 BTF_ID_FLAGS(func, scx_bpf_dispatch_nr_slots)
 BTF_ID_FLAGS(func, scx_bpf_dispatch_cancel)
 BTF_ID_FLAGS(func, scx_bpf_consume)
+BTF_ID_FLAGS(func, __scx_bpf_consume_task)
 BTF_KFUNCS_END(scx_kfunc_ids_dispatch)
 
 static const struct btf_kfunc_id_set scx_kfunc_set_dispatch = {
@@ -5797,6 +5879,7 @@ __bpf_kfunc int bpf_iter_scx_dsq_new(str
 	INIT_LIST_HEAD(&kit->cursor.list);
 	RB_CLEAR_NODE(&kit->cursor.priq);
 	kit->cursor.flags = SCX_TASK_DSQ_CURSOR;
+	kit->self = kit;
 	kit->dsq_seq = READ_ONCE(kit->dsq->seq);
 	kit->flags = flags;
 
--- a/tools/sched_ext/include/scx/common.bpf.h
+++ b/tools/sched_ext/include/scx/common.bpf.h
@@ -35,6 +35,7 @@ void scx_bpf_dispatch_vtime(struct task_
 u32 scx_bpf_dispatch_nr_slots(void) __ksym;
 void scx_bpf_dispatch_cancel(void) __ksym;
 bool scx_bpf_consume(u64 dsq_id) __ksym;
+bool __scx_bpf_consume_task(unsigned long it, struct task_struct *p) __ksym __weak;
 u32 scx_bpf_reenqueue_local(void) __ksym;
 void scx_bpf_kick_cpu(s32 cpu, u64 flags) __ksym;
 s32 scx_bpf_dsq_nr_queued(u64 dsq_id) __ksym;
@@ -61,6 +62,21 @@ s32 scx_bpf_pick_any_cpu(const cpumask_t
 bool scx_bpf_task_running(const struct task_struct *p) __ksym;
 s32 scx_bpf_task_cpu(const struct task_struct *p) __ksym;
 
+/*
+ * Use the following as @it when calling scx_bpf_consume_task() from within
+ * bpf_for_each() loops. It expands to the address of a variable named ___it.
+ */
+#define BPF_FOR_EACH_ITER	(&___it)
+
+/* hopefully temporary wrapper to work around BPF restriction */
+static inline bool scx_bpf_consume_task(struct bpf_iter_scx_dsq *it,
+					struct task_struct *p)
+{
+	unsigned long ptr;	/* first word of @it, handed to the kfunc */
+	bpf_probe_read_kernel(&ptr, sizeof(ptr), it);	/* result not checked */
+	return __scx_bpf_consume_task(ptr, p);
+}
+
 static inline __attribute__((format(printf, 1, 2)))
 void ___scx_bpf_bstr_format_checker(const char *fmt, ...) {}
 
--- a/tools/sched_ext/scx_qmap.bpf.c
+++ b/tools/sched_ext/scx_qmap.bpf.c
@@ -23,6 +23,7 @@
  * Copyright (c) 2022 David Vernet <dvernet@xxxxxxxx>
  */
 #include <scx/common.bpf.h>
+#include <string.h>
 
 enum consts {
 	ONE_SEC_IN_NS		= 1000000000,
@@ -37,6 +38,7 @@ const volatile u32 stall_kernel_nth;
 const volatile u32 dsp_inf_loop_after;
 const volatile u32 dsp_batch;
 const volatile bool print_shared_dsq;
+const volatile u64 exp_cgid;
 const volatile s32 disallow_tgid;
 const volatile bool suppress_dump;
 
@@ -121,7 +123,7 @@ struct {
 
 /* Statistics */
 u64 nr_enqueued, nr_dispatched, nr_reenqueued, nr_dequeued;
-u64 nr_core_sched_execed;
+u64 nr_core_sched_execed, nr_expedited;
 u32 cpuperf_min, cpuperf_avg, cpuperf_max;
 u32 cpuperf_target_min, cpuperf_target_avg, cpuperf_target_max;
 
@@ -260,6 +262,32 @@ static void update_core_sched_head_seq(s
 		scx_bpf_error("task_ctx lookup failed");
 }
 
+static bool consume_shared_dsq(void)
+{
+	struct task_struct *p;
+	bool consumed;
+
+	if (!exp_cgid)	/* no cgroup ID to expedite */
+		return scx_bpf_consume(SHARED_DSQ);
+
+	/*
+	 * To demonstrate the use of scx_bpf_consume_task(), implement silly
+	 * selective priority boosting mechanism by scanning SHARED_DSQ looking
+	 * for tasks whose cgroup ID matches exp_cgid and consume them first.
+	 * This makes a difference only when dsp_batch is larger than 1.
+	 */
+	consumed = false;
+	bpf_for_each(scx_dsq, p, SHARED_DSQ, 0) {
+		if (p->cgroups->dfl_cgrp->kn->id == exp_cgid &&
+		    scx_bpf_consume_task(BPF_FOR_EACH_ITER, p)) {
+			consumed = true;
+			__sync_fetch_and_add(&nr_expedited, 1);
+		}
+	}
+
+	return consumed || scx_bpf_consume(SHARED_DSQ);
+}
+
 void BPF_STRUCT_OPS(qmap_dispatch, s32 cpu, struct task_struct *prev)
 {
 	struct task_struct *p;
@@ -268,7 +296,7 @@ void BPF_STRUCT_OPS(qmap_dispatch, s32 c
 	void *fifo;
 	s32 i, pid;
 
-	if (scx_bpf_consume(SHARED_DSQ))
+	if (consume_shared_dsq())
 		return;
 
 	if (dsp_inf_loop_after && nr_dispatched > dsp_inf_loop_after) {
@@ -319,7 +347,7 @@ void BPF_STRUCT_OPS(qmap_dispatch, s32 c
 			batch--;
 			cpuc->dsp_cnt--;
 			if (!batch || !scx_bpf_dispatch_nr_slots()) {
-				scx_bpf_consume(SHARED_DSQ);
+				consume_shared_dsq();
 				return;
 			}
 			if (!cpuc->dsp_cnt)
--- a/tools/sched_ext/scx_qmap.c
+++ b/tools/sched_ext/scx_qmap.c
@@ -20,7 +20,7 @@ const char help_fmt[] =
 "See the top-level comment in .bpf.c for more details.\n"
 "\n"
 "Usage: %s [-s SLICE_US] [-e COUNT] [-t COUNT] [-T COUNT] [-l COUNT] [-b COUNT]\n"
-"       [-P] [-d PID] [-D LEN] [-p] [-v]\n"
+"       [-P] [-E CGID] [-d PID] [-D LEN] [-p] [-v]\n"
 "\n"
 "  -s SLICE_US   Override slice duration\n"
 "  -e COUNT      Trigger scx_bpf_error() after COUNT enqueues\n"
@@ -29,10 +29,11 @@ const char help_fmt[] =
 "  -l COUNT      Trigger dispatch infinite looping after COUNT dispatches\n"
 "  -b COUNT      Dispatch upto COUNT tasks together\n"
 "  -P            Print out DSQ content to trace_pipe every second, use with -b\n"
+"  -E CGID       Expedite consumption of threads in a cgroup, use with -b\n"
 "  -d PID        Disallow a process from switching into SCHED_EXT (-1 for self)\n"
 "  -D LEN        Set scx_exit_info.dump buffer length\n"
 "  -S            Suppress qmap-specific debug dump\n"
-"  -p            Switch only tasks on SCHED_EXT policy instead of all\n"
+"  -p            Switch only tasks on SCHED_EXT policy instead of all\n"
 "  -v            Print libbpf debug messages\n"
 "  -h            Display this help and exit\n";
 
@@ -63,7 +64,7 @@ int main(int argc, char **argv)
 
 	skel = SCX_OPS_OPEN(qmap_ops, scx_qmap);
 
-	while ((opt = getopt(argc, argv, "s:e:t:T:l:b:Pd:D:Spvh")) != -1) {
+	while ((opt = getopt(argc, argv, "s:e:t:T:l:b:PE:d:D:Spvh")) != -1) {
 		switch (opt) {
 		case 's':
 			skel->rodata->slice_ns = strtoull(optarg, NULL, 0) * 1000;
@@ -86,6 +87,9 @@ int main(int argc, char **argv)
 		case 'P':
 			skel->rodata->print_shared_dsq = true;
 			break;
+		case 'E':
+			skel->rodata->exp_cgid = strtoull(optarg, NULL, 0);
+			break;
 		case 'd':
 			skel->rodata->disallow_tgid = strtol(optarg, NULL, 0);
 			if (skel->rodata->disallow_tgid < 0)
@@ -116,10 +120,10 @@ int main(int argc, char **argv)
 		long nr_enqueued = skel->bss->nr_enqueued;
 		long nr_dispatched = skel->bss->nr_dispatched;
 
-		printf("stats  : enq=%lu dsp=%lu delta=%ld reenq=%"PRIu64" deq=%"PRIu64" core=%"PRIu64"\n",
+		printf("stats  : enq=%lu dsp=%lu delta=%ld reenq=%"PRIu64" deq=%"PRIu64" core=%"PRIu64" exp=%"PRIu64"\n",
 		       nr_enqueued, nr_dispatched, nr_enqueued - nr_dispatched,
 		       skel->bss->nr_reenqueued, skel->bss->nr_dequeued,
-		       skel->bss->nr_core_sched_execed);
+		       skel->bss->nr_core_sched_execed, skel->bss->nr_expedited);
 		if (__COMPAT_has_ksym("scx_bpf_cpuperf_cur"))
 			printf("cpuperf: cur min/avg/max=%u/%u/%u target min/avg/max=%u/%u/%u\n",
 			       skel->bss->cpuperf_min,




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux