diff mbox series

[2.0,kirkstone] bitbake-worker: add header with length of message

Message ID 20231110133324.3168418-1-ecordonnier@snap.com
State New
Headers show
Series [2.0,kirkstone] bitbake-worker: add header with length of message | expand

Commit Message

Etienne Cordonnier Nov. 10, 2023, 1:33 p.m. UTC
From: Etienne Cordonnier <ecordonnier@snap.com>

The IPC mechanism between runqueue.py and bitbake-worker is currently
not scalable:

The data is sent with the format <tag>pickled-data</tag>, and bitbake-worker
has no information about the size of the message. Therefore, the bitbake-worker
is calling select() and read() in a loop, and then calling "self.queue.find(b"</" + item + b">")"
for each chunk received.

This does not scale, because queue.find has linear complexity relative to the size of the queue,
and workerdata messages get very big e.g. for builds which reference a lot of files in SRC_URI.
The number of chunks varies, but on my test system a lot of chunks of 65536 bytes are sent, and each
iteration takes 0.1 seconds, making the transfer of the "workerdata" data very slow (on my test setup
35 seconds before this fix, and 1.5 seconds after this fix).

This commit adds a 4-byte header after <tag>, so that bitbake-worker knows how many bytes need to be
received, and does not need to constantly search the whole queue for </tag>.

Signed-off-by: Etienne Cordonnier <ecordonnier@snap.com>
Signed-off-by: Alexandre Belloni <alexandre.belloni@bootlin.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
---
 bin/bitbake-worker | 33 ++++++++++++++++++++++-----------
 lib/bb/runqueue.py | 34 ++++++++++++++++++++++------------
 2 files changed, 44 insertions(+), 23 deletions(-)

Comments

Steve Sakoman Nov. 10, 2023, 2:32 p.m. UTC | #1
On Fri, Nov 10, 2023 at 3:34 AM Etienne Cordonnier via
lists.openembedded.org <ecordonnier=snap.com@lists.openembedded.org>
wrote:
>
> From: Etienne Cordonnier <ecordonnier@snap.com>
>
> The IPC mechanism between runqueue.py and bitbake-worker is currently
> not scalable:
>
> The data is sent with the format <tag>pickled-data</tag>, and bitbake-worker
> has no information about the size of the message. Therefore, the bitbake-worker
> is calling select() and read() in a loop, and then calling "self.queue.find(b"</" + item + b">")"
> for each chunk received.
>
> This does not scale, because queue.find has linear complexity relative to the size of the queue,
> and workerdata messages get very big e.g. for builds which reference a lot of files in SRC_URI.
> The number of chunks varies, but on my test system a lot of chunks of 65536 bytes are sent, and each
> iteration takes 0.1 seconds, making the transfer of the "workerdata" data very slow (on my test setup
> 35 seconds before this fix, and 1.5 seconds after this fix).
>
> This commit adds a 4-byte header after <tag>, so that bitbake-worker knows how many bytes need to be
> received, and does not need to constantly search the whole queue for </tag>.

Sorry, this is an API change and therefore not allowed for backporting
to a stable release.

Steve

