[5/6] cooker/process: Fix signal handling lockups

Message ID 20220329142755.1473185-5-richard.purdie@linuxfoundation.org
State Accepted, archived
Commit a40efaa5556a188dfe46c8d060adde37dc400dcd
Headers show
Series [1/6] cooker: Fix exception handling in parsers | expand

Commit Message

Richard Purdie March 29, 2022, 2:27 p.m. UTC
If a parser process is terminated while holding a write lock, then it
will lead to a deadlock (see
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate).

With SIGTERM, we don't want to terminate holding the lock. We also don't
want a SIGINT to cause a partial write to the event stream.

I tried using signal masks to avoid this but it doesn't work, see
https://bugs.python.org/issue47139

Instead, add a signal handler and catch the calls around the critical section.
We also need a thread lock to ensure other threads in the same process don't
handle the signal until all the threads are not in the lock.

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
 lib/bb/cooker.py         | 28 ++++++++++++++++++++++++++--
 lib/bb/server/process.py | 22 ++++++++++++++++++++--
 2 files changed, 46 insertions(+), 4 deletions(-)

Patch

diff --git a/lib/bb/cooker.py b/lib/bb/cooker.py
index d6fcd9e05c..7c0c5d4efa 100644
--- a/lib/bb/cooker.py
+++ b/lib/bb/cooker.py
@@ -2017,6 +2017,22 @@  class Parser(multiprocessing.Process):
         self.context = bb.utils.get_context().copy()
         self.handlers = bb.event.get_class_handlers().copy()
         self.profile = profile
+        self.queue_signals = False
+        self.signal_received = []
+        self.signal_threadlock = threading.Lock()
+
+    def catch_sig(self, signum, frame):
+        if self.queue_signals:
+            self.signal_received.append(signum)
+        else:
+            self.handle_sig(signum, frame)
+
+    def handle_sig(self, signum, frame):
+        if signum == signal.SIGTERM:
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+            os.kill(os.getpid(), signal.SIGTERM)
+        elif signum == signal.SIGINT:
+            signal.default_int_handler(signum, frame)
 
     def run(self):
 
@@ -2036,9 +2052,17 @@  class Parser(multiprocessing.Process):
             prof.dump_stats(logfile)
 
     def realrun(self):
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+        # Signal handling here is hard. We must not terminate any process or thread holding the write
+        # lock for the event stream as it will not be released, ever, and things will hang.
+        # Python handles signals in the main thread/process but they can be raised from any thread and
+        # we want to defer processing of any SIGTERM/SIGINT signal until we're outside the critical section
+        # and don't hold the lock (see server/process.py). We therefore always catch the signals (so any
+        # new thread should also do so) and we defer handling but we handle with the local thread lock
+        # held (a threading lock, not a multiprocessing one) so that no other thread in the process
+        # can be in the critical section.
+        signal.signal(signal.SIGTERM, self.catch_sig)
         signal.signal(signal.SIGHUP, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        signal.signal(signal.SIGINT, self.catch_sig)
         bb.utils.set_process_name(multiprocessing.current_process().name)
         multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, exitpriority=1)
         multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, exitpriority=1)
diff --git a/lib/bb/server/process.py b/lib/bb/server/process.py
index 7c587a9110..ce53fdc678 100644
--- a/lib/bb/server/process.py
+++ b/lib/bb/server/process.py
@@ -20,6 +20,7 @@  import os
 import sys
 import time
 import select
+import signal
 import socket
 import subprocess
 import errno
@@ -737,11 +738,28 @@  class ConnectionWriter(object):
         # Why bb.event needs this I have no idea
         self.event = self
 
-    def send(self, obj):
-        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
+    def _send(self, obj):
         with self.wlock:
             self.writer.send_bytes(obj)
 
+    def send(self, obj):
+        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
+        # See notes/code in CookerParser
+        # We must not terminate holding this lock else processes will hang.
+        # For SIGTERM, raising afterwards avoids this.
+        # For SIGINT, we don't want to have written partial data to the pipe.
+        # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
+        process = multiprocessing.current_process()
+        if process and hasattr(process, "queue_signals"):
+            with process.signal_threadlock:
+                process.queue_signals = True
+                self._send(obj)
+                process.queue_signals = False
+                for sig in process.signal_received.pop():
+                    process.handle_sig(sig, None)
+        else:
+            self._send(obj)
+
     def fileno(self):
         return self.writer.fileno()