Message ID | 20231110133324.3168418-1-ecordonnier@snap.com |
---|---|
State | New |
Headers | show |
Series | [2.0,kirkstone] bitbake-worker: add header with length of message | expand |
On Fri, Nov 10, 2023 at 3:34 AM Etienne Cordonnier via lists.openembedded.org <ecordonnier=snap.com@lists.openembedded.org> wrote: > > From: Etienne Cordonnier <ecordonnier@snap.com> > > The IPC mechanism between runqueue.py and bitbake-worker is currently > not scalable: > > The data is sent with the format <tag>pickled-data</tag>, and bitbake-worker > has no information about the size of the message. Therefore, the bitbake-worker > is calling select() and read() in a loop, and then calling "self.queue.find(b"</" + item + b">")" > for each chunk received. > > This does not scale, because queue.find has a linear complexity relative to the size of the queue, > and workerdata messages get very big e.g. for builds which reference a lot of files in SRC_URI. > The number of chunks varies, but on my test system a lot of chunks of 65536 bytes are sent, and each > iteration takes 0.1 seconds, making the transfer of the "workerdata" data very slow (on my test setup > 35 seconds before this fix, and 1.5 seconds after this fix). > > This commit adds a 4-byte header after <tag>, so that bitbake-worker knows how many bytes need to be > received, and does not need to constantly search the whole queue for </tag>. Sorry, this is an API change and therefore not allowed for backporting to a stable release.
Steve > > Signed-off-by: Etienne Cordonnier <ecordonnier@snap.com> > Signed-off-by: Alexandre Belloni <alexandre.belloni@bootlin.com> > Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org> > --- > bin/bitbake-worker | 33 ++++++++++++++++++++++----------- > lib/bb/runqueue.py | 34 ++++++++++++++++++++++------------ > 2 files changed, 44 insertions(+), 23 deletions(-) > > diff --git a/bin/bitbake-worker b/bin/bitbake-worker > index e02f2532..f8be973d 100755 > --- a/bin/bitbake-worker > +++ b/bin/bitbake-worker > @@ -425,18 +425,29 @@ class BitbakeWorker(object): > while self.process_waitpid(): > continue > > - > def handle_item(self, item, func): > - if self.queue.startswith(b"<" + item + b">"): > - index = self.queue.find(b"</" + item + b">") > - while index != -1: > - try: > - func(self.queue[(len(item) + 2):index]) > - except pickle.UnpicklingError: > - workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) > - raise > - self.queue = self.queue[(index + len(item) + 3):] > - index = self.queue.find(b"</" + item + b">") > + opening_tag = b"<" + item + b">" > + if not self.queue.startswith(opening_tag): > + return > + > + tag_len = len(opening_tag) > + if len(self.queue) < tag_len + 4: > + # we need to receive more data > + return > + header = self.queue[tag_len:tag_len + 4] > + payload_len = int.from_bytes(header, 'big') > + # closing tag has length (tag_len + 1) > + if len(self.queue) < tag_len * 2 + 1 + payload_len: > + # we need to receive more data > + return > + index = self.queue.find(b"</" + item + b">") > + if index != -1: > + try: > + func(self.queue[(len(item) + 2 + 4):index]) > + except pickle.UnpicklingError: > + workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) > + raise > + self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):] > > def handle_cookercfg(self, data): > self.cookercfg = pickle.loads(data) > diff --git 
a/lib/bb/runqueue.py b/lib/bb/runqueue.py > index 8605c46c..4e48c944 100644 > --- a/lib/bb/runqueue.py > +++ b/lib/bb/runqueue.py > @@ -1289,6 +1289,16 @@ class RunQueue: > self.worker = {} > self.fakeworker = {} > > + @staticmethod > + def send_pickled_data(worker, data, name): > + msg = bytearray() > + pickled_data = pickle.dumps(data) > + msg.extend(b"<" + name.encode() + b">") > + msg.extend(len(pickled_data).to_bytes(4, 'big')) > + msg.extend(pickled_data) > + msg.extend(b"</" + name.encode() + b">") > + worker.stdin.write(msg) > + > def _start_worker(self, mc, fakeroot = False, rqexec = None): > logger.debug("Starting bitbake-worker") > magic = "decafbad" > @@ -1328,9 +1338,9 @@ class RunQueue: > "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), > } > > - worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") > - worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") > - worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") > + RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig") > + RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata") > + RunQueue.send_pickled_data(worker, workerdata, "workerdata") > worker.stdin.flush() > > return RunQueueWorker(worker, workerpipe) > @@ -1340,7 +1350,7 @@ class RunQueue: > return > logger.debug("Teardown for bitbake-worker") > try: > - worker.process.stdin.write(b"<quit></quit>") > + RunQueue.send_pickled_data(worker.process, b"", "quit") > worker.process.stdin.flush() > worker.process.stdin.close() > except IOError: > @@ -1867,14 +1877,14 @@ class RunQueueExecute: > def finish_now(self): > for mc in self.rq.worker: > try: > - self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") > + RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow") > self.rq.worker[mc].process.stdin.flush() > except IOError: > # worker must 
have died? > pass > for mc in self.rq.fakeworker: > try: > - self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") > + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow") > self.rq.fakeworker[mc].process.stdin.flush() > except IOError: > # worker must have died? > @@ -2156,10 +2166,10 @@ class RunQueueExecute: > if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: > if not mc in self.rq.fakeworker: > self.rq.start_fakeworker(self, mc) > - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") > + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, (taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False), "runtask") > self.rq.fakeworker[mc].process.stdin.flush() > else: > - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") > + RunQueue.send_pickled_data(self.rq.worker[mc].process, (taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False), "runtask") > self.rq.worker[mc].process.stdin.flush() > > self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) > @@ -2243,10 +2253,10 @@ class RunQueueExecute: > self.rq.state = runQueueFailed > self.stats.taskFailed() > return True > - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") > + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, (taskfn, 
task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce), "runtask") > self.rq.fakeworker[mc].process.stdin.flush() > else: > - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") > + RunQueue.send_pickled_data(self.rq.worker[mc].process, (taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce), "runtask") > self.rq.worker[mc].process.stdin.flush() > > self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) > @@ -2448,9 +2458,9 @@ class RunQueueExecute: > > if changed: > for mc in self.rq.worker: > - self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") > + RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") > for mc in self.rq.fakeworker: > - self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") > + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") > > hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) > > -- > 2.36.1.vfs.0.0 > > > -=-=-=-=-=-=-=-=-=-=-=- > Links: You receive all messages sent to this group. > View/Reply Online (#15492): https://lists.openembedded.org/g/bitbake-devel/message/15492 > Mute This Topic: https://lists.openembedded.org/mt/102506319/3620601 > Group Owner: bitbake-devel+owner@lists.openembedded.org > Unsubscribe: https://lists.openembedded.org/g/bitbake-devel/unsub [steve@sakoman.com] > -=-=-=-=-=-=-=-=-=-=-=- >
diff --git a/bin/bitbake-worker b/bin/bitbake-worker index e02f2532..f8be973d 100755 --- a/bin/bitbake-worker +++ b/bin/bitbake-worker @@ -425,18 +425,29 @@ class BitbakeWorker(object): while self.process_waitpid(): continue - def handle_item(self, item, func): - if self.queue.startswith(b"<" + item + b">"): - index = self.queue.find(b"</" + item + b">") - while index != -1: - try: - func(self.queue[(len(item) + 2):index]) - except pickle.UnpicklingError: - workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) - raise - self.queue = self.queue[(index + len(item) + 3):] - index = self.queue.find(b"</" + item + b">") + opening_tag = b"<" + item + b">" + if not self.queue.startswith(opening_tag): + return + + tag_len = len(opening_tag) + if len(self.queue) < tag_len + 4: + # we need to receive more data + return + header = self.queue[tag_len:tag_len + 4] + payload_len = int.from_bytes(header, 'big') + # closing tag has length (tag_len + 1) + if len(self.queue) < tag_len * 2 + 1 + payload_len: + # we need to receive more data + return + index = self.queue.find(b"</" + item + b">") + if index != -1: + try: + func(self.queue[(len(item) + 2 + 4):index]) + except pickle.UnpicklingError: + workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) + raise + self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):] def handle_cookercfg(self, data): self.cookercfg = pickle.loads(data) diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py index 8605c46c..4e48c944 100644 --- a/lib/bb/runqueue.py +++ b/lib/bb/runqueue.py @@ -1289,6 +1289,16 @@ class RunQueue: self.worker = {} self.fakeworker = {} + @staticmethod + def send_pickled_data(worker, data, name): + msg = bytearray() + pickled_data = pickle.dumps(data) + msg.extend(b"<" + name.encode() + b">") + msg.extend(len(pickled_data).to_bytes(4, 'big')) + msg.extend(pickled_data) + msg.extend(b"</" + name.encode() + b">") + 
worker.stdin.write(msg) + def _start_worker(self, mc, fakeroot = False, rqexec = None): logger.debug("Starting bitbake-worker") magic = "decafbad" @@ -1328,9 +1338,9 @@ class RunQueue: "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), } - worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") - worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") - worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") + RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig") + RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata") + RunQueue.send_pickled_data(worker, workerdata, "workerdata") worker.stdin.flush() return RunQueueWorker(worker, workerpipe) @@ -1340,7 +1350,7 @@ class RunQueue: return logger.debug("Teardown for bitbake-worker") try: - worker.process.stdin.write(b"<quit></quit>") + RunQueue.send_pickled_data(worker.process, b"", "quit") worker.process.stdin.flush() worker.process.stdin.close() except IOError: @@ -1867,14 +1877,14 @@ class RunQueueExecute: def finish_now(self): for mc in self.rq.worker: try: - self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow") self.rq.worker[mc].process.stdin.flush() except IOError: # worker must have died? pass for mc in self.rq.fakeworker: try: - self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow") self.rq.fakeworker[mc].process.stdin.flush() except IOError: # worker must have died? 
@@ -2156,10 +2166,10 @@ class RunQueueExecute: if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: if not mc in self.rq.fakeworker: self.rq.start_fakeworker(self, mc) - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, (taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False), "runtask") self.rq.fakeworker[mc].process.stdin.flush() else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, (taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False), "runtask") self.rq.worker[mc].process.stdin.flush() self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) @@ -2243,10 +2253,10 @@ class RunQueueExecute: self.rq.state = runQueueFailed self.stats.taskFailed() return True - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, (taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce), "runtask") self.rq.fakeworker[mc].process.stdin.flush() else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, 
self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, (taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce), "runtask") self.rq.worker[mc].process.stdin.flush() self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) @@ -2448,9 +2458,9 @@ class RunQueueExecute: if changed: for mc in self.rq.worker: - self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") for mc in self.rq.fakeworker: - self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))