>
> Signed-off-by: Etienne Cordonnier <ecordonnier@snap.com>
> Signed-off-by: Alexandre Belloni <alexandre.belloni@bootlin.com>
> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
> ---
>  bin/bitbake-worker | 33 ++++++++++++++++++++++-----------
>  lib/bb/runqueue.py | 34 ++++++++++++++++++++++------------
>  2 files changed, 44 insertions(+), 23 deletions(-)
>
> diff --git a/bin/bitbake-worker b/bin/bitbake-worker
> index e02f2532..f8be973d 100755
> --- a/bin/bitbake-worker
> +++ b/bin/bitbake-worker
> @@ -425,18 +425,29 @@ class BitbakeWorker(object):
>                  while self.process_waitpid():
>                      continue
>
> -
>      def handle_item(self, item, func):
> -        if self.queue.startswith(b"<" + item + b">"):
> -            index = self.queue.find(b"</" + item + b">")
> -            while index != -1:
> -                try:
> -                    func(self.queue[(len(item) + 2):index])
> -                except pickle.UnpicklingError:
> -                    workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
> -                    raise
> -                self.queue = self.queue[(index + len(item) + 3):]
> -                index = self.queue.find(b"</" + item + b">")
> +        opening_tag = b"<" + item + b">"
> +        if not self.queue.startswith(opening_tag):
> +            return
> +
> +        tag_len = len(opening_tag)
> +        if len(self.queue) < tag_len + 4:
> +            # we need to receive more data
> +            return
> +        header = self.queue[tag_len:tag_len + 4]
> +        payload_len = int.from_bytes(header, 'big')
> +        # closing tag has length (tag_len + 1)
> +        if len(self.queue) < tag_len * 2 + 1 + payload_len:
> +            # we need to receive more data
> +            return
> +        index = self.queue.find(b"</" + item + b">")
> +        if index != -1:
> +            try:
> +                func(self.queue[(len(item) + 2 + 4):index])
> +            except pickle.UnpicklingError:
> +                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
> +                raise
> +            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
>
>      def handle_cookercfg(self, data):
>          self.cookercfg = pickle.loads(data)
> diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
> index 8605c46c..4e48c944 100644
> --- a/lib/bb/runqueue.py
> +++ b/lib/bb/runqueue.py
> @@ -1289,6 +1289,16 @@ class RunQueue:
>          self.worker = {}
>          self.fakeworker = {}
>
> +    @staticmethod
> +    def send_pickled_data(worker, data, name):
> +        msg = bytearray()
> +        pickled_data = pickle.dumps(data)
> +        msg.extend(b"<" + name.encode() + b">")
> +        msg.extend(len(pickled_data).to_bytes(4, 'big'))
> +        msg.extend(pickled_data)
> +        msg.extend(b"</" + name.encode() + b">")
> +        worker.stdin.write(msg)
> +
>      def _start_worker(self, mc, fakeroot = False, rqexec = None):
>          logger.debug("Starting bitbake-worker")
>          magic = "decafbad"
> @@ -1328,9 +1338,9 @@ class RunQueue:
>              "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
>          }
>
> -        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
> -        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
> -        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
> +        RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
> +        RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
> +        RunQueue.send_pickled_data(worker, workerdata, "workerdata")
>          worker.stdin.flush()
>
>          return RunQueueWorker(worker, workerpipe)
> @@ -1340,7 +1350,7 @@ class RunQueue:
>              return
>          logger.debug("Teardown for bitbake-worker")
>          try:
> -           worker.process.stdin.write(b"<quit></quit>")
> +           RunQueue.send_pickled_data(worker.process, b"", "quit")
>             worker.process.stdin.flush()
>             worker.process.stdin.close()
>          except IOError:
> @@ -1867,14 +1877,14 @@ class RunQueueExecute:
>      def finish_now(self):
>          for mc in self.rq.worker:
>              try:
> -                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
> +                RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
>                  self.rq.worker[mc].process.stdin.flush()
>              except IOError:
>                  # worker must have died?
>                  pass
>          for mc in self.rq.fakeworker:
>              try:
> -                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
> +                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
>                  self.rq.fakeworker[mc].process.stdin.flush()
>              except IOError:
>                  # worker must have died?
> @@ -2156,10 +2166,10 @@ class RunQueueExecute:
>              if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
>                  if not mc in self.rq.fakeworker:
>                      self.rq.start_fakeworker(self, mc)
> -                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
> +                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, (taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False), "runtask")
>                  self.rq.fakeworker[mc].process.stdin.flush()
>              else:
> -                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
> +                RunQueue.send_pickled_data(self.rq.worker[mc].process, (taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False), "runtask")
>                  self.rq.worker[mc].process.stdin.flush()
>
>              self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
> @@ -2243,10 +2253,10 @@ class RunQueueExecute:
>                          self.rq.state = runQueueFailed
>                          self.stats.taskFailed()
>                          return True
> -                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
> +                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, (taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce), "runtask")
>                  self.rq.fakeworker[mc].process.stdin.flush()
>              else:
> -                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
> +                RunQueue.send_pickled_data(self.rq.worker[mc].process, (taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce), "runtask")
>                  self.rq.worker[mc].process.stdin.flush()
>
>              self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
> @@ -2448,9 +2458,9 @@ class RunQueueExecute:
>
>          if changed:
>              for mc in self.rq.worker:
> -                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
> +                RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
>              for mc in self.rq.fakeworker:
> -                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
> +                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
>
>              hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
>
> --
> 2.36.1.vfs.0.0
>
>
> -=-=-=-=-=-=-=-=-=-=-=-
> Links: You receive all messages sent to this group.
> View/Reply Online (#15492): https://lists.openembedded.org/g/bitbake-devel/message/15492
> Mute This Topic: https://lists.openembedded.org/mt/102506319/3620601
> Group Owner: bitbake-devel+owner@lists.openembedded.org
> Unsubscribe: https://lists.openembedded.org/g/bitbake-devel/unsub [steve@sakoman.com]
> -=-=-=-=-=-=-=-=-=-=-=-
>
diff mbox series

Patch

diff --git a/bin/bitbake-worker b/bin/bitbake-worker
index e02f2532..f8be973d 100755
--- a/bin/bitbake-worker
+++ b/bin/bitbake-worker
@@ -425,18 +425,29 @@  class BitbakeWorker(object):
                 while self.process_waitpid():
                     continue
 
-
     def handle_item(self, item, func):
-        if self.queue.startswith(b"<" + item + b">"):
-            index = self.queue.find(b"</" + item + b">")
-            while index != -1:
-                try:
-                    func(self.queue[(len(item) + 2):index])
-                except pickle.UnpicklingError:
-                    workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
-                    raise
-                self.queue = self.queue[(index + len(item) + 3):]
-                index = self.queue.find(b"</" + item + b">")
+        opening_tag = b"<" + item + b">"
+        if not self.queue.startswith(opening_tag):
+            return
+
+        tag_len = len(opening_tag)
+        if len(self.queue) < tag_len + 4:
+            # we need to receive more data
+            return
+        header = self.queue[tag_len:tag_len + 4]
+        payload_len = int.from_bytes(header, 'big')
+        # closing tag has length (tag_len + 1)
+        if len(self.queue) < tag_len * 2 + 1 + payload_len:
+            # we need to receive more data
+            return
+        index = self.queue.find(b"</" + item + b">")
+        if index != -1:
+            try:
+                func(self.queue[(len(item) + 2 + 4):index])
+            except pickle.UnpicklingError:
+                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
+                raise
+            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
 
     def handle_cookercfg(self, data):
         self.cookercfg = pickle.loads(data)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index 8605c46c..4e48c944 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1289,6 +1289,16 @@  class RunQueue:
         self.worker = {}
         self.fakeworker = {}
 
+    @staticmethod
+    def send_pickled_data(worker, data, name):
+        msg = bytearray()
+        pickled_data = pickle.dumps(data)
+        msg.extend(b"<" + name.encode() + b">")
+        msg.extend(len(pickled_data).to_bytes(4, 'big'))
+        msg.extend(pickled_data)
+        msg.extend(b"</" + name.encode() + b">")
+        worker.stdin.write(msg) 
+
     def _start_worker(self, mc, fakeroot = False, rqexec = None):
         logger.debug("Starting bitbake-worker")
         magic = "decafbad"
@@ -1328,9 +1338,9 @@  class RunQueue:
             "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
         }
 
