[PATCH 2/4] [gdbsupport] Add task size parameter in parallel_for_each

Tom de Vries tdevries@suse.de
Fri Jul 22 17:03:43 GMT 2022


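Add a task_size parameter to parallel_for_each (and, to keep the signatures in
sync, to sequential_for_each), defaulting to nullptr.  When a task_size
callback is given, use the per-element sizes it reports to hand the threads
chunks of similar total size, rather than chunks with a similar number of
elements.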

Tested on x86_64-linux.
---
 gdb/unittests/parallel-for-selftests.c |  28 ++++++
 gdbsupport/parallel-for.h              | 113 ++++++++++++++++++++-----
 2 files changed, 119 insertions(+), 22 deletions(-)

diff --git a/gdb/unittests/parallel-for-selftests.c b/gdb/unittests/parallel-for-selftests.c
index 8a86b435fd3..6e341f64037 100644
--- a/gdb/unittests/parallel-for-selftests.c
+++ b/gdb/unittests/parallel-for-selftests.c
@@ -68,6 +68,34 @@ test (int n_threads)
 			  });
   SELF_CHECK (counter == 0);
 
+  auto task_size_max_ = [] (int iter)
+    {
+      return (size_t)SIZE_MAX;
+    };
+  auto task_size_max = gdb::make_function_view (task_size_max_);
+
+  counter = 0;
+  gdb::parallel_for_each (1, 0, NUMBER,
+			  [&] (int start, int end)
+			  {
+			    counter += end - start;
+			  }, task_size_max);
+  SELF_CHECK (counter == NUMBER);
+
+  auto task_size_one_ = [] (int iter)
+    {
+      return (size_t)1;
+    };
+  auto task_size_one = gdb::make_function_view (task_size_one_);
+
+  counter = 0;
+  gdb::parallel_for_each (1, 0, NUMBER,
+			  [&] (int start, int end)
+			  {
+			    counter += end - start;
+			  }, task_size_one);
+  SELF_CHECK (counter == NUMBER);
+
 #undef NUMBER
 }
 
diff --git a/gdbsupport/parallel-for.h b/gdbsupport/parallel-for.h
index 0037ee23ff3..4cd1dbf847e 100644
--- a/gdbsupport/parallel-for.h
+++ b/gdbsupport/parallel-for.h
@@ -23,6 +23,7 @@
 #include <algorithm>
 #include <type_traits>
 #include "gdbsupport/thread-pool.h"
+#include "gdbsupport/function-view.h"
 
 namespace gdb
 {
@@ -134,7 +135,8 @@ typename gdb::detail::par_for_accumulator<
     typename std::result_of<RangeFunction (RandomIt, RandomIt)>::type
   >::result_type
 parallel_for_each (unsigned n, RandomIt first, RandomIt last,
-		   RangeFunction callback)
+		   RangeFunction callback,
+		   gdb::function_view<size_t(RandomIt)> task_size = nullptr)
 {
   using result_type
     = typename std::result_of<RangeFunction (RandomIt, RandomIt)>::type;
@@ -148,17 +150,41 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
   size_t n_elements = last - first;
   size_t elts_per_thread = 0;
   size_t elts_left_over = 0;
+  size_t total_size = 0;
+  size_t size_per_thread = 0;
+  size_t max_element_size = n_elements == 0 ? 1 : SIZE_MAX / n_elements;
 
   if (n_threads > 1)
     {
-      /* Require that there should be at least N elements in a
-	 thread.  */
-      gdb_assert (n > 0);
-      if (n_elements / n_threads < n)
-	n_threads = std::max (n_elements / n, (size_t) 1);
-      elts_per_thread = n_elements / n_threads;
-      elts_left_over = n_elements % n_threads;
-      /* n_elements == n_threads * elts_per_thread + elts_left_over. */
+      if (task_size != nullptr)
+	{
+	  gdb_assert (n == 1);
+	  for (RandomIt i = first; i != last; ++i)
+	    {
+	      size_t element_size = task_size (i);
+	      gdb_assert (element_size > 0);
+	      if (element_size > max_element_size)
+		/* We could start scaling here, but that doesn't seem to be
+		   worth the effort.  */
+		element_size = max_element_size;
+	      size_t prev_total_size = total_size;
+	      total_size += element_size;
+	      /* Check for overflow.  */
+	      gdb_assert (prev_total_size < total_size);
+	    }
+	  size_per_thread = total_size / n_threads;
+	}
+      else
+	{
+	  /* Require that there should be at least N elements in a
+	     thread.  */
+	  gdb_assert (n > 0);
+	  if (n_elements / n_threads < n)
+	    n_threads = std::max (n_elements / n, (size_t) 1);
+	  elts_per_thread = n_elements / n_threads;
+	  elts_left_over = n_elements % n_threads;
+	  /* n_elements == n_threads * elts_per_thread + elts_left_over. */
+	}
     }
 
   size_t count = n_threads == 0 ? 0 : n_threads - 1;
@@ -167,20 +193,52 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
   if (parallel_for_each_debug)
     {
       debug_printf (_("Parallel for: n_elements: %zu\n"), n_elements);
-      debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
-      debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
+      if (task_size != nullptr)
+	{
+	  debug_printf (_("Parallel for: total_size: %zu\n"), total_size);
+	  debug_printf (_("Parallel for: size_per_thread: %zu\n"), size_per_thread);
+	}
+      else
+	{
+	  debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
+	  debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
+	}
     }
 
+  size_t remaining_size = total_size;
   for (int i = 0; i < count; ++i)
     {
-      RandomIt end = first + elts_per_thread;
-      if (i < elts_left_over)
-	/* Distribute the leftovers over the worker threads, to avoid having
-	   to handle all of them in a single thread.  */
-	end++;
+      RandomIt end;
+      size_t chunk_size = 0;
+      if (task_size == nullptr)
+	{
+	  end = first + elts_per_thread;
+	  if (i < elts_left_over)
+	    /* Distribute the leftovers over the worker threads, to avoid having
+	       to handle all of them in a single thread.  */
+	    end++;
+	}
+      else
+	{
+	  RandomIt j;
+	  for (j = first; j < last && chunk_size < size_per_thread; ++j)
+	    {
+	      size_t element_size = task_size (j);
+	      if (element_size > max_element_size)
+		element_size = max_element_size;
+	      chunk_size += element_size;
+	    }
+	  end = j;
+	  remaining_size -= chunk_size;
+	}
       if (parallel_for_each_debug)
-	debug_printf (_("Parallel for: elements on worker thread %i\t: %zu\n"),
-		      i, (size_t)(end - first));
+	{
+	  debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
+			i, (size_t)(end - first));
+	  if (task_size != nullptr)
+	    debug_printf (_("\t(size: %zu)"), chunk_size);
+	  debug_printf (_("\n"));
+	}
       results.post (i, [=] ()
         {
 	  return callback (first, end);
@@ -190,12 +248,22 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
 
   for (int i = count; i < n_worker_threads; ++i)
     if (parallel_for_each_debug)
-      debug_printf (_("Parallel for: elements on worker thread %i\t: 0\n"), i);
+      {
+	debug_printf (_("Parallel for: elements on worker thread %i\t: 0"), i);
+	if (task_size != nullptr)
+	  debug_printf (_("\t(size: 0)"));
+	debug_printf (_("\n"));
+      }
 
   /* Process all the remaining elements in the main thread.  */
   if (parallel_for_each_debug)
-    debug_printf (_("Parallel for: elements on main thread\t\t: %zu\n"),
-		  (size_t)(last - first));
+    {
+      debug_printf (_("Parallel for: elements on main thread\t\t: %zu"),
+		    (size_t)(last - first));
+      if (task_size != nullptr)
+	debug_printf (_("\t(size: %zu)"), remaining_size);
+      debug_printf (_("\n"));
+    }
   return results.finish ([=] ()
     {
       return callback (first, last);
@@ -211,7 +279,8 @@ typename gdb::detail::par_for_accumulator<
     typename std::result_of<RangeFunction (RandomIt, RandomIt)>::type
   >::result_type
 sequential_for_each (unsigned n, RandomIt first, RandomIt last,
-		   RangeFunction callback)
+		     RangeFunction callback,
+		     gdb::function_view<size_t(RandomIt)> task_size = nullptr)
 {
   using result_type
     = typename std::result_of<RangeFunction (RandomIt, RandomIt)>::type;
-- 
2.35.3



More information about the Gdb-patches mailing list