From patchwork Thu Sep 21 07:56:58 2023
X-Patchwork-Submitter: Etienne Cordonnier <ecordonnier@snap.com>
X-Patchwork-Id: 30873
From: ecordonnier@snap.com
To: bitbake-devel@lists.openembedded.org
Cc: docs@lists.yoctoproject.org, Etienne Cordonnier <ecordonnier@snap.com>
Subject: [PATCH] bitbake-worker: add header with length of message
Date: Thu, 21 Sep 2023 09:56:58 +0200
Message-Id: <20230921075658.509846-1-ecordonnier@snap.com>
X-Mailer: git-send-email 2.36.1.vfs.0.0
X-Groupsio-URL: https://lists.yoctoproject.org/g/docs/message/4280

From: Etienne Cordonnier <ecordonnier@snap.com>

The IPC mechanism between runqueue.py and bitbake-worker is currently not
scalable: the data is sent in the format <tag>pickled-data</tag>, and
bitbake-worker has no information about the size of the message. Therefore,
the bitbake-worker calls select() and read() in a loop, and then calls
self.queue.find(b"</" + item + b">") for each chunk received.

This does not scale, because queue.find() has linear complexity relative to
the size of the queue, and workerdata messages get very big, e.g. for builds
which reference a lot of files in SRC_URI. The number of chunks varies, but
on my test system many chunks of 65536 bytes are sent, and each iteration
takes 0.1 seconds, making the transfer of the "workerdata" data very slow
(on my test setup 35 seconds before this fix, and 1.5 seconds after it).

This commit adds a 4-byte header after the opening <tag>, so that
bitbake-worker knows how many bytes need to be received and no longer has to
search the whole queue for </tag> on every chunk.

Signed-off-by: Etienne Cordonnier <ecordonnier@snap.com>
---
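For reference, the framing introduced here can be sketched in isolation as
follows. The helper names frame_message() and parse_message() are invented
for this illustration only (the real sender is RunQueue.send_pickled_data()
and the real receiver is BitbakeWorker.handle_item()); the sketch only shows
the wire format: opening <name> tag, 4-byte big-endian payload length,
pickled payload, closing </name> tag.

import pickle

def frame_message(name, data):
    # Frame `data` the way the patch does: opening tag, 4-byte big-endian
    # payload length, pickled payload, closing tag.
    payload = pickle.dumps(data)
    msg = bytearray()
    msg.extend(b"<" + name.encode() + b">")
    msg.extend(len(payload).to_bytes(4, 'big'))
    msg.extend(payload)
    msg.extend(b"</" + name.encode() + b">")
    return bytes(msg)

def parse_message(buf, name):
    # Return (obj, remaining) if a complete <name> message sits at the start
    # of `buf`, else (None, buf). Completeness is decided from the 4-byte
    # header, so no scan of the whole buffer is needed.
    opening = b"<" + name.encode() + b">"
    closing = b"</" + name.encode() + b">"
    if not buf.startswith(opening):
        return None, buf
    if len(buf) < len(opening) + 4:
        return None, buf                      # header not fully received yet
    payload_len = int.from_bytes(buf[len(opening):len(opening) + 4], 'big')
    total = len(opening) + 4 + payload_len + len(closing)
    if len(buf) < total:
        return None, buf                      # payload not fully received yet
    payload = buf[len(opening) + 4:len(opening) + 4 + payload_len]
    return pickle.loads(payload), buf[total:]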
 bin/bitbake-worker | 34 +++++++++++++++++++++++-----------
 lib/bb/runqueue.py | 34 ++++++++++++++++++++++------------
 2 files changed, 45 insertions(+), 23 deletions(-)

diff --git a/bin/bitbake-worker b/bin/bitbake-worker
index 451e6926..a4e78991 100755
--- a/bin/bitbake-worker
+++ b/bin/bitbake-worker
@@ -433,18 +433,30 @@ class BitbakeWorker(object):
                 while self.process_waitpid():
                     continue

-
     def handle_item(self, item, func):
-        if self.queue.startswith(b"<" + item + b">"):
-            index = self.queue.find(b"</" + item + b">")
-            while index != -1:
-                try:
-                    func(self.queue[(len(item) + 2):index])
-                except pickle.UnpicklingError:
-                    workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
-                    raise
-                self.queue = self.queue[(index + len(item) + 3):]
-                index = self.queue.find(b"</" + item + b">")
+        opening_tag = b"<" + item + b">"
+        if not self.queue.startswith(opening_tag):
+            return
+
+        tag_len = len(opening_tag)
+        if len(self.queue) < tag_len + 4:
+            # we need to receive more data
+            return
+        header = self.queue[tag_len:tag_len + 4]
+        payload_len = int.from_bytes(header, 'big')
+        # closing tag has length (tag_len + 1)
+        if len(self.queue) < tag_len * 2 + 1 + payload_len:
+            # we need to receive more data
+            return
+
+        index = self.queue.find(b"</" + item + b">")
+        if index != -1:
+            try:
+                func(self.queue[(tag_len + 4):index])
+            except pickle.UnpicklingError:
+                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
+                raise
+            self.queue = self.queue[(index + len(b"</" + item + b">")):]

     def handle_cookercfg(self, data):
         self.cookercfg = pickle.loads(data)
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py
index c88d7129..9afb899c 100644
--- a/lib/bb/runqueue.py
+++ b/lib/bb/runqueue.py
@@ -1318,6 +1318,16 @@ class RunQueue:
         self.worker = {}
         self.fakeworker = {}

+    @staticmethod
+    def send_pickled_data(worker, data, name):
+        msg = bytearray()
+        msg.extend(b"<" + name.encode() + b">")
+        pickled_data = pickle.dumps(data)
+        msg.extend(len(pickled_data).to_bytes(4, 'big'))
+        msg.extend(pickled_data)
+        msg.extend(b"</" + name.encode() + b">")
+        worker.stdin.write(msg)
+
     def _start_worker(self, mc, fakeroot = False, rqexec = None):
         logger.debug("Starting bitbake-worker")
         magic = "decafbad"
@@ -1353,9 +1363,9 @@ class RunQueue:
             "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
         }

-        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
-        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
-        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
+        RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
+        RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
+        RunQueue.send_pickled_data(worker, workerdata, "workerdata")
         worker.stdin.flush()

         return RunQueueWorker(worker, workerpipe)
@@ -1365,7 +1375,7 @@ class RunQueue:
             return
         logger.debug("Teardown for bitbake-worker")
         try:
-            worker.process.stdin.write(b"<quit></quit>")
+            RunQueue.send_pickled_data(worker.process, b"", "quit")
             worker.process.stdin.flush()
             worker.process.stdin.close()
         except IOError:
@@ -1892,14 +1902,14 @@ class RunQueueExecute:
     def finish_now(self):
         for mc in self.rq.worker:
             try:
-                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
                 self.rq.worker[mc].process.stdin.flush()
             except IOError:
                 # worker must have died?
                 pass
         for mc in self.rq.fakeworker:
             try:
-                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
                 self.rq.fakeworker[mc].process.stdin.flush()
             except IOError:
                 # worker must have died?
@@ -2194,10 +2204,10 @@ class RunQueueExecute:
             if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                 if not mc in self.rq.fakeworker:
                     self.rq.start_fakeworker(self, mc)
-                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
                 self.rq.fakeworker[mc].process.stdin.flush()
             else:
-                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
                 self.rq.worker[mc].process.stdin.flush()

             self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2295,10 +2305,10 @@ class RunQueueExecute:
                         self.rq.state = runQueueFailed
                         self.stats.taskFailed()
                         return True
-                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
                 self.rq.fakeworker[mc].process.stdin.flush()
             else:
-                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
                 self.rq.worker[mc].process.stdin.flush()

             self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2500,9 +2510,9 @@ class RunQueueExecute:

         if changed:
             for mc in self.rq.worker:
-                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+                RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
             for mc in self.rq.fakeworker:
-                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")

             hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
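As a rough usage example of the frame_message()/parse_message() sketch above
(the workerdata-like payload and the 65536-byte chunk size are arbitrary for
the illustration), the receive side can decide from the header whether a
message is complete, without scanning the whole queue on every chunk:

msg = frame_message("workerdata", {"sigdata": list(range(100000))})

queue = bytearray()
for offset in range(0, len(msg), 65536):
    queue.extend(msg[offset:offset + 65536])   # one read() worth of data
    # The 4-byte header makes "is the message complete?" a constant-time
    # check, instead of a find() scan over the whole queue per chunk.
    data, rest = parse_message(bytes(queue), "workerdata")
    if data is not None:
        print("received %d entries" % len(data["sigdata"]))
        queue = bytearray(rest)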