-        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
-        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
-        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
+        RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
+        RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
+        RunQueue.send_pickled_data(worker, workerdata, "workerdata")
         worker.stdin.flush()
 
         return RunQueueWorker(worker, workerpipe)
@@ -1340,7 +1350,7 @@  class RunQueue:
             return
         logger.debug("Teardown for bitbake-worker")
         try:
-           worker.process.stdin.write(b"<quit></quit>")
+           RunQueue.send_pickled_data(worker.process, b"", "quit")
            worker.process.stdin.flush()
            worker.process.stdin.close()
         except IOError:
@@ -1867,14 +1877,14 @@  class RunQueueExecute:
     def finish_now(self):
         for mc in self.rq.worker:
             try:
-                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
                 self.rq.worker[mc].process.stdin.flush()
             except IOError:
                 # worker must have died?
                 pass
         for mc in self.rq.fakeworker:
             try:
-                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
                 self.rq.fakeworker[mc].process.stdin.flush()
             except IOError:
                 # worker must have died?
@@ -2156,10 +2166,10 @@  class RunQueueExecute:
             if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                 if not mc in self.rq.fakeworker:
                     self.rq.start_fakeworker(self, mc)
-                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, (taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False), "runtask")
                 self.rq.fakeworker[mc].process.stdin.flush()
             else:
-                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, (taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False), "runtask")
                 self.rq.worker[mc].process.stdin.flush()
 
             self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
@@ -2243,10 +2253,10 @@  class RunQueueExecute:
                         self.rq.state = runQueueFailed
                         self.stats.taskFailed()
                         return True
-                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, (taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce), "runtask")
                 self.rq.fakeworker[mc].process.stdin.flush()
             else:
-                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, (taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce), "runtask")
                 self.rq.worker[mc].process.stdin.flush()
 
             self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
@@ -2448,9 +2458,9 @@  class RunQueueExecute:
 
         if changed:
             for mc in self.rq.worker:
-                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
             for mc in self.rq.fakeworker:
-                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
 
             hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))