LKML Archive on lore.kernel.org
help / color / mirror / Atom feed
From: Riccardo Mancini <rickyman7@gmail.com>
To: Arnaldo Carvalho de Melo <acme@kernel.org>
Cc: Ian Rogers <irogers@google.com>,
	Namhyung Kim <namhyung@kernel.org>,
	Peter Zijlstra <peterz@infradead.org>,
	Ingo Molnar <mingo@redhat.com>,
	Mark Rutland <mark.rutland@arm.com>, Jiri Olsa <jolsa@redhat.com>,
	linux-kernel@vger.kernel.org, linux-perf-users@vger.kernel.org,
	Alexey Bayduraev <alexey.v.bayduraev@linux.intel.com>,
	Riccardo Mancini <rickyman7@gmail.com>
Subject: [RFC PATCH v2 08/10] perf workqueue: add queue_work and flush_workqueue functions
Date: Fri, 30 Jul 2021 17:34:15 +0200	[thread overview]
Message-ID: <cf11e40889edfb10e296c5d6f575cd73cec99172.1627657061.git.rickyman7@gmail.com> (raw)
In-Reply-To: <cover.1627657061.git.rickyman7@gmail.com>

This patch adds functions to queue and wait work_structs, and
related tests.

When a new work item is added, the workqueue first checks if there
are threads to wake up. If so, it wakes it up with the given work item,
otherwise it will just add it to the list pending work items. A thread
which completes a work item will check this list before going to sleep.

Signed-off-by: Riccardo Mancini <rickyman7@gmail.com>
---
 tools/perf/tests/workqueue.c          |  70 ++++++++++++++++-
 tools/perf/util/workqueue/workqueue.c | 106 ++++++++++++++++++++++++++
 tools/perf/util/workqueue/workqueue.h |   6 ++
 3 files changed, 181 insertions(+), 1 deletion(-)

diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index 2165a563e47bd6a5..2488ee971877b1f0 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c
@@ -147,6 +147,27 @@ static int __test__threadpool(void *_args)
 	return ret;
 }
 
