This is the mail archive of the elfutils-devel@sourceware.org mailing list for the elfutils project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

PATCH: debuginfod scan threading rework


Hi -

This patch dramatically improves concurrency when scanning multiple
and/or large directories, and thus speeds up file/archive scanning.

This patch is independent of the debuginfod-fdcache patch posted the
other day.  The two should compose well and together give further
performance improvements.

git diff -w follows:

commit b225b73266a068afa87ac06858c81688f0c00ea3 (HEAD -> fche/debuginfod-workq, origin/fche/debuginfod-workq)
Author: Frank Ch. Eigler <fche@redhat.com>
Date:   Tue Dec 31 20:06:30 2019 -0500

    debuginfod: rework threading model for file/archive scanning
    
    We switch from one thread per supplied PATH, with semaphore-based
    concurrency control, to a fixed number of worker threads that consume
    the results of a plain directory traversal from a shared work queue.
    This allows maximal continuous concurrency, even if the PATH
    directories differ dramatically in size.  There is no longer any need
    to use concurrency-motivated subdirectory wildcards for PATH entries:
    a single top-level directory will be scanned quickly.  Documentation
    and tests are included.

diff --git a/debuginfod/ChangeLog b/debuginfod/ChangeLog
index 1582eba5bc0e..68102ceabf1f 100644
--- a/debuginfod/ChangeLog
+++ b/debuginfod/ChangeLog
@@ -1,3 +1,19 @@
+2019-12-31  Frank Ch. Eigler  <fche@redhat.com>
+
+	* debuginfod.cxx: Rework threading model.
+	(workq): New class for concurrent work-queue.
+	(semaphore, semaphore_borrower): Removed classes, now unused.
+	(scan_source_file_path): Rework into ...
+	(scan_source_file): New function.
+	(thread_main_scan_source_file_path): Removed.
+	(scan_source_archive_path): Rework into ...
+	(scan_archive_file): New function.
+	(thread_main_scanner): New function for scanner threads.
+	(thread_main_fts_source_paths): New function for traversal thread.
+	(scan_source_paths): New function doing the actual traversal.
+	(thread_main_groom): Tweak metrics for consistency.
+	(main): Start 1 traversal and N scanner threads if needed.
+
 2019-12-22  Frank Ch. Eigler  <fche@redhat.com>
 
 	* debuginfod.cxx (*_rpm_*): Rename to *_archive_* throughout.
