[bitbake-devel,RFC] bitbake-worker: Switch to use asyncio

Message ID: 20211211182524.1807371-1-JPEWhacker@gmail.com
State: New
Series: [bitbake-devel,RFC] bitbake-worker: Switch to use asyncio

Commit Message

Joshua Watt Dec. 11, 2021, 6:25 p.m. UTC
Switches bitbake-worker to use asyncio. This is a good candidate for
initial modernization using asyncio because it is self-contained with a
well-defined interface to the bitbake server process.

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 bitbake/bin/bitbake-worker | 554 +++++++++++++++++++------------------
 1 file changed, 284 insertions(+), 270 deletions(-)
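
The core of the conversion is the well-known FlowControlMixin recipe that the patch's connect_stdout() uses to wrap the worker's inherited stdout pipe in an asyncio StreamWriter, so that writes back to the server can apply backpressure via await writer.drain(). A minimal standalone sketch of that pattern (the <event> framing matches the worker protocol; the payload here is a placeholder, not a real pickled bb.event):

import asyncio
import sys

async def connect_stdout():
    # Wrap this process's stdout pipe in an asyncio StreamWriter.
    # FlowControlMixin supplies pause_writing()/resume_writing() so
    # that writer.drain() can apply backpressure to the writer.
    loop = asyncio.get_event_loop()
    transport, protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, sys.stdout)
    return asyncio.StreamWriter(transport, protocol, None, loop)

async def main():
    writer = await connect_stdout()
    # The worker protocol frames each message as <tag>payload</tag>.
    writer.write(b"<event>placeholder</event>")
    await writer.drain()

asyncio.get_event_loop().run_until_complete(main())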

Comments

Peter Kjellerstedt Dec. 13, 2021, 5:29 p.m. UTC | #1
> -----Original Message-----
> From: bitbake-devel@lists.openembedded.org <bitbake-
> devel@lists.openembedded.org> On Behalf Of Joshua Watt
> Sent: 11 December 2021 19:25
> To: bitbake-devel@lists.openembedded.org
> Cc: richard.purdie@linuxfoundation.org; ross@burtonini.com; kergoth@gmail.com; Joshua Watt <JPEWhacker@gmail.com>
> Subject: [bitbake-devel][RFC] bitbake-worker: Switch to use asyncio
> 
> Switches bitbake-worker to use asyncio. This is a good canidate for
> initial modernization using asyncio because it is self-contained will a

Change "will" to "with".

//Peter

> [...]

Patch

diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
index bf96207edc..d5cc4fa248 100755
--- a/bitbake/bin/bitbake-worker
+++ b/bitbake/bin/bitbake-worker
@@ -11,16 +11,14 @@  sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), '
 from bb import fetch2
 import logging
 import bb
-import select
 import errno
 import signal
 import pickle
 import traceback
-import queue
 import shlex
 import subprocess
 from multiprocessing import Lock
-from threading import Thread
+import asyncio
 
 if sys.getfilesystemencoding() != "utf-8":
     sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")
@@ -53,14 +51,15 @@  except:
 
 logger = logging.getLogger("BitBake")
 
-worker_pipe = sys.stdout.fileno()
-bb.utils.nonblockingfd(worker_pipe)
-# Need to guard against multiprocessing being used in child processes
-# and multiple processes trying to write to the parent at the same time
-worker_pipe_lock = None
 
-handler = bb.event.LogHandler()
-logger.addHandler(handler)
+async def connect_stdout():
+    loop = asyncio.get_event_loop()
+    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
+    writer = asyncio.StreamWriter(w_transport, w_protocol, None, loop)
+    return writer
+
+log_handler = bb.event.LogHandler()
+logger.addHandler(log_handler)
 
 if 0:
     # Code to write out a log file of all events passing through the worker
@@ -71,73 +70,270 @@  if 0:
     consolelog.setFormatter(conlogformat)
     logger.addHandler(consolelog)
 
-worker_queue = queue.Queue()
-
-def worker_fire(event, d):
-    data = b"<event>" + pickle.dumps(event) + b"</event>"
-    worker_fire_prepickled(data)
+async def read_messages(fd, handlers):
+    buf = b""
+    event = asyncio.Event()
+    done = False
 
-def worker_fire_prepickled(event):
-    global worker_queue
+    def read_data():
+        nonlocal buf
+        nonlocal fd
+        nonlocal event
+        nonlocal done
 
-    worker_queue.put(event)
+        try:
+            data = os.read(fd, 102400)
+        except (OSError, IOError) as e:
+            if e.errno != errno.EAGAIN:
+                raise
+            return
 
-#
-# We can end up with write contention with the cooker, it can be trying to send commands
-# and we can be trying to send event data back. Therefore use a separate thread for writing 
-# back data to cooker.
-#
-worker_thread_exit = False
+        if len(data) == 0:
+            done = True
+        else:
+            buf += data
+        event.set()
 
-def worker_flush(worker_queue):
-    worker_queue_int = b""
-    global worker_pipe, worker_thread_exit
+    asyncio.get_event_loop().add_reader(fd, read_data)
 
-    while True:
+    try:
+        while not done:
+            for name, handler in handlers.items():
+                prefix = b"<" + name + b">"
+                if buf.startswith(prefix):
+                    suffix = b"</" + name + b">"
+                    index = buf.find(suffix)
+                    if index != -1:
+                        try:
+                            workerlog_write("%d: Handling %r\n" % (fd, name))
+                            await handler(buf[len(prefix):index])
+                        except pickle.UnpicklingError:
+                            workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in buf))
+                            raise
+
+                        buf = buf[index + len(suffix):]
+                        break
+                        # TODO: The old code would keep looking for an ending
+                        # tag in a loop, so that a stream like
+                        # <A>foo</A>bar</A> was valid. This doesn't appear to
+                        # be necessary anymore?
+                        #index = self.buf.find(b"</" + item + b">")
+            else:
+                # Nothing found in the buffer. Wait for more data
+                await event.wait()
+                event.clear()
+    finally:
+        asyncio.get_event_loop().remove_reader(fd)
+
+class ChildHandler(object):
+    def __init__(self, writer, task, pid, pipeinfd, pipeoutfd):
+        self.task = task
+        self.writer = writer
+        self.pid = pid
+        self.pipeinfd = pipeinfd
+        if pipeoutfd >= 0:
+            os.close(pipeoutfd)
+
+        self.done_event = asyncio.Event()
+        self.loop = asyncio.get_running_loop()
+
+        asyncio.get_child_watcher().add_child_handler(self.pid, self._child_watcher_callback)
+
+    def _child_watcher_callback(self, pid, status):
+        # The callback may be called in a thread, so call_soon_threadsafe is
+        # recommended to get back to the main loop.
+        self.loop.call_soon_threadsafe(self.child_exited, pid, status)
+
+    async def main_loop(self):
+        bb.utils.nonblockingfd(self.pipeinfd)
+        await read_messages(self.pipeinfd, {
+            b"event": self.handle_event,
+        })
+        os.close(self.pipeinfd)
+        await self.done_event.wait()
+
+    async def handle_event(self, data):
+        self.writer.write(b"<event>")
+        self.writer.write(data)
+        self.writer.write(b"</event>")
+        await self.writer.drain()
+
+    def child_exited(self, pid, status):
         try:
-            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
-        except queue.Empty:
-            pass
-        while (worker_queue_int or not worker_queue.empty()):
+            if pid != self.pid:
+                return
+
+            workerlog_write("Exit code of %d for pid %d (fd %d)\n" % (status, pid, self.pipeinfd))
+
+            asyncio.get_child_watcher().remove_child_handler(self.pid)
+
+            if os.WIFEXITED(status):
+                status = os.WEXITSTATUS(status)
+            elif os.WIFSIGNALED(status):
+                # Per shell conventions for $?, when a process exits due to
+                # a signal, we return an exit code of 128 + SIGNUM
+                status = 128 + os.WTERMSIG(status)
+
+            self.writer.write(b"<exitcode>")
+            self.writer.write(pickle.dumps((self.task, status)))
+            self.writer.write(b"</exitcode>")
+
+            self.done_event.set()
+        except Exception as e:
+            workerlog_write("%s\n%s\n" % (traceback.format_exc(), e))
+            raise e
+
+    def close(self):
+        if not self.done_event.is_set():
             try:
-                (_, ready, _) = select.select([], [worker_pipe], [], 1)
-                if not worker_queue.empty():
-                    worker_queue_int = worker_queue_int + worker_queue.get()
-                written = os.write(worker_pipe, worker_queue_int)
-                worker_queue_int = worker_queue_int[written:]
-            except (IOError, OSError) as e:
-                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
-                    raise
-        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
-            return
+                os.kill(-self.pid, signal.SIGTERM)
+            except OSError:
+                pass
+
+
+class MainHandler(object):
+    def __init__(self, writer):
+        self.writer = writer
+        self.cookercfg = None
+        self.databuilder = None
+        self.data = None
+        self.extraconfigdata = None
+        self.children = []
+        self.child_tasks = []
 
-worker_thread = Thread(target=worker_flush, args=(worker_queue,))
-worker_thread.start()
+    async def main_loop(self):
+        loop = asyncio.get_running_loop()
+        fd = sys.stdin.fileno()
+        bb.utils.nonblockingfd(fd)
 
-def worker_child_fire(event, d):
-    global worker_pipe
-    global worker_pipe_lock
+        loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
+        loop.add_signal_handler(signal.SIGHUP, self.signal_handler)
 
-    data = b"<event>" + pickle.dumps(event) + b"</event>"
-    try:
-        worker_pipe_lock.acquire()
-        while(len(data)):
-            written = worker_pipe.write(data)
-            data = data[written:]
-        worker_pipe_lock.release()
-    except IOError:
-        sigterm_handler(None, None)
-        raise
+        try:
+            await read_messages(fd, {
+                b"cookerconfig": self.handle_cookercfg,
+                b"extraconfigdata": self.handle_extraconfigdata,
+                b"workerdata": self.handle_workerdata,
+                b"newtaskhashes": self.handle_newtaskhashes,
+                b"runtask": self.handle_runtask,
+                b"finishnow": self.handle_finishnow,
+                b"ping": self.handle_ping,
+                b"quit": self.handle_quit,
+                }
+            )
+        finally:
+            loop.remove_signal_handler(signal.SIGTERM)
+            loop.remove_signal_handler(signal.SIGHUP)
+
+    def signal_handler(self, signum, stackframe):
+        loop = asyncio.get_running_loop()
+
+        if signum == signal.SIGTERM:
+            bb.warn("Worker received SIGTERM, shutting down...")
+        elif signum == signal.SIGHUP:
+            bb.warn("Worker received SIGHUP, shutting down...")
+
+        self.handle_finishnow(None)
+        loop.remove_signal_handler(signal.SIGTERM)
+        os.kill(os.getpid(), signal.SIGTERM)
+
+    async def handle_cookercfg(self, data):
+        self.cookercfg = pickle.loads(data)
+        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
+        self.databuilder.parseBaseConfiguration()
+        self.data = self.databuilder.data
+
+    async def handle_extraconfigdata(self, data):
+        self.extraconfigdata = pickle.loads(data)
+
+    async def handle_workerdata(self, data):
+        self.workerdata = pickle.loads(data)
+        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
+        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
+        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
+        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
+        for mc in self.databuilder.mcdata:
+            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
+            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
+
+    async def handle_newtaskhashes(self, data):
+        self.workerdata["newhashes"] = pickle.loads(data)
+
+    async def handle_ping(self, _):
+        logger.warning("Pong from bitbake-worker!")
+
+    async def handle_quit(self, data):
+        global normalexit
+        normalexit = True
+        sys.exit(0)
+
+    async def run_child(self, child):
+        await child.main_loop()
+        self.children.remove(child)
+        self.child_tasks.remove(asyncio.current_task())
+
+    async def handle_runtask(self, data):
+        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
+        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+
+        pid, pipeinfd, pipeoutfd = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)
+
+        child = ChildHandler(self.writer, task, pid, pipeinfd, pipeoutfd)
+        self.children.append(child)
+
+        t = asyncio.ensure_future(self.run_child(child))
+        self.child_tasks.append(t)
+
+    async def handle_finishnow(self, _=None):
+        for c in self.children:
+            c.close()
+
+        workerlog_write("Waiting for %d child tasks: %s\n" % (len(self.child_tasks),
+            " ".join(str(c.pid) for c in self.children)))
+
+        # Wait for all outstanding children to exit
+        await asyncio.gather(*self.child_tasks)
+
+async def main():
+    writer = await connect_stdout()
+    worker_queue = []
+
+    def worker_fire(event, d):
+        nonlocal worker_queue
+
+        async def flush_worker_queue():
+            nonlocal writer
+            nonlocal worker_queue
+
+            if worker_queue:
+                for m in worker_queue:
+                    writer.write(m)
+                worker_queue = []
+                await writer.drain()
+
+        # To ensure the messages are sent out in the order they are received,
+        # put them in a list then schedule a task to write them out
+        data = b"<event>" + pickle.dumps(event) + b"</event>"
+        worker_queue.append(data)
+        asyncio.ensure_future(flush_worker_queue())
 
-bb.event.worker_fire = worker_fire
+    bb.event.worker_fire = worker_fire
+
+    handler = MainHandler(writer)
+
+    await asyncio.gather(handler.main_loop())
+
+normalexit = False
 
 lf = None
 #lf = open("/tmp/workercommandlog", "w+")
 def workerlog_write(msg):
+    global lf
     if lf:
         lf.write(msg)
         lf.flush()
 
+
 def sigterm_handler(signum, frame):
     signal.signal(signal.SIGTERM, signal.SIG_DFL)
     os.killpg(0, signal.SIGTERM)
@@ -191,9 +387,7 @@  def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
     sys.stderr.flush()
 
     try:
-        pipein, pipeout = os.pipe()
-        pipein = os.fdopen(pipein, 'rb', 4096)
-        pipeout = os.fdopen(pipeout, 'wb', 0)
+        pipeinfd, pipeoutfd = os.pipe()
         pid = os.fork()
     except OSError as e:
         logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
@@ -201,18 +395,30 @@  def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
 
     if pid == 0:
         def child():
-            global worker_pipe
-            global worker_pipe_lock
-            pipein.close()
+            os.close(pipeinfd)
 
             bb.utils.signal_on_parent_exit("SIGTERM")
 
+            pipeout = os.fdopen(pipeoutfd, 'wb', 0)
+            pipelock = Lock()
+            def worker_child_fire(event, d):
+                nonlocal pipeout
+                nonlocal pipelock
+
+                data = b"<event>" + pickle.dumps(event) + b"</event>"
+                try:
+                    with pipelock:
+                        while(len(data)):
+                            written = pipeout.write(data)
+                            data = data[written:]
+                except IOError:
+                    sigterm_handler(None, None)
+                    raise
+
             # Save out the PID so that the event can include it the
             # events
             bb.event.worker_pid = os.getpid()
             bb.event.worker_fire = worker_child_fire
-            worker_pipe = pipeout
-            worker_pipe_lock = Lock()
 
             # Make the child the process group leader and ensure no
             # child process will be controlled by the current terminal
@@ -315,225 +521,33 @@  def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
             else:
                 os.environ[key] = value
 
-    return pid, pipein, pipeout
-
-class runQueueWorkerPipe():
-    """
-    Abstraction for a pipe between a worker thread and the worker server
-    """
-    def __init__(self, pipein, pipeout):
-        self.input = pipein
-        if pipeout:
-            pipeout.close()
-        bb.utils.nonblockingfd(self.input)
-        self.queue = b""
-
-    def read(self):
-        start = len(self.queue)
-        try:
-            self.queue = self.queue + (self.input.read(102400) or b"")
-        except (OSError, IOError) as e:
-            if e.errno != errno.EAGAIN:
-                raise
-
-        end = len(self.queue)
-        index = self.queue.find(b"</event>")
-        while index != -1:
-            msg = self.queue[:index+8]
-            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
-            worker_fire_prepickled(msg)
-            self.queue = self.queue[index+8:]
-            index = self.queue.find(b"</event>")
-        return (end > start)
-
-    def close(self):
-        while self.read():
-            continue
-        if len(self.queue) > 0:
-            print("Warning, worker child left partial message: %s" % self.queue)
-        self.input.close()
-
-normalexit = False
+    return pid, pipeinfd, pipeoutfd
 
-class BitbakeWorker(object):
-    def __init__(self, din):
-        self.input = din
-        bb.utils.nonblockingfd(self.input)
-        self.queue = b""
-        self.cookercfg = None
-        self.databuilder = None
-        self.data = None
-        self.extraconfigdata = None
-        self.build_pids = {}
-        self.build_pipes = {}
-    
-        signal.signal(signal.SIGTERM, self.sigterm_exception)
-        # Let SIGHUP exit as SIGTERM
-        signal.signal(signal.SIGHUP, self.sigterm_exception)
-        if "beef" in sys.argv[1]:
-            bb.utils.set_process_name("Worker (Fakeroot)")
-        else:
-            bb.utils.set_process_name("Worker")
-
-    def sigterm_exception(self, signum, stackframe):
-        if signum == signal.SIGTERM:
-            bb.warn("Worker received SIGTERM, shutting down...")
-        elif signum == signal.SIGHUP:
-            bb.warn("Worker received SIGHUP, shutting down...")
-        self.handle_finishnow(None)
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        os.kill(os.getpid(), signal.SIGTERM)
-
-    def serve(self):        
-        while True:
-            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
-            if self.input in ready:
-                try:
-                    r = self.input.read()
-                    if len(r) == 0:
-                        # EOF on pipe, server must have terminated
-                        self.sigterm_exception(signal.SIGTERM, None)
-                    self.queue = self.queue + r
-                except (OSError, IOError):
-                    pass
-            if len(self.queue):
-                self.handle_item(b"cookerconfig", self.handle_cookercfg)
-                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
-                self.handle_item(b"workerdata", self.handle_workerdata)
-                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
-                self.handle_item(b"runtask", self.handle_runtask)
-                self.handle_item(b"finishnow", self.handle_finishnow)
-                self.handle_item(b"ping", self.handle_ping)
-                self.handle_item(b"quit", self.handle_quit)
-
-            for pipe in self.build_pipes:
-                if self.build_pipes[pipe].input in ready:
-                    self.build_pipes[pipe].read()
-            if len(self.build_pids):
-                while self.process_waitpid():
-                    continue
-
-
-    def handle_item(self, item, func):
-        if self.queue.startswith(b"<" + item + b">"):
-            index = self.queue.find(b"</" + item + b">")
-            while index != -1:
-                try:
-                    func(self.queue[(len(item) + 2):index])
-                except pickle.UnpicklingError:
-                    workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
-                    raise
-                self.queue = self.queue[(index + len(item) + 3):]
-                index = self.queue.find(b"</" + item + b">")
-
-    def handle_cookercfg(self, data):
-        self.cookercfg = pickle.loads(data)
-        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
-        self.databuilder.parseBaseConfiguration()
-        self.data = self.databuilder.data
-
-    def handle_extraconfigdata(self, data):
-        self.extraconfigdata = pickle.loads(data)
-
-    def handle_workerdata(self, data):
-        self.workerdata = pickle.loads(data)
-        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
-        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
-        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
-        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
-        for mc in self.databuilder.mcdata:
-            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
-            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
-
-    def handle_newtaskhashes(self, data):
-        self.workerdata["newhashes"] = pickle.loads(data)
-
-    def handle_ping(self, _):
-        workerlog_write("Handling ping\n")
-
-        logger.warning("Pong from bitbake-worker!")
-
-    def handle_quit(self, data):
-        workerlog_write("Handling quit\n")
-
-        global normalexit
-        normalexit = True
-        sys.exit(0)
-
-    def handle_runtask(self, data):
-        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
-        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
-
-        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)
-
-        self.build_pids[pid] = task
-        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
-
-    def process_waitpid(self):
-        """
-        Return none is there are no processes awaiting result collection, otherwise
-        collect the process exit codes and close the information pipe.
-        """
-        try:
-            pid, status = os.waitpid(-1, os.WNOHANG)
-            if pid == 0 or os.WIFSTOPPED(status):
-                return False
-        except OSError:
-            return False
-
-        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
-
-        if os.WIFEXITED(status):
-            status = os.WEXITSTATUS(status)
-        elif os.WIFSIGNALED(status):
-            # Per shell conventions for $?, when a process exits due to
-            # a signal, we return an exit code of 128 + SIGNUM
-            status = 128 + os.WTERMSIG(status)
-
-        task = self.build_pids[pid]
-        del self.build_pids[pid]
-
-        self.build_pipes[pid].close()
-        del self.build_pipes[pid]
-
-        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")
-
-        return True
+try:
+    if "beef" in sys.argv[1]:
+        bb.utils.set_process_name("Worker (Fakeroot)")
+    else:
+        bb.utils.set_process_name("Worker")
 
-    def handle_finishnow(self, _):
-        if self.build_pids:
-            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
-            for k, v in iter(self.build_pids.items()):
-                try:
-                    os.kill(-k, signal.SIGTERM)
-                    os.waitpid(-1, 0)
-                except:
-                    pass
-        for pipe in self.build_pipes:
-            self.build_pipes[pipe].read()
+    loop = asyncio.get_event_loop()
 
-try:
-    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
     if not profiling:
-        worker.serve()
+        loop.run_until_complete(main())
     else:
         profname = "profile-worker.log"
         prof = profile.Profile()
         try:
-            profile.Profile.runcall(prof, worker.serve)
+            profile.Profile.runcall(prof, loop.run_until_complete, main())
         finally:
             prof.dump_stats(profname)
             bb.utils.process_profilelog(profname)
-except BaseException as e:
+except Exception as e:
+    workerlog_write("%s\n%s\n" % (traceback.format_exc(), e))
     if not normalexit:
-        import traceback
         sys.stderr.write(traceback.format_exc())
         sys.stderr.write(str(e))
-finally:
-    worker_thread_exit = True
-    worker_thread.join()
 
-workerlog_write("exiting")
+workerlog_write("exiting\n")
 if not normalexit:
     sys.exit(1)
 sys.exit(0)
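
A closing note on the framing: read_messages() in the patch replaces BitbakeWorker.handle_item(), keeping the same <name>payload</name> message format but dispatching each complete payload to an async handler looked up in a dict. A rough standalone equivalent, simplified to use an asyncio.StreamReader in place of the patch's add_reader()/os.read() callback (reader and handle_ping are illustrative, not part of the patch):

import asyncio

async def read_messages(reader, handlers):
    # Scan a growing byte buffer for <name>payload</name> frames and
    # await handlers[name](payload) for each complete frame found.
    buf = b""
    while True:
        data = await reader.read(102400)
        if not data:
            return  # EOF: the peer closed the pipe
        buf += data
        progress = True
        while progress:
            progress = False
            for name, handler in handlers.items():
                prefix = b"<" + name + b">"
                if buf.startswith(prefix):
                    suffix = b"</" + name + b">"
                    index = buf.find(suffix)
                    if index != -1:
                        await handler(buf[len(prefix):index])
                        buf = buf[index + len(suffix):]
                        progress = True
                    break

async def handle_ping(payload):
    print("Pong from bitbake-worker!")

# Usage, given a connected StreamReader:
#     await read_messages(reader, {b"ping": handle_ping})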