Patchwork lib/oe/package_manager: Add utils function for multiprocess execution

login
register
mail settings
Submitter Richard Purdie
Date Aug. 21, 2014, 8:46 p.m.
Message ID <1408653988.1669.116.camel@ted>
Download mbox | patch
Permalink /patch/78767/
State Accepted
Commit be1b198076cd8849ab6ecc16ad08556c5981f3d9
Headers show

Comments

Richard Purdie - Aug. 21, 2014, 8:46 p.m.
Our usage of multitprocessing is problematic. In particular, there is a bug
in python 2.7 multiprocessing where signals are not handled until command
completion instead of immediately.

This factors the multiprocess code into a function which is enhanced with
a workaround to ensure immediate signal handling and also better SIGINT
handling which should happen in the parent, not the children to ensure
clean exits. The workaround for the signals is being added to the core
bb.utils function so it can benefit all users.

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Chris Larson - Aug. 22, 2014, 12:57 a.m.
On Thu, Aug 21, 2014 at 1:46 PM, Richard Purdie <
richard.purdie@linuxfoundation.org> wrote:

> Our usage of multitprocessing is problematic. In particular, there is a bug
> in python 2.7 multiprocessing where signals are not handled until command
> completion instead of immediately.


This looks good, but the subject is misleading, in my opinion:
"lib/oe/package_manager:
Add utils function for multiprocess execution". It should probably either
mention both utils and package_manager, or be split into two commits.
Richard Purdie - Aug. 23, 2014, 11:04 a.m.
On Thu, 2014-08-21 at 17:57 -0700, Christopher Larson wrote:

> On Thu, Aug 21, 2014 at 1:46 PM, Richard Purdie
> <richard.purdie@linuxfoundation.org> wrote:
>         Our usage of multitprocessing is problematic. In particular,
>         there is a bug
>         in python 2.7 multiprocessing where signals are not handled
>         until command
>         completion instead of immediately.
>  
> This looks good, but the subject is misleading, in my opinion:
> "lib/oe/package_manager: Add utils function for multiprocess
> execution". It should probably either mention both utils and
> package_manager, or be split into two commits.

Good point, I tweaked before merging, thanks. It was the (bad) result of
rearranging a few commits.

Cheers,

Richard

Patch

diff --git a/meta/lib/oe/package_manager.py b/meta/lib/oe/package_manager.py
index 8be3d41..f8fc3c2 100644
--- a/meta/lib/oe/package_manager.py
+++ b/meta/lib/oe/package_manager.py
@@ -7,6 +7,7 @@  import multiprocessing
 import re
 import bb
 import tempfile
+import oe.utils
 
 
 # this can be used by all PM backends to create the index files in parallel
@@ -116,16 +117,7 @@  class RpmIndexer(Indexer):
             bb.note("There are no packages in %s" % self.deploy_dir)
             return
 
-        nproc = multiprocessing.cpu_count()
-        pool = bb.utils.multiprocessingpool(nproc)
-        results = list(pool.imap(create_index, index_cmds))
-        pool.close()
-        pool.join()
-
-        for result in results:
-            if result is not None:
-                return(result)
-
+        oe.utils.multiprocess_exec(index_cmds, create_index)
 
 class OpkgIndexer(Indexer):
     def write_index(self):
@@ -161,15 +153,7 @@  class OpkgIndexer(Indexer):
             bb.note("There are no packages in %s!" % self.deploy_dir)
             return
 
-        nproc = multiprocessing.cpu_count()
-        pool = bb.utils.multiprocessingpool(nproc)
-        results = list(pool.imap(create_index, index_cmds))
-        pool.close()
-        pool.join()
-
-        for result in results:
-            if result is not None:
-                return(result)
+        oe.utils.multiprocess_exec(index_cmds, create_index)
 
 
 class DpkgIndexer(Indexer):
@@ -210,15 +194,7 @@  class DpkgIndexer(Indexer):
             bb.note("There are no packages in %s" % self.deploy_dir)
             return
 
-        nproc = multiprocessing.cpu_count()
-        pool = bb.utils.multiprocessingpool(nproc)
-        results = list(pool.imap(create_index, index_cmds))
-        pool.close()
-        pool.join()
-
-        for result in results:
-            if result is not None:
-                return(result)
+        oe.utils.multiprocess_exec(index_cmds, create_index)
 
 
 class PkgsList(object):
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 0a1d108..92e21a4 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -151,3 +151,32 @@  def execute_pre_post_process(d, cmds):
         if cmd != '':
             bb.note("Executing %s ..." % cmd)
             bb.build.exec_func(cmd, d)
+
+def multiprocess_exec(commands, function):
+    import signal
+    import multiprocessing
+
+    if not commands:
+        return []
+
+    def init_worker():
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+    nproc = min(multiprocessing.cpu_count(), len(commands))
+    pool = bb.utils.multiprocessingpool(nproc, init_worker)
+    imap = pool.imap(function, commands)
+
+    try:
+        results = list(imap)
+        pool.close()
+        pool.join()
+        results = []
+        for result in results:
+            if result is not None:
+                results.append(result)
+        return results
+
+    except KeyboardInterrupt:
+        pool.terminate()
+        pool.join()
+        raise