+struct test_work {
+	struct work_struct work;
+	int i;
+	int *array;
+};
+
+static void test_work_fn1(struct work_struct *work)
+{
+	struct test_work *mwork = container_of(work, struct test_work, work);
+
+	dummy_work(mwork->i);
+	mwork->array[mwork->i] = mwork->i+1;
+}
+
+static void test_work_fn2(struct work_struct *work)
+{
+	struct test_work *mwork = container_of(work, struct test_work, work);
+
+	dummy_work(mwork->i);
+	mwork->array[mwork->i] = mwork->i*2;
+}
 
 static int __workqueue__prepare(struct threadpool **pool,
 				struct workqueue_struct **wq,
@@ -175,21 +196,68 @@ static int __workqueue__teardown(struct threadpool *pool,
 	return __threadpool__teardown(pool);
 }
 
+static int __workqueue__exec_wait(struct workqueue_struct *wq,
+				int *array, struct test_work *works,
+				work_func_t func, int n_work_items)
+{
+	int ret, i;
+
+	for (i = 0; i < n_work_items; i++) {
+		works[i].array = array;
+		works[i].i = i;
+
+		init_work(&works[i].work);
+		works[i].work.func = func;
+		queue_work(wq, &works[i].work);
+	}
+
+	ret = flush_workqueue(wq);
+	TEST_ASSERT_VAL("workqueue flush failure", ret == 0);
+
+	return TEST_OK;
+}
+
+
 static int __test__workqueue(void *_args)
 {
 	struct workqueue_test_args_t *args = _args;
 	struct threadpool *pool;
 	struct workqueue_struct *wq;
-	int ret = __workqueue__prepare(&pool, &wq, args->pool_size);
+	int *array;
+	struct test_work *works;
+	int i, ret = __workqueue__prepare(&pool, &wq, args->pool_size);
 
+	if (ret)
+		return ret;
+
+	array = calloc(args->n_work_items, sizeof(*array));
+	TEST_ASSERT_VAL("failed array calloc", array);
+	works = calloc(args->n_work_items, sizeof(*works));
+	TEST_ASSERT_VAL("failed works calloc", works);
+
+	ret = __workqueue__exec_wait(wq, array, works, test_work_fn1,
+					args->n_work_items);
 	if (ret)
 		goto out;
 
+	for (i = 0; i < args->n_work_items; i++)
+		TEST_ASSERT_VAL("failed array check (1)", array[i] == i+1);
+
+	ret = __workqueue__exec_wait(wq, array, works, test_work_fn2,
+					args->n_work_items);
+	if (ret)
+		goto out;
+
+	for (i = 0; i < args->n_work_items; i++)
+		TEST_ASSERT_VAL("failed array check (2)", array[i] == 2*i);
+
 	ret = __workqueue__teardown(pool, wq);
 	if (ret)
 		goto out;
 
 out:
+	free(array);
+	free(works);
 	return ret;
 }
 
diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index 16a55de25cf247d8..a32d62dac1ec04a6 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -23,12 +23,14 @@ enum worker_msg {
 
 enum workqueue_status {
 	WORKQUEUE_STATUS__READY,	/* wq is ready to receive work */
+	WORKQUEUE_STATUS__STOPPING,	/* wq is being destructed */
 	WORKQUEUE_STATUS__ERROR,
 	WORKQUEUE_STATUS__MAX
 };
 
 static const char * const workqueue_status_tags[] = {
 	"ready",
+	"stopping",
 	"error"
 };
 
@@ -120,6 +122,39 @@ __must_hold(&wq->lock)
 		pthread_cond_signal(&wq->idle_cond);
 }
 
+/**
+ * wake_worker - wake worker @w of workqueue @wq assigning @work to do
+ *
+ * Called from main thread.
+ * Moves worker from idle to busy list, assigns @work to it and sends it a
+ * wake up message.
+ *
+ * NB: this function releases the lock to be able to send the notification
+ * outside the critical section.
+ */
+static int wake_worker(struct workqueue_struct *wq, struct worker *w,
+			struct work_struct *work)
+__must_hold(&wq->lock)
+__releases(&wq->lock)
+{
+	enum worker_msg msg = WORKER_MSG__WAKE;
+	int ret;
+
+	list_move(&w->entry, &wq->busy_list);
+	w->current_work = work;
+	unlock_workqueue(wq);
+
+	// send wake msg outside critical section to reduce time spent inside it
+	ret = writen(w->msg_pipe[1], &msg, sizeof(msg));
+	if (ret < 0) {
+		pr_debug2("wake_worker[%d]: error seding msg: %s\n",
+			w->tidx, strerror(errno));
+		return -WORKQUEUE_ERROR__WRITEPIPE;
+	}
+
+	return 0;
+}
+
 /**
  * stop_worker - stop worker @w
  *
@@ -319,6 +354,11 @@ static int detach_threadpool_from_workqueue(struct workqueue_struct *wq)
 		return -WORKQUEUE_ERROR__NOTALLOWED;
 	}
 
+	wq->status = WORKQUEUE_STATUS__STOPPING;
+	ret = flush_workqueue(wq);
+	if (ret)
+		return ret;
+
 	lock_workqueue(wq);
 	for_each_idle_worker(wq, w) {
 		ret = stop_worker(w);
@@ -507,3 +547,69 @@ int workqueue_nr_threads(struct workqueue_struct *wq)
 {
 	return threadpool__size(wq->pool);
 }
+
+/**
+ * queue_work - add @work to @wq internal queue
+ *
+ * If there are idle threads, one of these will be woken up.
+ * Otherwise, the work is added to the pending list.
+ */
+int queue_work(struct workqueue_struct *wq, struct work_struct *work)
+{
+	struct worker *chosen_worker;
+	int ret = 0;
+
+	// in particular, this can fail if workqueue is marked to be stopping
+	if (wq->status != WORKQUEUE_STATUS__READY) {
+		pr_debug2("workqueue: trying to queue but workqueue is not ready\n");
+		return -WORKQUEUE_ERROR__NOTALLOWED;
+	}
+
+	lock_workqueue(wq);
+	if (list_empty(&wq->idle_list)) {
+		list_add_tail(&work->entry, &wq->pending);
+		unlock_workqueue(wq);
+		pr_debug("workqueue: queued new work item\n");
+	} else {
+		chosen_worker = list_first_entry(&wq->idle_list, struct worker, entry);
+		ret = wake_worker(wq, chosen_worker, work);
+		pr_debug("workqueue: woke worker %d\n", chosen_worker->tidx);
+	}
+
+	if (ret) {
+		wq->status = WORKQUEUE_STATUS__ERROR;
+		return ret;
+	}
+	return 0;
+}
+
+/**
+ * flush_workqueue - wait for all currently executed and pending work to finish
+ *
+ * This function blocks until all threads become idle.
+ */
+int flush_workqueue(struct workqueue_struct *wq)
+{
+	int err = 0, ret;
+
+	lock_workqueue(wq);
+	while (!list_empty(&wq->busy_list)) {
+		ret = pthread_cond_wait(&wq->idle_cond, &wq->lock);
+		if (ret) {
+			pr_debug2("%s: error in pthread_cond_wait\n", __func__);
+			err = -ret;
+			break;
+		}
+	}
+	unlock_workqueue(wq);
+
+	return err;
+}
+
+/**
+ * init_work - initialize the @work struct
+ */
+void init_work(struct work_struct *work)
+{
+	INIT_LIST_HEAD(&work->entry);
+}
diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
index 456cd8b6cb2a26d8..ec998291b6a0623d 100644
--- a/tools/perf/util/workqueue/workqueue.h
+++ b/tools/perf/util/workqueue/workqueue.h
@@ -22,6 +22,12 @@ extern int destroy_workqueue(struct workqueue_struct *wq);
 
 extern int workqueue_nr_threads(struct workqueue_struct *wq);
 
+extern int queue_work(struct workqueue_struct *wq, struct work_struct *work);
+
+extern int flush_workqueue(struct workqueue_struct *wq);
+
+extern void init_work(struct work_struct *work);
+
 #define WORKQUEUE_STRERR_BUFSIZE (128+THREADPOOL_STRERR_BUFSIZE)
 #define WORKQUEUE_ERROR__OFFSET 512
 enum {
-- 
2.31.1


  parent reply	other threads:[~2021-07-30 15:38 UTC|newest]

Thread overview: 21+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
     [not found] <cover.1627657061.git.rickyman7@gmail.com>
2021-07-30 15:34 ` [RFC PATCH v2 01/10] perf workqueue: threadpool creation and destruction Riccardo Mancini
2021-08-07  2:24   ` Namhyung Kim
2021-08-09 10:30     ` Riccardo Mancini
2021-08-10 18:54       ` Namhyung Kim
2021-08-10 20:24         ` Arnaldo Carvalho de Melo
2021-08-11 17:55           ` Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 02/10] perf tests: add test for workqueue Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 03/10] perf workqueue: add threadpool start and stop functions Riccardo Mancini
2021-08-07  2:43   ` Namhyung Kim
2021-08-09 10:35     ` Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 04/10] perf workqueue: add threadpool execute and wait functions Riccardo Mancini
2021-08-07  2:56   ` Namhyung Kim
2021-07-30 15:34 ` [RFC PATCH v2 05/10] tools: add sparse context/locking annotations in compiler-types.h Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 06/10] perf workqueue: introduce workqueue struct Riccardo Mancini
2021-08-09 12:04   ` Jiri Olsa
2021-07-30 15:34 ` [RFC PATCH v2 07/10] perf workqueue: implement worker thread and management Riccardo Mancini
2021-07-30 15:34 ` Riccardo Mancini [this message]
2021-07-30 15:34 ` [RFC PATCH v2 09/10] perf workqueue: add utility to execute a for loop in parallel Riccardo Mancini
2021-07-30 15:34 ` [RFC PATCH v2 10/10] perf synthetic-events: use workqueue parallel_for Riccardo Mancini
2021-08-09 12:04   ` Jiri Olsa
2021-08-09 13:24     ` Riccardo Mancini

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=cf11e40889edfb10e296c5d6f575cd73cec99172.1627657061.git.rickyman7@gmail.com \
    --to=rickyman7@gmail.com \
    --cc=acme@kernel.org \
    --cc=alexey.v.bayduraev@linux.intel.com \
    --cc=irogers@google.com \
    --cc=jolsa@redhat.com \
    --cc=linux-kernel@vger.kernel.org \
    --cc=linux-perf-users@vger.kernel.org \
    --cc=mark.rutland@arm.com \
    --cc=mingo@redhat.com \
    --cc=namhyung@kernel.org \
    --cc=peterz@infradead.org \
    --subject='Re: [RFC PATCH v2 08/10] perf workqueue: add queue_work and flush_workqueue functions' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).