diff --git a/debuginfod/debuginfod.cxx b/debuginfod/debuginfod.cxx
index 70cb95fecd65..05fbacc2b873 100644
--- a/debuginfod/debuginfod.cxx
+++ b/debuginfod/debuginfod.cxx
@@ -79,6 +79,7 @@ extern "C" {
 #include <ostream>
 #include <sstream>
 #include <mutex>
+#include <deque>
 #include <condition_variable>
 #include <thread>
 // #include <regex> // on rhel7 gcc 4.8, not competent
@@ -397,6 +398,7 @@ static void inc_metric(const string& metric,
 static void add_metric(const string& metric,
                        const string& lname, const string& lvalue,
                        int64_t value);
+// static void add_metric(const string& metric, int64_t value);
 
 /* Handle program arguments.  */
 static error_t
@@ -519,38 +521,56 @@ struct elfutils_exception: public reportable_exception
 
 ////////////////////////////////////////////////////////////////////////
 
-// a c++ counting-semaphore class ... since we're c++11 not c++20
-
-class semaphore
+template <typename Payload>
+class workq
 {
+  deque<Payload> q;
+  mutex mtx;
+  condition_variable cv;
+  bool dead;
+
 public:
-  semaphore (unsigned c=1): count(c) {}
-  inline void notify () {
+  workq() { dead = false;}
+  ~workq() {}
+
+  void push_back(const Payload& p)
+  {
     unique_lock<mutex> lock(mtx);
-    count++;
+    q.push_back(p);
+    set_metric("thread_work_pending","role","scan", q.size());
     cv.notify_one();
   }
-  inline void wait() {
+
+  void nuke() {
+    unique_lock<mutex> lock(mtx);
+    // optional: q.clear();
+    dead = true;
+    cv.notify_all();
+  }
+
+  bool wait_front (Payload& p)
+  {
     unique_lock<mutex> lock(mtx);
-    while (count == 0)
+    while (q.size() == 0 && !dead)
       cv.wait(lock);
-    count--;
+    if (dead)
+      return false;
+    else
+      {
+        p = q.front();
+        q.pop_front();
+        set_metric("thread_work_pending","role","scan", q.size());
+        return true;
+      }
   }
-private:
-  mutex mtx;
-  condition_variable cv;
-  unsigned count;
 };
 
+typedef struct stat stat_t;
+typedef pair<string,stat_t> scan_payload;
+static workq<scan_payload> scanq; // just a single one
+// producer: thread_main_fts_source_paths()
+// consumer: thread_main_scanner()
 
-class semaphore_borrower
-{
-public:
-  semaphore_borrower(semaphore* s): sem(s) { sem->wait(); }
-  ~semaphore_borrower() { sem->notify(); }
-private:
-  semaphore* sem;
-};
 
 
 ////////////////////////////////////////////////////////////////////////
@@ -1181,6 +1201,15 @@ add_metric(const string& metric,
   unique_lock<mutex> lock(metrics_lock);
   metrics[key] += value;
 }
+#if 0
+static void
+add_metric(const string& metric,
+           int64_t value)
+{
+  unique_lock<mutex> lock(metrics_lock);
+  metrics[metric] += value;
+}
+#endif
 
 
 // and more for higher arity labels if needed
@@ -1568,102 +1597,24 @@ elf_classify (int fd, bool &executable_p, bool &debuginfo_p, string &buildid, se
 }
 
 
-static semaphore* scan_concurrency_sem = 0; // used to implement -c load limiting
-
-
 static void
-scan_source_file_path (const string& dir)
-{
-  obatched(clog) << "fts/file traversing " << dir << endl;
-
-  struct timeval tv_start, tv_end;
-  gettimeofday (&tv_start, NULL);
-
-  sqlite_ps ps_upsert_buildids (db, "file-buildids-intern", "insert or ignore into " BUILDIDS "_buildids VALUES (NULL, ?);");
-  sqlite_ps ps_upsert_files (db, "file-files-intern", "insert or ignore into " BUILDIDS "_files VALUES (NULL, ?);");
-  sqlite_ps ps_upsert_de (db, "file-de-upsert",
-                          "insert or ignore into " BUILDIDS "_f_de "
-                          "(buildid, debuginfo_p, executable_p, file, mtime) "
-                          "values ((select id from " BUILDIDS "_buildids where hex = ?),"
-                          "        ?,?,"
-                          "        (select id from " BUILDIDS "_files where name = ?), ?);");
-  sqlite_ps ps_upsert_s (db, "file-s-upsert",
-                         "insert or ignore into " BUILDIDS "_f_s "
-                         "(buildid, artifactsrc, file, mtime) "
-                         "values ((select id from " BUILDIDS "_buildids where hex = ?),"
-                         "        (select id from " BUILDIDS "_files where name = ?),"
-                         "        (select id from " BUILDIDS "_files where name = ?),"
-                         "        ?);");
-  sqlite_ps ps_query (db, "file-negativehit-find",
-                      "select 1 from " BUILDIDS "_file_mtime_scanned where sourcetype = 'F' and file = (select id from " BUILDIDS "_files where name = ?) and mtime = ?;");
-  sqlite_ps ps_scan_done (db, "file-scanned",
-                          "insert or ignore into " BUILDIDS "_file_mtime_scanned (sourcetype, file, mtime, size)"
-                          "values ('F', (select id from " BUILDIDS "_files where name = ?), ?, ?);");
-
-
-  char * const dirs[] = { (char*) dir.c_str(), NULL };
-
-  unsigned fts_scanned=0, fts_regex=0, fts_cached=0, fts_debuginfo=0, fts_executable=0, fts_sourcefiles=0;
-
-  FTS *fts = fts_open (dirs,
-                       (traverse_logical ? FTS_LOGICAL : FTS_PHYSICAL|FTS_XDEV)
-                       | FTS_NOCHDIR /* multithreaded */,
-                       NULL);
-  if (fts == NULL)
-    {
-      obatched(cerr) << "cannot fts_open " << dir << endl;
-      return;
-    }
-
-  FTSENT *f;
-  while ((f = fts_read (fts)) != NULL)
-    {
-      semaphore_borrower handle_one_file (scan_concurrency_sem);
-
-      fts_scanned ++;
-      if (interrupted)
-        break;
-
-      if (verbose > 2)
-        obatched(clog) << "fts/file traversing " << f->fts_path << endl;
-
-      try
-        {
-          /* Found a file.  Convert it to an absolute path, so
-             the buildid database does not have relative path
-             names that are unresolvable from a subsequent run
-             in a different cwd. */
-          char *rp = realpath(f->fts_path, NULL);
-          if (rp == NULL)
-            continue; // ignore dangling symlink or such
-          string rps = string(rp);
-          free (rp);
-
-          bool ri = !regexec (&file_include_regex, rps.c_str(), 0, 0, 0);
-          bool rx = !regexec (&file_exclude_regex, rps.c_str(), 0, 0, 0);
-          if (!ri || rx)
-            {
-              if (verbose > 3)
-                obatched(clog) << "fts/file skipped by regex " << (!ri ? "I" : "") << (rx ? "X" : "") << endl;
-              fts_regex ++;
-              continue;
-            }
-
-          switch (f->fts_info)
-            {
-            case FTS_D:
-              break;
-
-            case FTS_DP:
-              break;
-
-            case FTS_F:
+scan_source_file (const string& rps, const stat_t& st,
+                  sqlite_ps& ps_upsert_buildids,
+                  sqlite_ps& ps_upsert_files,
+                  sqlite_ps& ps_upsert_de,
+                  sqlite_ps& ps_upsert_s,
+                  sqlite_ps& ps_query,
+                  sqlite_ps& ps_scan_done,
+                  unsigned& fts_cached,
+                  unsigned& fts_executable,
+                  unsigned& fts_debuginfo,
+                  unsigned& fts_sourcefiles)
 {
   /* See if we know of it already. */
   int rc = ps_query
     .reset()
     .bind(1, rps)
-                  .bind(2, f->fts_statp->st_mtime)
+    .bind(2, st.st_mtime)
     .step();
   ps_query.reset();
   if (rc == SQLITE_ROW) // i.e., a result, as opposed to DONE (no results)
@@ -1672,7 +1623,7 @@ scan_source_file_path (const string& dir)
     // (so is stored with buildid=NULL)
     {
       fts_cached++;
-                    continue;
+      return;
     }
 
   bool executable_p = false, debuginfo_p = false; // E and/or D
@@ -1688,12 +1639,10 @@ scan_source_file_path (const string& dir)
         throw libc_exception(errno, string("open ") + rps);
       inc_metric ("scanned_total","source","file");
     }
-
   // NB: we catch exceptions here too, so that we can
   // cache the corrupt-elf case (!executable_p &&
   // !debuginfo_p) just below, just as if we had an
   // EPERM error from open(2).
-
   catch (const reportable_exception& e)
     {
       e.report(clog);
@@ -1735,7 +1684,7 @@ scan_source_file_path (const string& dir)
         .bind(2, debuginfo_p ? 1 : 0)
         .bind(3, executable_p ? 1 : 0)
         .bind(4, rps)
-                      .bind(5, f->fts_statp->st_mtime)
+        .bind(5, st.st_mtime)
         .step_ok_done();
     }
   if (executable_p)
@@ -1792,92 +1741,17 @@ scan_source_file_path (const string& dir)
   ps_scan_done
     .reset()
     .bind(1, rps)
-                  .bind(2, f->fts_statp->st_mtime)
-                  .bind(3, f->fts_statp->st_size)
+    .bind(2, st.st_mtime)
+    .bind(3, st.st_size)
     .step_ok_done();
 
   if (verbose > 2)
     obatched(clog) << "recorded buildid=" << buildid << " file=" << rps
-                                 << " mtime=" << f->fts_statp->st_mtime << " atype="
+                   << " mtime=" << st.st_mtime << " atype="
                    << (executable_p ? "E" : "")
                    << (debuginfo_p ? "D" : "") << endl;
 }
-              break;
-
-            case FTS_ERR:
-            case FTS_NS:
-              throw libc_exception(f->fts_errno, string("fts/file traversal ") + string(f->fts_path));
-
-            default:
-            case FTS_SL: /* ignore symlinks; seen in non-L mode only */
-              break;
-            }
-
-          if ((verbose && f->fts_info == FTS_DP) ||
-              (verbose > 1 && f->fts_info == FTS_F))
-            obatched(clog) << "fts/file traversing " << rps << ", scanned=" << fts_scanned
-                 << ", regex-skipped=" << fts_regex
-                 << ", cached=" << fts_cached << ", debuginfo=" << fts_debuginfo
-                 << ", executable=" << fts_executable << ", source=" << fts_sourcefiles << endl;
-        }
-      catch (const reportable_exception& e)
-        {
-          e.report(clog);
-        }
-    }
-  fts_close (fts);
 
-  gettimeofday (&tv_end, NULL);
-  double deltas = (tv_end.tv_sec - tv_start.tv_sec) + (tv_end.tv_usec - tv_start.tv_usec)*0.000001;
-
-  obatched(clog) << "fts/file traversed " << dir << " in " << deltas << "s, scanned=" << fts_scanned
-                 << ", regex-skipped=" << fts_regex
-                 << ", cached=" << fts_cached << ", debuginfo=" << fts_debuginfo
-                 << ", executable=" << fts_executable << ", source=" << fts_sourcefiles << endl;
-}
-
-
-static void*
-thread_main_scan_source_file_path (void* arg)
-{
-  string dir = string((const char*) arg);
-
-  unsigned rescan_timer = 0;
-  sig_atomic_t forced_rescan_count = 0;
-  set_metric("thread_timer_max", "file", dir, rescan_s);
-  set_metric("thread_tid", "file", dir, tid());
-  while (! interrupted)
-    {
-      set_metric("thread_timer", "file", dir, rescan_timer);
-      set_metric("thread_forced_total", "file", dir, forced_rescan_count);
-      if (rescan_s && rescan_timer > rescan_s)
-        rescan_timer = 0;
-      if (sigusr1 != forced_rescan_count)
-        {
-          forced_rescan_count = sigusr1;
-          rescan_timer = 0;
-        }
-      if (rescan_timer == 0)
-        try
-          {
-            set_metric("thread_working", "file", dir, time(NULL));
-            inc_metric("thread_work_total", "file", dir);
-            scan_source_file_path (dir);
-            set_metric("thread_working", "file", dir, 0);
-          }
-        catch (const sqlite_exception& e)
-          {
-            obatched(cerr) << e.message << endl;
-          }
-      sleep (1);
-      rescan_timer ++;
-    }
-
-  return 0;
-}
-
-
-////////////////////////////////////////////////////////////////////////
 
 
 
@@ -2061,106 +1935,25 @@ archive_classify (const string& rps, string& archive_extension,
 
 // scan for archive files such as .rpm
 static void
-scan_source_archive_path (const string& dir)
+scan_archive_file (const string& rps, const stat_t& st,
+                   sqlite_ps& ps_upsert_buildids,
+                   sqlite_ps& ps_upsert_files,
+                   sqlite_ps& ps_upsert_de,
+                   sqlite_ps& ps_upsert_sref,
+                   sqlite_ps& ps_upsert_sdef,
+                   sqlite_ps& ps_query,
+                   sqlite_ps& ps_scan_done,
+                   unsigned& fts_cached,
+                   unsigned& fts_executable,
+                   unsigned& fts_debuginfo,
+                   unsigned& fts_sref,
+                   unsigned& fts_sdef)
 {
-  obatched(clog) << "fts/archive traversing " << dir << endl;
-
-  sqlite_ps ps_upsert_buildids (db, "rpm-buildid-intern", "insert or ignore into " BUILDIDS "_buildids VALUES (NULL, ?);");
-  sqlite_ps ps_upsert_files (db, "rpm-file-intern", "insert or ignore into " BUILDIDS "_files VALUES (NULL, ?);");
-  sqlite_ps ps_upsert_de (db, "rpm-de-insert",
-                          "insert or ignore into " BUILDIDS "_r_de (buildid, debuginfo_p, executable_p, file, mtime, content) values ("
-                          "(select id from " BUILDIDS "_buildids where hex = ?), ?, ?, "
-                          "(select id from " BUILDIDS "_files where name = ?), ?, "
-                          "(select id from " BUILDIDS "_files where name = ?));");
-  sqlite_ps ps_upsert_sref (db, "rpm-sref-insert",
-                            "insert or ignore into " BUILDIDS "_r_sref (buildid, artifactsrc) values ("
-                            "(select id from " BUILDIDS "_buildids where hex = ?), "
-                            "(select id from " BUILDIDS "_files where name = ?));");
-  sqlite_ps ps_upsert_sdef (db, "rpm-sdef-insert",
-                            "insert or ignore into " BUILDIDS "_r_sdef (file, mtime, content) values ("
-                            "(select id from " BUILDIDS "_files where name = ?), ?,"
-                            "(select id from " BUILDIDS "_files where name = ?));");
-  sqlite_ps ps_query (db, "rpm-negativehit-query",
-                      "select 1 from " BUILDIDS "_file_mtime_scanned where "
-                      "sourcetype = 'R' and file = (select id from " BUILDIDS "_files where name = ?) and mtime = ?;");
-  sqlite_ps ps_scan_done (db, "rpm-scanned",
-                          "insert or ignore into " BUILDIDS "_file_mtime_scanned (sourcetype, file, mtime, size)"
-                          "values ('R', (select id from " BUILDIDS "_files where name = ?), ?, ?);");
-
-  char * const dirs[] = { (char*) dir.c_str(), NULL };
-
-  struct timeval tv_start, tv_end;
-  gettimeofday (&tv_start, NULL);
-  unsigned fts_scanned=0, fts_regex=0, fts_cached=0, fts_debuginfo=0;
-  unsigned fts_executable=0, fts_archive = 0, fts_sref=0, fts_sdef=0;
-
-  FTS *fts = fts_open (dirs,
-                       (traverse_logical ? FTS_LOGICAL : FTS_PHYSICAL|FTS_XDEV)
-                       | FTS_NOCHDIR /* multithreaded */,
-                       NULL);
-  if (fts == NULL)
-    {
-      obatched(cerr) << "cannot fts_open " << dir << endl;
-      return;
-    }
-
-  FTSENT *f;
-  while ((f = fts_read (fts)) != NULL)
-    {
-      semaphore_borrower handle_one_file (scan_concurrency_sem);
-
-      fts_scanned ++;
-      if (interrupted)
-        break;
-
-      if (verbose > 2)
-        obatched(clog) << "fts/archive traversing " << f->fts_path << endl;
-
-      try
-        {
-          /* Found a file.  Convert it to an absolute path, so
-             the buildid database does not have relative path
-             names that are unresolvable from a subsequent run
-             in a different cwd. */
-          char *rp = realpath(f->fts_path, NULL);
-          if (rp == NULL)
-            continue; // ignore dangling symlink or such
-          string rps = string(rp);
-          free (rp);
-
-          bool ri = !regexec (&file_include_regex, rps.c_str(), 0, 0, 0);
-          bool rx = !regexec (&file_exclude_regex, rps.c_str(), 0, 0, 0);
-          if (!ri || rx)
-            {
-              if (verbose > 3)
-                obatched(clog) << "fts/archive skipped by regex " << (!ri ? "I" : "") << (rx ? "X" : "") << endl;
-              fts_regex ++;
-              continue;
-            }
-
-          switch (f->fts_info)
-            {
-            case FTS_D:
-              break;
-
-            case FTS_DP:
-              break;
-
-            case FTS_F:
-              {
-		bool any = false;
-                for (auto&& arch : scan_archives)
-                  if (string_endswith(rps, arch.first))
-		    any = true;
-		if (! any)
-                  continue;
-                fts_archive ++;
-
   /* See if we know of it already. */
   int rc = ps_query
     .reset()
     .bind(1, rps)
-                  .bind(2, f->fts_statp->st_mtime)
+    .bind(2, st.st_mtime)
     .step();
   ps_query.reset();
   if (rc == SQLITE_ROW) // i.e., a result, as opposed to DONE (no results)
@@ -2170,7 +1963,7 @@ scan_source_archive_path (const string& dir)
     // (so is stored with buildid=NULL)
     {
       fts_cached ++;
-                    continue;
+      return;
     }
 
   // intern the archive file name
@@ -2188,7 +1981,7 @@ scan_source_archive_path (const string& dir)
       archive_classify (rps, archive_extension,
                         ps_upsert_buildids, ps_upsert_files,
                         ps_upsert_de, ps_upsert_sref, ps_upsert_sdef, // dalt
-                                  f->fts_statp->st_mtime,
+                        st.st_mtime,
                         my_fts_executable, my_fts_debuginfo, my_fts_sref, my_fts_sdef,
                         my_fts_sref_complete_p);
       inc_metric ("scanned_total","source",archive_extension + " archive");
@@ -2206,7 +1999,7 @@ scan_source_archive_path (const string& dir)
 
   if (verbose > 2)
     obatched(clog) << "scanned archive=" << rps
-                                 << " mtime=" << f->fts_statp->st_mtime
+                   << " mtime=" << st.st_mtime
                    << " executables=" << my_fts_executable
                    << " debuginfos=" << my_fts_debuginfo
                    << " srefs=" << my_fts_sref
@@ -2222,61 +2015,228 @@ scan_source_archive_path (const string& dir)
     ps_scan_done
       .reset()
       .bind(1, rps)
-                    .bind(2, f->fts_statp->st_mtime)
-                    .bind(3, f->fts_statp->st_size)
+      .bind(2, st.st_mtime)
+      .bind(3, st.st_size)
       .step_ok_done();
 }
-              break;
 
-            case FTS_ERR:
-            case FTS_NS:
-              throw libc_exception(f->fts_errno, string("fts/archive traversal ") + string(f->fts_path));
 
-            default:
-            case FTS_SL: /* ignore symlinks; seen in non-L mode only */
-              break;
-            }
 
-          if ((verbose && f->fts_info == FTS_DP) ||
-              (verbose > 1 && f->fts_info == FTS_F))
-            obatched(clog) << "fts/archive traversing " << rps << ", scanned=" << fts_scanned
-                           << ", regex-skipped=" << fts_regex
-                           << ", archive=" << fts_archive << ", cached=" << fts_cached << ", debuginfo=" << fts_debuginfo
-                           << ", executable=" << fts_executable
-                           << ", sourcerefs=" << fts_sref << ", sourcedefs=" << fts_sdef << endl;
+////////////////////////////////////////////////////////////////////////
+
+
+
+// The thread that consumes file names off of the scanq.  We hold
+// the persistent sqlite_ps's at this level and delegate file/archive
+// scanning to other functions.
+static void*
+thread_main_scanner (void* arg)
+{
+  (void) arg;
+
+  // all the prepared statements fit to use, the _f_ set:
+  sqlite_ps ps_f_upsert_buildids (db, "file-buildids-intern", "insert or ignore into " BUILDIDS "_buildids VALUES (NULL, ?);");
+  sqlite_ps ps_f_upsert_files (db, "file-files-intern", "insert or ignore into " BUILDIDS "_files VALUES (NULL, ?);");
+  sqlite_ps ps_f_upsert_de (db, "file-de-upsert",
+                          "insert or ignore into " BUILDIDS "_f_de "
+                          "(buildid, debuginfo_p, executable_p, file, mtime) "
+                          "values ((select id from " BUILDIDS "_buildids where hex = ?),"
+                          "        ?,?,"
+                          "        (select id from " BUILDIDS "_files where name = ?), ?);");
+  sqlite_ps ps_f_upsert_s (db, "file-s-upsert",
+                         "insert or ignore into " BUILDIDS "_f_s "
+                         "(buildid, artifactsrc, file, mtime) "
+                         "values ((select id from " BUILDIDS "_buildids where hex = ?),"
+                         "        (select id from " BUILDIDS "_files where name = ?),"
+                         "        (select id from " BUILDIDS "_files where name = ?),"
+                         "        ?);");
+  sqlite_ps ps_f_query (db, "file-negativehit-find",
+                        "select 1 from " BUILDIDS "_file_mtime_scanned where sourcetype = 'F' "
+                        "and file = (select id from " BUILDIDS "_files where name = ?) and mtime = ?;");
+  sqlite_ps ps_f_scan_done (db, "file-scanned",
+                          "insert or ignore into " BUILDIDS "_file_mtime_scanned (sourcetype, file, mtime, size)"
+                          "values ('F', (select id from " BUILDIDS "_files where name = ?), ?, ?);");
+
+  // and now for the _r_ set
+  sqlite_ps ps_r_upsert_buildids (db, "rpm-buildid-intern", "insert or ignore into " BUILDIDS "_buildids VALUES (NULL, ?);");
+  sqlite_ps ps_r_upsert_files (db, "rpm-file-intern", "insert or ignore into " BUILDIDS "_files VALUES (NULL, ?);");
+  sqlite_ps ps_r_upsert_de (db, "rpm-de-insert",
+                          "insert or ignore into " BUILDIDS "_r_de (buildid, debuginfo_p, executable_p, file, mtime, content) values ("
+                          "(select id from " BUILDIDS "_buildids where hex = ?), ?, ?, "
+                          "(select id from " BUILDIDS "_files where name = ?), ?, "
+                          "(select id from " BUILDIDS "_files where name = ?));");
+  sqlite_ps ps_r_upsert_sref (db, "rpm-sref-insert",
+                            "insert or ignore into " BUILDIDS "_r_sref (buildid, artifactsrc) values ("
+                            "(select id from " BUILDIDS "_buildids where hex = ?), "
+                            "(select id from " BUILDIDS "_files where name = ?));");
+  sqlite_ps ps_r_upsert_sdef (db, "rpm-sdef-insert",
+                            "insert or ignore into " BUILDIDS "_r_sdef (file, mtime, content) values ("
+                            "(select id from " BUILDIDS "_files where name = ?), ?,"
+                            "(select id from " BUILDIDS "_files where name = ?));");
+  sqlite_ps ps_r_query (db, "rpm-negativehit-query",
+                      "select 1 from " BUILDIDS "_file_mtime_scanned where "
+                      "sourcetype = 'R' and file = (select id from " BUILDIDS "_files where name = ?) and mtime = ?;");
+  sqlite_ps ps_r_scan_done (db, "rpm-scanned",
+                          "insert or ignore into " BUILDIDS "_file_mtime_scanned (sourcetype, file, mtime, size)"
+                          "values ('R', (select id from " BUILDIDS "_files where name = ?), ?, ?);");
+
+
+  unsigned fts_cached = 0, fts_executable = 0, fts_debuginfo = 0, fts_sourcefiles = 0;
+  unsigned fts_sref = 0, fts_sdef = 0;
+
+  add_metric("thread_count", "role", "scan", 1);
+  add_metric("thread_busy", "role", "scan", 1);
+  while (! interrupted)
+    {
+      scan_payload p;
+
+      add_metric("thread_busy", "role", "scan", -1);
+      bool gotone = scanq.wait_front(p);
+      add_metric("thread_busy", "role", "scan", 1);
+      if (! gotone) continue; // or break
+      inc_metric("thread_work_total", "role","scan");
+
+      try
+        {
+          bool scan_archive = false;
+          for (auto&& arch : scan_archives)
+            if (string_endswith(p.first, arch.first))
+              scan_archive = true;
+
+          if (scan_archive)
+            scan_archive_file (p.first, p.second,
+                               ps_r_upsert_buildids,
+                               ps_r_upsert_files,
+                               ps_r_upsert_de,
+                               ps_r_upsert_sref,
+                               ps_r_upsert_sdef,
+                               ps_r_query,
+                               ps_r_scan_done,
+                               fts_cached,
+                               fts_executable,
+                               fts_debuginfo,
+                               fts_sref,
+                               fts_sdef);
+
+          if (scan_files) // NB: maybe "else if" ?
+            scan_source_file (p.first, p.second,
+                              ps_f_upsert_buildids,
+                              ps_f_upsert_files,
+                              ps_f_upsert_de,
+                              ps_f_upsert_s,
+                              ps_f_query,
+                              ps_f_scan_done,
+                              fts_cached, fts_executable, fts_debuginfo, fts_sourcefiles);
         }
       catch (const reportable_exception& e)
         {
-          e.report(clog);
+          e.report(cerr);
         }
     }
-  fts_close (fts);
 
+  add_metric("thread_busy", "role", "scan", -1);
+  return 0;
+}
+
+
+
+// Called by the traversal thread: walk all the source_paths and enqueue
+// every matching regular file into the file/archive scan queue.
+static void
+scan_source_paths()
+{
+  // Turn the source_paths into an fts(3)-compatible char**.  Since
+  // source_paths[] does not change after argv processing, the
+  // c_str()'s are safe to keep around a while.
+  vector<const char *> sps;
+  for (auto&& sp: source_paths)
+    sps.push_back(sp.c_str());
+  sps.push_back(NULL);
+
+  FTS *fts = fts_open ((char * const *)sps.data(),
+                      (traverse_logical ? FTS_LOGICAL : FTS_PHYSICAL|FTS_XDEV)
+                      | FTS_NOCHDIR /* multithreaded */,
+                      NULL);
+  if (fts == NULL)
+    throw libc_exception(errno, "cannot fts_open");
+  defer_dtor<FTS*,int> fts_cleanup (fts, fts_close);
+
+  struct timeval tv_start, tv_end;
+  gettimeofday (&tv_start, NULL);
+  unsigned fts_scanned = 0, fts_regex = 0;
+
+  FTSENT *f;
+  while ((f = fts_read (fts)) != NULL)
+  {
+    if (interrupted) break;
+
+    fts_scanned ++;
+
+    if (verbose > 2)
+      obatched(clog) << "fts traversing " << f->fts_path << endl;
+
+    /* Found a file.  Convert it to an absolute path, so
+       the buildid database does not have relative path
+       names that are unresolvable from a subsequent run
+       in a different cwd. */
+    char *rp = realpath(f->fts_path, NULL);
+    if (rp == NULL)
+      continue; // ignore dangling symlink or such
+    string rps = string(rp);
+    free (rp);
+
+    bool ri = !regexec (&file_include_regex, rps.c_str(), 0, 0, 0);
+    bool rx = !regexec (&file_exclude_regex, rps.c_str(), 0, 0, 0);
+    if (!ri || rx)
+      {
+        if (verbose > 3)
+          obatched(clog) << "fts skipped by regex " << (!ri ? "I" : "") << (rx ? "X" : "") << endl;
+        fts_regex ++;
+        continue;
+      }
+
+    switch (f->fts_info)
+      {
+      case FTS_F:
+        scanq.push_back (make_pair(rps, *f->fts_statp));
+        break;
+
+      case FTS_ERR:
+      case FTS_NS:
+        // report on some types of errors because they may reflect fixable misconfiguration
+        {
+          auto x = libc_exception(f->fts_errno, string("fts traversal ") + string(f->fts_path));
+          x.report(cerr);
+        }
+        break;
+
+      default:
+        ;
+        /* ignore */
+      }
+  }
   gettimeofday (&tv_end, NULL);
   double deltas = (tv_end.tv_sec - tv_start.tv_sec) + (tv_end.tv_usec - tv_start.tv_usec)*0.000001;
 
-  obatched(clog) << "fts/archive traversed " << dir << " in " << deltas << "s, scanned=" << fts_scanned
-                 << ", regex-skipped=" << fts_regex
-                 << ", archive=" << fts_archive << ", cached=" << fts_cached << ", debuginfo=" << fts_debuginfo
-                 << ", executable=" << fts_executable
-                 << ", sourcerefs=" << fts_sref << ", sourcedefs=" << fts_sdef << endl;
+  obatched(clog) << "fts traversed source paths in " << deltas << "s, scanned=" << fts_scanned
+                 << ", regex-skipped=" << fts_regex << endl;
 }
 
 
-
 static void*
-thread_main_scan_source_archive_path (void* arg)
+thread_main_fts_source_paths (void* arg)
 {
-  string dir = string((const char*) arg);
+  (void) arg; // ignore; we operate on global data
 
   unsigned rescan_timer = 0;
   sig_atomic_t forced_rescan_count = 0;
-  set_metric("thread_timer_max", "archive", dir, rescan_s);
-  set_metric("thread_tid", "archive", dir, tid());
+  set_metric("thread_timer_max", "role","traverse", rescan_s);
+  set_metric("thread_tid", "role","traverse", tid());
+  add_metric("thread_count", "role", "traverse", 1);
   while (! interrupted)
     {
-      set_metric("thread_timer", "archive", dir, rescan_timer);
-      set_metric("thread_forced_total", "archive", dir, forced_rescan_count);
+      set_metric("thread_timer", "role","traverse", rescan_timer);
+      // set_metric("thread_forced_total", "role","traverse", forced_rescan_count);
       if (rescan_s && rescan_timer > rescan_s)
         rescan_timer = 0;
       if (sigusr1 != forced_rescan_count)
@@ -2287,23 +2247,27 @@ thread_main_scan_source_archive_path (void* arg)
       if (rescan_timer == 0)
         try
           {
-            set_metric("thread_working", "archive", dir, time(NULL));
-            inc_metric("thread_work_total", "archive", dir);
-            scan_source_archive_path (dir);
-            set_metric("thread_working", "archive", dir, 0);
+            set_metric("thread_busy", "role","traverse", 1);
+            inc_metric("thread_work_total", "role","traverse");
+            scan_source_paths();
+            set_metric("thread_busy", "role","traverse", 0);
           }
-        catch (const sqlite_exception& e)
+        catch (const reportable_exception& e)
           {
-            obatched(cerr) << e.message << endl;
+            e.report(cerr);
           }
       sleep (1);
       rescan_timer ++;
     }
 
+  // wake up any blocked scanning threads so they can check $interrupted and kill themselves
+  scanq.nuke();
+
   return 0;
 }
 
 
+
 ////////////////////////////////////////////////////////////////////////
 
 static void
@@ -2406,10 +2370,11 @@ thread_main_groom (void* /*arg*/)
   sig_atomic_t forced_groom_count = 0;
   set_metric("thread_timer_max", "role", "groom", groom_s);
   set_metric("thread_tid", "role", "groom", tid());
+  add_metric("thread_count", "role", "groom", 1);
   while (! interrupted)
     {
       set_metric("thread_timer", "role", "groom", groom_timer);
-      set_metric("thread_forced_total", "role", "groom", forced_groom_count);      
+      // set_metric("thread_forced_total", "role", "groom", forced_groom_count);
       if (groom_s && groom_timer > groom_s)
         groom_timer = 0;
       if (sigusr2 != forced_groom_count)
@@ -2420,10 +2385,10 @@ thread_main_groom (void* /*arg*/)
       if (groom_timer == 0)
         try
           {
-            set_metric("thread_working", "role", "groom", time(NULL));
+            set_metric("thread_busy", "role", "groom", 1);
             inc_metric("thread_work_total", "role", "groom");
             groom ();
-            set_metric("thread_working", "role", "groom", 0);
+            set_metric("thread_busy", "role", "groom", 0);
           }
         catch (const sqlite_exception& e)
           {
@@ -2533,9 +2498,6 @@ main (int argc, char *argv[])
   (void) signal (SIGUSR1, sigusr1_handler); // end-user
   (void) signal (SIGUSR2, sigusr2_handler); // end-user
 
-  // do this before any threads start
-  scan_concurrency_sem = new semaphore(concurrency);
-
   /* Get database ready. */
   rc = sqlite3_open_v2 (db_path.c_str(), &db, (SQLITE_OPEN_READWRITE
                                                |SQLITE_OPEN_CREATE
@@ -2666,26 +2628,21 @@ main (int argc, char *argv[])
   if (rc < 0)
     error (0, 0, "warning: cannot spawn thread (%d) to groom database\n", rc);
 
-  if (scan_files) for (auto&& it : source_paths)
+  if (scan_files || scan_archives.size() > 0)
     {
       pthread_t pt;
-      rc = pthread_create (& pt, NULL, thread_main_scan_source_file_path, (void*) it.c_str());
+      pthread_create (& pt, NULL, thread_main_fts_source_paths, NULL);
       if (rc < 0)
-        error (0, 0, "warning: cannot spawn thread (%d) to scan files %s\n", rc, it.c_str());
-      else
+        error (0, 0, "warning: cannot spawn thread (%d) to traverse source paths\n", rc);
       scanner_threads.push_back(pt);
-    }
-
-  if (scan_archives.size() > 0)
-    for (auto&& it : source_paths)
+      for (unsigned i=0; i<concurrency; i++)
         {
-        pthread_t pt;
-        rc = pthread_create (& pt, NULL, thread_main_scan_source_archive_path, (void*) it.c_str());
+          pthread_create (& pt, NULL, thread_main_scanner, NULL);
           if (rc < 0)
-          error (0, 0, "warning: cannot spawn thread (%d) to scan archives %s\n", rc, it.c_str());
-        else
+            error (0, 0, "warning: cannot spawn thread (%d) to scan source files / archives\n", rc);
           scanner_threads.push_back(pt);
         }
+    }
 
   /* Trivial main loop! */
   set_metric("ready", 1);
@@ -2706,7 +2663,6 @@ main (int argc, char *argv[])
   if (d6) MHD_stop_daemon (d6);
 
   /* With all threads known dead, we can clean up the global resources. */
-  delete scan_concurrency_sem;
   rc = sqlite3_exec (db, DEBUGINFOD_SQLITE_CLEANUP_DDL, NULL, NULL, NULL);
   if (rc != SQLITE_OK)
     {
diff --git a/doc/ChangeLog b/doc/ChangeLog
index 1422766d07d3..24f62af94fcd 100644
--- a/doc/ChangeLog
+++ b/doc/ChangeLog
@@ -1,3 +1,8 @@
+2019-12-31  Frank Ch. Eigler  <fche@redhat.com>
+
+	* debuginfod.8: Rework sections dealing with traversal/scanning,
+	explaining new threading model.
+
 2019-12-22  Frank Ch. Eigler  <fche@redhat.com
 
 	* debuginfod.8: Add -U (DEB) flag, generalize RPM to "archive".
diff --git a/doc/debuginfod.8 b/doc/debuginfod.8
index 342f524c7921..d2917285beb9 100644
--- a/doc/debuginfod.8
+++ b/doc/debuginfod.8
@@ -34,17 +34,23 @@ debuginfod servers, it queries them for the same information, just as
 \fBdebuginfod-find\fP would.  If successful, it locally caches then
 relays the file content to the original requester.
 
-If the \fB\-F\fP option is given, each listed PATH creates a thread to
-scan for matching ELF/DWARF/source files under the given physical
-directory.  Source files are matched with DWARF files based on the
-AT_comp_dir (compilation directory) attributes inside it.  Duplicate
-directories are ignored.  You may use a file name for a PATH, but
-source code indexing may be incomplete; prefer using a directory that
-contains the binaries.  Caution: source files listed in the DWARF may
-be a path \fIanywhere\fP in the file system, and debuginfod will
-readily serve their content on demand.  (Imagine a doctored DWARF file
-that lists \fI/etc/passwd\fP as a source file.)  If this is a concern,
-audit your binaries with tools such as:
+Indexing the given PATHs proceeds using multiple threads.  One thread
+periodically traverses all the given PATHs logically or physically
+(see the \fB\-L\fP option).  Duplicate PATHs are ignored.  You may use
+a file name for a PATH, but source code indexing may be incomplete;
+prefer using a directory that contains the binaries.  The traversal
+thread enumerates all matching files (see the \fB\-I\fP and \fB\-X\fP
+options) into a work queue.  A collection of scanner threads (see the
+\fB\-c\fP option) wait at the work queue to analyze files in parallel.
+
+If the \fB\-F\fP option is given, each file is scanned as an ELF/DWARF
+file.  Source files are matched with DWARF files based on the
+AT_comp_dir (compilation directory) attributes inside it.  Caution:
+source files listed in the DWARF may be a path \fIanywhere\fP in the
+file system, and debuginfod will readily serve their content on
+demand.  (Imagine a doctored DWARF file that lists \fI/etc/passwd\fP
+as a source file.)  If this is a concern, audit your binaries with
+tools such as:
 
 .SAMPLE
 % eu-readelf -wline BINARY | sed -n '/^Directory.table/,/^File.name.table/p'
@@ -55,42 +61,35 @@ or
 ^C
 .ESAMPLE
 
-If the \fB\-R\fP and/or \fB-U\fP option is given, each listed PATH
-creates a thread to scan for ELF/DWARF/source files contained in
-archive files.  If \-R is given, the will scan RPMs; and/or if \-U is
-given, they will scan DEB / DDEB files.  (The terms RPM and DEB and
-DDEB are used synonymously as "archives" in diagnostic messages.)
-Duplicate directories are ignored.  You may use a file name for a
-PATH, but source code indexing may be incomplete.  Instead, use a
-directory that contains normal RPMs alongside debuginfo/debugsource
-RPMs.  Because of complications such as DWZ-compressed debuginfo, may
-require \fItwo\fP scan passes to identify all source code.  Source
-files for RPMs are only served from other RPMs, so the caution for \-F
-does not apply.  Note that due to Debian/Ubuntu packaging policies &
-mechanisms, debuginfod cannot resolve source files for DEB/DDEB at
-all.
-
-If no PATH is listed, or neither \-F nor \-R nor \-U option is given, then
-\fBdebuginfod\fP will simply serve content that it scanned into its
-index in previous runs: the data is cumulative.
-
-File names must match extended regular expressions given by the \-I
-option and not the \-X option (if any) in order to be considered.
+If the \fB\-R\fP and/or \fB\-U\fP option is given, each file is scanned
+as an archive file that may contain ELF/DWARF/source files.  If \-R is
+given, it will scan RPMs; and/or if \-U is given, it will scan DEB
+/ DDEB files.  (The terms RPM and DEB and DDEB are used synonymously
+as "archives" in diagnostic messages.)  Because of complications such
+as DWZ-compressed debuginfo, indexing may require \fItwo\fP traversal passes to
+identify all source code.  Source files for RPMs are only served from
+other RPMs, so the caution for \-F does not apply.  Note that due to
+Debian/Ubuntu packaging policies & mechanisms, debuginfod cannot
+resolve source files for DEB/DDEB at all.
+
+If no PATH is listed, or neither \fB\-F\fP nor \fB\-R\fP nor \fB\-U\fP
+option is given, then \fBdebuginfod\fP will simply serve content that
+it accumulated into its index in all previous runs.
 
 
 .SH OPTIONS
 
 .TP
 .B "\-F"
-Activate ELF/DWARF file scanning threads.  The default is off.
+Activate ELF/DWARF file scanning.  The default is off.
 
 .TP
 .B "\-R"
-Activate RPM patterns in archive scanning threads.  The default is off.
+Activate RPM patterns in archive scanning.  The default is off.
 
 .TP
 .B "\-U"
-Activate DEB/DDEB patterns in archive scanning threads.  The default is off.
+Activate DEB/DDEB patterns in archive scanning.  The default is off.
 
 .TP
 .B "\-d FILE" "\-\-database=FILE"
@@ -100,7 +99,7 @@ data.  It will contain absolute file path names, so it may not be
 portable across machines.  It may be frequently read/written, so it
 should be on a fast filesytem.  It should not be shared across
 machines or users, to maximize sqlite locking performance.  The
-default database file is $HOME/.debuginfod.sqlite.
+default database file is \%$HOME/.debuginfod.sqlite.
 
 .TP
 .B "\-D SQL" "\-\-ddl=SQL"
@@ -129,7 +128,7 @@ inclusion or exclusion filtering: they are all processed.)
 .TP
 .B "\-t SECONDS"  "\-\-rescan\-time=SECONDS"
 Set the rescan time for the file and archive directories.  This is the
-amount of time the scanning threads will wait after finishing a scan,
+amount of time the traversal thread will wait after finishing a scan,
 before doing it again.  A rescan for unchanged files is fast (because
 the index also stores the file mtimes).  A time of zero is acceptable,
 and means that only one initial scan should performed.  The default
@@ -161,11 +160,11 @@ do maximal-grooming.  See also the \fIDATA MANAGEMENT\fP section.
 
 .TP
 .B "\-c NUM"  "\-\-concurrency=NUM"
-Set the concurrency limit for all the scanning threads.  While many
-threads may be spawned to cover all the given PATHs, only NUM may
-concurrently do CPU-intensive operations like parsing an ELF file
-or an archive.  The default is the number of processors on the system;
-the minimum is 1.
+Set the concurrency limit for the scanning queue threads, which work
+together to process archives & files located by the traversal thread.
+This is important for controlling CPU-intensive operations like parsing
+an ELF file and especially decompressing archives.  The default is the
+number of processors on the system; the minimum is 1.
 
 .TP
 .B "\-L"
@@ -356,24 +355,29 @@ enabled.
 
 .SH "ENVIRONMENT VARIABLES"
 
-.TP 21
+.TP
+.B TMPDIR
+This environment variable points to a file system to be used for
+temporary files.  The default is /tmp.
+
+.TP
 .B DEBUGINFOD_URLS
 This environment variable contains a list of URL prefixes for trusted
 debuginfod instances.  Alternate URL prefixes are separated by space.
 Avoid referential loops that cause a server to contact itself, directly
 or indirectly - the results would be hilarious.
 
-.TP 21
+.TP
 .B DEBUGINFOD_TIMEOUT
 This environment variable governs the timeout for each debuginfod HTTP
 connection.  A server that fails to respond within this many seconds
 is skipped.  The default is 5.
 
-.TP 21
+.TP
 .B DEBUGINFOD_CACHE_PATH
 This environment variable governs the location of the cache where
 downloaded files are kept.  It is cleaned periodically as this
-program is reexecuted.  The default is $HOME/.debuginfod_client_cache.
+program is reexecuted.  The default is \%$HOME/.debuginfod_client_cache.
 .\" XXX describe cache eviction policy
 
 .SH FILES
diff --git a/tests/ChangeLog b/tests/ChangeLog
index 02a8f75fe0ef..b087e60aab4f 100644
--- a/tests/ChangeLog
+++ b/tests/ChangeLog
@@ -1,3 +1,7 @@
+2019-12-31  Frank Ch. Eigler  <fche@redhat.com>
+
+	* run-debuginfod-find.sh: Adjust to new work-queue metrics.
+
 2019-12-22  Frank Ch. Eigler  <fche@redhat.com>
 
 	* debuginfod-debs/*: New test files, based on
diff --git a/tests/run-debuginfod-find.sh b/tests/run-debuginfod-find.sh
index 90dafe00f50c..277e97199292 100755
--- a/tests/run-debuginfod-find.sh
+++ b/tests/run-debuginfod-find.sh
@@ -115,7 +115,9 @@ mv prog F
 mv prog.debug F
 kill -USR1 $PID1
 # Wait till both files are in the index.
-wait_ready $PORT1 'thread_work_total{file="F"}' 2
+wait_ready $PORT1 'thread_work_total{role="traverse"}' 2
+wait_ready $PORT1 'thread_work_pending{role="scan"}' 0
+wait_ready $PORT1 'thread_busy{role="scan"}' 0
 
 ########################################################################
 
@@ -150,7 +152,9 @@ BUILDID2=`env LD_LIBRARY_PATH=$ldpath ${abs_builddir}/../src/readelf \
 mv prog2 F
 kill -USR1 $PID1
 # Now there should be 3 files in the index
-wait_ready $PORT1 'thread_work_total{file="F"}' 3
+wait_ready $PORT1 'thread_work_total{role="traverse"}' 3
+wait_ready $PORT1 'thread_work_pending{role="scan"}' 0
+wait_ready $PORT1 'thread_busy{role="scan"}' 0
 
 # Rerun same tests for the prog2 binary
 filename=`testrun ${abs_top_builddir}/debuginfod/debuginfod-find -v debuginfo $BUILDID2 2>vlog`


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]