From patchwork Sun Feb 18 20:07:39 2024
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Joshua Watt
X-Patchwork-Id: 39655
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][PATCH 1/5] hashserv: sqlalchemy: Use _execute() helper
Date: Sun, 18 Feb 2024 13:07:39 -0700
Message-Id: <20240218200743.2982923-2-JPEWhacker@gmail.com>
X-Mailer: git-send-email 2.34.1
In-Reply-To: <20240218200743.2982923-1-JPEWhacker@gmail.com>
References: <20240218200743.2982923-1-JPEWhacker@gmail.com>
X-Webhook-Received: from
li982-79.members.linode.com [45.33.32.79] by aws-us-west-2-korg-lkml-1.web.codeaurora.org with HTTPS for ; Sun, 18 Feb 2024 20:08:04 -0000 X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15917 Use the _execute() helper to execute queries. This helper does the logging of the statement that was being done manually everywhere. Signed-off-by: Joshua Watt --- bitbake/lib/hashserv/sqlalchemy.py | 297 ++++++++++++++--------------- 1 file changed, 140 insertions(+), 157 deletions(-) diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 89a6b86d9d8..873547809a0 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -233,124 +233,113 @@ class Database(object): return row.value async def get_unihash_by_taskhash_full(self, method, taskhash): - statement = ( - select( - OuthashesV2, - UnihashesV3.unihash.label("unihash"), - ) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.taskhash == taskhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + OuthashesV2, + UnihashesV3.unihash.label("unihash"), + ) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.taskhash == taskhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_unihash_by_outhash(self, method, outhash): - statement = ( - select(OuthashesV2, UnihashesV3.unihash.label("unihash")) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select(OuthashesV2, UnihashesV3.unihash.label("unihash")) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_outhash(self, method, outhash): - statement = ( - select(OuthashesV2) - .where( - OuthashesV2.method == method, - OuthashesV2.outhash == outhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select(OuthashesV2) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_equivalent_for_outhash(self, method, outhash, taskhash): - statement = ( - select( - OuthashesV2.taskhash.label("taskhash"), - UnihashesV3.unihash.label("unihash"), - ) - .join( - UnihashesV3, - and_( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ), - ) - .where( - OuthashesV2.method == method, 
- OuthashesV2.outhash == outhash, - OuthashesV2.taskhash != taskhash, - ) - .order_by( - OuthashesV2.created.asc(), - ) - .limit(1) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + OuthashesV2.taskhash.label("taskhash"), + UnihashesV3.unihash.label("unihash"), + ) + .join( + UnihashesV3, + and_( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ), + ) + .where( + OuthashesV2.method == method, + OuthashesV2.outhash == outhash, + OuthashesV2.taskhash != taskhash, + ) + .order_by( + OuthashesV2.created.asc(), + ) + .limit(1) + ) return map_row(result.first()) async def get_equivalent(self, method, taskhash): - statement = select( - UnihashesV3.unihash, - UnihashesV3.method, - UnihashesV3.taskhash, - ).where( - UnihashesV3.method == method, - UnihashesV3.taskhash == taskhash, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + UnihashesV3.unihash, + UnihashesV3.method, + UnihashesV3.taskhash, + ).where( + UnihashesV3.method == method, + UnihashesV3.taskhash == taskhash, + ) + ) return map_row(result.first()) async def remove(self, condition): async def do_remove(table): where = _make_condition_statement(table, condition) if where: - statement = delete(table).where(*where) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute(delete(table).where(*where)) return result.rowcount return 0 @@ -417,21 +406,21 @@ class Database(object): return result.rowcount async def clean_unused(self, oldest): - statement = delete(OuthashesV2).where( - OuthashesV2.created < oldest, - ~( - select(UnihashesV3.id) - .where( - UnihashesV3.method == OuthashesV2.method, - UnihashesV3.taskhash == OuthashesV2.taskhash, - ) - .limit(1) - .exists() - ), - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + delete(OuthashesV2).where( + OuthashesV2.created < oldest, + ~( + select(UnihashesV3.id) + .where( + UnihashesV3.method == OuthashesV2.method, + UnihashesV3.taskhash == OuthashesV2.taskhash, + ) + .limit(1) + .exists() + ), + ) + ) return result.rowcount async def insert_unihash(self, method, taskhash, unihash): @@ -461,11 +450,9 @@ class Database(object): if "created" in data and not isinstance(data["created"], datetime): data["created"] = datetime.fromisoformat(data["created"]) - statement = insert(OuthashesV2).values(**data) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute(insert(OuthashesV2).values(**data)) return True except IntegrityError: self.logger.debug( @@ -474,16 +461,16 @@ class Database(object): return False async def _get_user(self, username): - statement = select( - Users.username, - Users.permissions, - Users.token, - ).where( - Users.username == username, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + Users.username, + Users.permissions, + Users.token, + ).where( + Users.username == username, + ) + ) return result.first() async def lookup_user_token(self, username): @@ -496,70 +483,66 @@ class Database(object): return map_user(await self._get_user(username)) async def set_user_token(self, 
username, token): - statement = ( - update(Users) - .where( - Users.username == username, - ) - .values( - token=token, - ) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + update(Users) + .where( + Users.username == username, + ) + .values( + token=token, + ) + ) return result.rowcount != 0 async def set_user_perms(self, username, permissions): - statement = ( - update(Users) - .where(Users.username == username) - .values(permissions=" ".join(permissions)) - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + update(Users) + .where(Users.username == username) + .values(permissions=" ".join(permissions)) + ) return result.rowcount != 0 async def get_all_users(self): - statement = select( - Users.username, - Users.permissions, - ) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + select( + Users.username, + Users.permissions, + ) + ) return [map_user(row) for row in result] async def new_user(self, username, permissions, token): - statement = insert(Users).values( - username=username, - permissions=" ".join(permissions), - token=token, - ) - self.logger.debug("%s", statement) try: async with self.db.begin(): - await self.db.execute(statement) + await self._execute( + insert(Users).values( + username=username, + permissions=" ".join(permissions), + token=token, + ) + ) return True except IntegrityError as e: self.logger.debug("Cannot create new user %s: %s", username, e) return False async def delete_user(self, username): - statement = delete(Users).where(Users.username == username) - self.logger.debug("%s", statement) async with self.db.begin(): - result = await self.db.execute(statement) + result = await self._execute( + delete(Users).where(Users.username == username) + ) return result.rowcount != 0 async def get_usage(self): usage = {} async with self.db.begin() as session: for name, table in Base.metadata.tables.items(): - statement = select(func.count()).select_from(table) - self.logger.debug("%s", statement) - result = await self.db.execute(statement) + result = await self._execute( + statement=select(func.count()).select_from(table) + ) usage[name] = { "rows": result.scalar(), } From patchwork Sun Feb 18 20:07:40 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39656 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from aws-us-west-2-korg-lkml-1.web.codeaurora.org (localhost.localdomain [127.0.0.1]) by smtp.lore.kernel.org (Postfix) with ESMTP id 0B917C54787 for ; Sun, 18 Feb 2024 20:08:05 +0000 (UTC) Received: from mail-io1-f42.google.com (mail-io1-f42.google.com [209.85.166.42]) by mx.groups.io with SMTP id smtpd.web10.25557.1708286877807275422 for ; Sun, 18 Feb 2024 12:07:57 -0800 Authentication-Results: mx.groups.io; dkim=pass header.i=@gmail.com header.s=20230601 header.b=Q2SXRix1; spf=pass (domain: gmail.com, ip: 209.85.166.42, mailfrom: jpewhacker@gmail.com) Received: by mail-io1-f42.google.com with SMTP id ca18e2360f4ac-7c74d3ab471so15190339f.2 for ; Sun, 18 Feb 2024 12:07:57 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20230601; t=1708286876; x=1708891676; darn=lists.openembedded.org; 
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][PATCH 2/5] hashserv: Add unihash-exists API
Date: Sun, 18 Feb 2024 13:07:40 -0700
Message-Id: <20240218200743.2982923-3-JPEWhacker@gmail.com>
X-Mailer: git-send-email 2.34.1
In-Reply-To: <20240218200743.2982923-1-JPEWhacker@gmail.com>
References: <20240218200743.2982923-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15918

Adds an API to check whether the server is aware of a given unihash.
This can be used as an optimization for sstate: a client can query the
hash equivalence server to check whether a unihash exists before
querying the sstate cache. If the hash server is not aware of a unihash,
there is very likely no matching sstate object, so this should
significantly cut down on the number of negative hits on the sstate
cache.
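For illustration, a minimal client-side sketch of the new call (the server
address and the example unihash below are placeholders, not values defined
by this patch; create_client() and close() are the existing hashserv client
APIs, unihash_exists() is the call added here):

    # Illustrative sketch only: address and unihash are placeholders.
    import hashserv

    client = hashserv.create_client("unix:///run/hashserv.sock")
    try:
        if client.unihash_exists("6662e699d6e3d894b24408ff9a4031ef9b038ee8"):
            print("unihash known to the server; worth querying the sstate cache")
        else:
            print("unihash unknown; skip the sstate cache lookup")
    finally:
        client.close()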
Signed-off-by: Joshua Watt --- bitbake/bin/bitbake-hashclient | 13 +++++++ bitbake/lib/hashserv/client.py | 44 ++++++++++++++++----- bitbake/lib/hashserv/server.py | 61 +++++++++++++++++++----------- bitbake/lib/hashserv/sqlalchemy.py | 11 ++++++ bitbake/lib/hashserv/sqlite.py | 16 ++++++++ bitbake/lib/hashserv/tests.py | 39 +++++++++++++++++++ 6 files changed, 151 insertions(+), 33 deletions(-) diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index f71b87404ae..47dd27cd3c2 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient @@ -217,6 +217,14 @@ def main(): print("Removed %d rows" % result["count"]) return 0 + def handle_unihash_exists(args, client): + result = client.unihash_exists(args.unihash) + if args.quiet: + return 0 if result else 1 + + print("true" if result else "false") + return 0 + parser = argparse.ArgumentParser(description='Hash Equivalence Client') parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') parser.add_argument('--log', default='WARNING', help='Set logging level') @@ -309,6 +317,11 @@ def main(): gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") gc_sweep_parser.set_defaults(func=handle_gc_sweep) + unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server") + unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not") + unihash_exists_parser.add_argument("unihash", help="Unihash to check") + unihash_exists_parser.set_defaults(func=handle_unihash_exists) + args = parser.parse_args() logger = logging.getLogger('hashserv') diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index e6dc4179126..daf1e128423 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client") class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 + MODE_EXIST_STREAM = 2 def __init__(self, username=None, password=None): super().__init__("OEHASHEQUIV", "1.1", logger) @@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient): await self.socket.send("END") return await self.socket.recv() - if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: + async def normal_to_stream(command): + r = await self.invoke({command: None}) + if r != "ok": + raise ConnectionError( + f"Unable to transition to stream mode: Bad response from server {r!r}" + ) + + self.logger.debug("Mode is now %s", command) + + if new_mode == self.mode: + return + + self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) + + # Always transition to normal mode before switching to any other mode + if self.mode != self.MODE_NORMAL: r = await self._send_wrapper(stream_to_normal) if r != "ok": self.check_invoke_error(r) - raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) - elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: - r = await self.invoke({"get-stream": None}) - if r != "ok": - raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) - elif new_mode != self.mode: - raise Exception( - "Undefined mode transition %r -> %r" % (self.mode, new_mode) - ) + raise ConnectionError( + f"Unable to transition to normal mode: Bad response from server {r!r}" + ) + self.logger.debug("Mode is now 
normal") + + if new_mode == self.MODE_GET_STREAM: + await normal_to_stream("get-stream") + elif new_mode == self.MODE_EXIST_STREAM: + await normal_to_stream("exists-stream") + elif new_mode != self.MODE_NORMAL: + raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") self.mode = new_mode @@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient): {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} ) + async def unihash_exists(self, unihash): + await self._set_mode(self.MODE_EXIST_STREAM) + r = await self.send_stream(unihash) + return r == "true" + async def get_outhash(self, method, outhash, taskhash, with_unihash=True): await self._set_mode(self.MODE_NORMAL) return await self.invoke( @@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client): "report_unihash", "report_unihash_equiv", "get_taskhash", + "unihash_exists", "get_outhash", "get_stats", "reset_stats", diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 5ed852d1f30..68f64f983b2 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py @@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): "get": self.handle_get, "get-outhash": self.handle_get_outhash, "get-stream": self.handle_get_stream, + "exists-stream": self.handle_exists_stream, "get-stats": self.handle_get_stats, "get-db-usage": self.handle_get_db_usage, "get-db-query-columns": self.handle_get_db_query_columns, @@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) await self.db.insert_outhash(data) - @permissions(READ_PERM) - async def handle_get_stream(self, request): + async def _stream_handler(self, handler): await self.socket.send_message("ok") while True: @@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): if l == "END": break - (method, taskhash) = l.split() - # self.logger.debug('Looking up %s %s' % (method, taskhash)) - row = await self.db.get_equivalent(method, taskhash) - - if row is not None: - msg = row["unihash"] - # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) - elif self.upstream_client is not None: - upstream = await self.upstream_client.get_unihash(method, taskhash) - if upstream: - msg = upstream - else: - msg = "" - else: - msg = "" - + msg = await handler(l) await self.socket.send(msg) finally: request_measure.end() self.request_sample.end() - # Post to the backfill queue after writing the result to minimize - # the turn around time on a request - if upstream is not None: - await self.server.backfill_queue.put((method, taskhash)) - await self.socket.send("ok") return self.NO_RESPONSE + @permissions(READ_PERM) + async def handle_get_stream(self, request): + async def handler(l): + (method, taskhash) = l.split() + # self.logger.debug('Looking up %s %s' % (method, taskhash)) + row = await self.db.get_equivalent(method, taskhash) + + if row is not None: + # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + return row["unihash"] + + if self.upstream_client is not None: + upstream = await self.upstream_client.get_unihash(method, taskhash) + if upstream: + await self.server.backfill_queue.put((method, taskhash)) + return upstream + + return "" + + return await self._stream_handler(handler) + + @permissions(READ_PERM) + async def handle_exists_stream(self, request): + async def handler(l): + if await self.db.unihash_exists(l): + return "true" + + 
if self.upstream_client is not None: + if await self.upstream_client.unihash_exists(l): + return "true" + + return "false" + + return await self._stream_handler(handler) + async def report_readonly(self, data): method = data["method"] outhash = data["outhash"] diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 873547809a0..0e28d738f5a 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py @@ -48,6 +48,7 @@ class UnihashesV3(Base): __table_args__ = ( UniqueConstraint("method", "taskhash"), Index("taskhash_lookup_v4", "method", "taskhash"), + Index("unihash_lookup_v1", "unihash"), ) @@ -279,6 +280,16 @@ class Database(object): ) return map_row(result.first()) + async def unihash_exists(self, unihash): + async with self.db.begin(): + result = await self._execute( + select(UnihashesV3) + .where(UnihashesV3.unihash == unihash) + .limit(1) + ) + + return result.first() is not None + async def get_outhash(self, method, outhash): async with self.db.begin(): result = await self._execute( diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index 608490730d7..da2e844a031 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py @@ -144,6 +144,9 @@ class DatabaseEngine(object): cursor.execute( "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)" + ) cursor.execute( "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" ) @@ -255,6 +258,19 @@ class Database(object): ) return cursor.fetchone() + async def unihash_exists(self, unihash): + with closing(self.db.cursor()) as cursor: + cursor.execute( + """ + SELECT * FROM unihashes_v3 WHERE unihash=:unihash + LIMIT 1 + """, + { + "unihash": unihash, + }, + ) + return cursor.fetchone() is not None + async def get_outhash(self, method, outhash): with closing(self.db.cursor()) as cursor: cursor.execute( diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index aeedab3575e..fbbe81512a0 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object): self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') self.assertEqual(result['method'], self.METHOD) + def test_unihash_exsits(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + self.assertTrue(self.client.unihash_exists(unihash)) + self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8')) + def test_ro_server(self): rw_server = self.start_server() rw_client = self.start_client(rw_server.address) @@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): def test_stress(self): self.run_hashclient(["--address", self.server_address, "stress"], check=True) + def test_unihash_exsits(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + ], check=True) + self.assertEqual(p.stdout.strip(), "true") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + ], check=True) + self.assertEqual(p.stdout.strip(), "false") + + def test_unihash_exsits_quiet(self): + taskhash, outhash, unihash = self.create_test_hash(self.client) + + p = 
self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", unihash, + "--quiet", + ]) + self.assertEqual(p.returncode, 0) + self.assertEqual(p.stdout.strip(), "") + + p = self.run_hashclient([ + "--address", self.server_address, + "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', + "--quiet", + ]) + self.assertEqual(p.returncode, 1) + self.assertEqual(p.stdout.strip(), "") + def test_remove_taskhash(self): taskhash, outhash, unihash = self.create_test_hash(self.client) self.run_hashclient([ From patchwork Sun Feb 18 20:07:41 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39654 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from aws-us-west-2-korg-lkml-1.web.codeaurora.org (localhost.localdomain [127.0.0.1]) by smtp.lore.kernel.org (Postfix) with ESMTP id 0D0B4C5478A for ; Sun, 18 Feb 2024 20:08:05 +0000 (UTC) Received: from mail-io1-f46.google.com (mail-io1-f46.google.com [209.85.166.46]) by mx.groups.io with SMTP id smtpd.web10.25559.1708286880306523128 for ; Sun, 18 Feb 2024 12:08:00 -0800 Authentication-Results: mx.groups.io; dkim=pass header.i=@gmail.com header.s=20230601 header.b=PikckazD; spf=pass (domain: gmail.com, ip: 209.85.166.46, mailfrom: jpewhacker@gmail.com) Received: by mail-io1-f46.google.com with SMTP id ca18e2360f4ac-7c72290deb6so109119939f.3 for ; Sun, 18 Feb 2024 12:08:00 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20230601; t=1708286878; x=1708891678; darn=lists.openembedded.org; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:from:to:cc:subject:date :message-id:reply-to; bh=M4D9le2KkjKbIj5/ZeXSLmEDIerOkrycNwUHeOgzjZA=; b=PikckazDYtQW64xt5AFnj5UfcIxf6zPPi2hSIRrWOvxjUr+0MDqBZuNTagg0Cm8uzV R75jBCWc+1N4aDgtZ8vSsMfEIcyqIQWg0/kbj2RUR7YSdO0vE3UAGFqw0aph8AMIcASk 4nGpyNB5S/1RwxfkHghtesUALFgR8IGiHDdf2Fdpy9TEGL3t2YhIg4Uti7ZxPaZa93cL sQ8ws7bgWuZD6mNs8gZaMAS+m2JFEr9Glp13VoBR4eHWGtry4mZSZM/kVGVMBQs4XBEU tRFzLlZopW4rrttlYrDvzRoz68oC/fjYLffOorw5RIHFathJ/Ccgp84sBHP2P74N0/q0 OsKA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20230601; t=1708286878; x=1708891678; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:x-gm-message-state:from:to:cc :subject:date:message-id:reply-to; bh=M4D9le2KkjKbIj5/ZeXSLmEDIerOkrycNwUHeOgzjZA=; b=C4AakR4lTWmVhqxsqvakARnbPclEyxZgEbuggawtPAaW6QgxPPaW9fTHF9ZA76L9c0 lpXRFSEIne49MzIo/889WfM0oOe9wgmjEnZisX6JOBy8ZiIDfjP18R9OcC764SGmoJh6 HgT1wHDd3xrYlz82eTMvKemoU/7x2fQaPqgub789pmnOPruHbvQW1SCtBdc6in0lpx6t 7/JbB9dF8jEA7sAiWoh+w6IKnOW2b59sWcQs/D00HX3/s58uYfxqeuI2M5FiFvXrp5DO facl2uukB4b/NhEKpAo/P0cDuiDLprSTnFNk8KMyQKwynW72fUoEi5bgjkDp73xAP8ge gF/A== X-Gm-Message-State: AOJu0Ywh2Z8l6dMwgCy4jJKT8DIDskS9/i7kJXULXA1aXQqpx2OTu6Vm Z5gwbTIPBVJ5fkowvjCzhafBL1vaLbfgceDgbIPH1/ph4UgzLkbDksFH3S4w X-Google-Smtp-Source: AGHT+IGz8xJsUy3D85cm3nNsFaXyXo6TBTIUSem8zh3V/lPn0TGo51zn3d09cTkQesAcnqRCVp5PFA== X-Received: by 2002:a6b:710d:0:b0:7c7:4daf:485a with SMTP id q13-20020a6b710d000000b007c74daf485amr1432614iog.11.1708286877987; Sun, 18 Feb 2024 12:07:57 -0800 (PST) Received: from localhost.localdomain ([2601:282:4300:19e0::44fb]) by smtp.gmail.com with ESMTPSA id u6-20020a02aa86000000b004741e1544b6sm549278jai.81.2024.02.18.12.07.56 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Sun, 18 Feb 
2024 12:07:56 -0800 (PST) From: Joshua Watt X-Google-Original-From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Joshua Watt Subject: [bitbake-devel][PATCH 3/5] asyncrpc: Add Client Pool object Date: Sun, 18 Feb 2024 13:07:41 -0700 Message-Id: <20240218200743.2982923-4-JPEWhacker@gmail.com> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20240218200743.2982923-1-JPEWhacker@gmail.com> References: <20240218200743.2982923-1-JPEWhacker@gmail.com> MIME-Version: 1.0 List-Id: X-Webhook-Received: from li982-79.members.linode.com [45.33.32.79] by aws-us-west-2-korg-lkml-1.web.codeaurora.org with HTTPS for ; Sun, 18 Feb 2024 20:08:05 -0000 X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15919 Adds an abstract base class that can be used to implement a pool of client connections. The class implements a thread that runs an async event loop, and allows derived classes to schedule work on the loop and wait for the work to be finished. Signed-off-by: Joshua Watt --- bitbake/lib/bb/asyncrpc/__init__.py | 2 +- bitbake/lib/bb/asyncrpc/client.py | 77 +++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index a4371643d74..639e1607f8e 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py @@ -5,7 +5,7 @@ # -from .client import AsyncClient, Client +from .client import AsyncClient, Client, ClientPool from .serv import AsyncServer, AsyncServerConnection from .connection import DEFAULT_MAX_CHUNK from .exceptions import ( diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 0d7cd85780d..a6228bb0ba0 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py @@ -10,6 +10,8 @@ import json import os import socket import sys +import contextlib +from threading import Thread from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK from .exceptions import ConnectionClosedError, InvokeError @@ -180,3 +182,78 @@ class Client(object): def __exit__(self, exc_type, exc_value, traceback): self.close() return False + + +class ClientPool(object): + def __init__(self, max_clients): + self.avail_clients = [] + self.num_clients = 0 + self.max_clients = max_clients + self.loop = None + self.client_condition = None + + @abc.abstractmethod + async def _new_client(self): + raise NotImplementedError("Must be implemented in derived class") + + def close(self): + if self.client_condition: + self.client_condition = None + + if self.loop: + self.loop.run_until_complete(self.__close_clients()) + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.close() + self.loop = None + + def run_tasks(self, tasks): + if not self.loop: + self.loop = asyncio.new_event_loop() + + thread = Thread(target=self.__thread_main, args=(tasks,)) + thread.start() + thread.join() + + @contextlib.asynccontextmanager + async def get_client(self): + async with self.client_condition: + if self.avail_clients: + client = self.avail_clients.pop() + elif self.num_clients < self.max_clients: + self.num_clients += 1 + client = await self._new_client() + else: + while not self.avail_clients: + await self.client_condition.wait() + client = self.avail_clients.pop() + + try: + yield client + finally: + async with self.client_condition: + self.avail_clients.append(client) + self.client_condition.notify() + + def __thread_main(self, tasks): + async def process_task(task): + async with 
self.get_client() as client: + await task(client) + + asyncio.set_event_loop(self.loop) + if not self.client_condition: + self.client_condition = asyncio.Condition() + tasks = [process_task(t) for t in tasks] + self.loop.run_until_complete(asyncio.gather(*tasks)) + + async def __close_clients(self): + for c in self.avail_clients: + await c.close() + self.avail_clients = [] + self.num_clients = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False From patchwork Sun Feb 18 20:07:42 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39652 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from aws-us-west-2-korg-lkml-1.web.codeaurora.org (localhost.localdomain [127.0.0.1]) by smtp.lore.kernel.org (Postfix) with ESMTP id CFE5CC48BF8 for ; Sun, 18 Feb 2024 20:08:04 +0000 (UTC) Received: from mail-io1-f41.google.com (mail-io1-f41.google.com [209.85.166.41]) by mx.groups.io with SMTP id smtpd.web11.25651.1708286881319476516 for ; Sun, 18 Feb 2024 12:08:01 -0800 Authentication-Results: mx.groups.io; dkim=pass header.i=@gmail.com header.s=20230601 header.b=hKA6zUtx; spf=pass (domain: gmail.com, ip: 209.85.166.41, mailfrom: jpewhacker@gmail.com) Received: by mail-io1-f41.google.com with SMTP id ca18e2360f4ac-7c3e01a7fe0so172941339f.2 for ; Sun, 18 Feb 2024 12:08:01 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20230601; t=1708286879; x=1708891679; darn=lists.openembedded.org; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:from:to:cc:subject:date :message-id:reply-to; bh=NjgoUuru/5mh1737D5tbdwzUctpUmZzvxjF8D9ZkMhc=; b=hKA6zUtxlfnujWP4AfNg4pthmrt/pwM/VRXYOwJ3SW9m/IPOZcwzXhKRplJWZDRvGc m7FkwnijWw61j3fjGPRvbDjkebay9DTwM2djfAvzEXaYiblNzmXP9Jina8hdMiP89qYW RK/QTIdMvfQkgeRjS8eRskp/630eFEppMf7ROS5jHVSI1u/BgC9NkSfiXP0PkM1nsVmB pfXUQDE8wJXmdtvV3HVp46YW4/I2Z4w10+CfJCrFOWw2F6JEmwARL373E1DHNOsnQhFS D2HLHKqMxi7oUVjbfQ0GJmJE0yCAk30musYh5uNFeIwc1/92QuHnn3HhgjbxQmEGbItY /dqg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20230601; t=1708286879; x=1708891679; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:x-gm-message-state:from:to:cc :subject:date:message-id:reply-to; bh=NjgoUuru/5mh1737D5tbdwzUctpUmZzvxjF8D9ZkMhc=; b=DOdLD0OrW8Aml9A9IHoRBo6AzK9VMJC6iS8vhkTKGJb6sFYpYyQa3H1Hn4/nXV0G1Q SYzKDpjaLrBrdwiOmtyPpL1Nj9Pk0CDgoX5MERpYbZnmIPpS7hLJgojCm4023sL/qRsO BJqVyHkyId8oDIqYpQe9v58SahSyRbfdEsJbpty8a6AdleHPxLQmfZ4ICSDLfjMePHST a845SABS/c0C7CQpmI94auPZPaodqfPjcUPk2/45Q56ftSNX39lK0krb9pE3Y6kXFX95 1qev3iOqRqMp2tznjSJViWpaD5EsPNtOeBXOvJV3EkmvOgzpyr4ip4ZgQ5Eh/A3BDLvJ gcbA== X-Gm-Message-State: AOJu0YwabN97EU1T/8zhhzkHd3ijE9tUZv53iMyctJEYnOM5IWDrIumK NoZWLWOoOwlTMg748hf1yQfpbCR4xHzFwNhrZ3mL4dUV4Mw7hsOIQCM3cH5x X-Google-Smtp-Source: AGHT+IHSM2UTNW4IoL3x7OlgKT+1M8VWRSFWUioXAxR750MGbOKOGV/pbTWxGRowyxAf19te/TD7Pw== X-Received: by 2002:a05:6602:809:b0:7c7:28dd:2b4e with SMTP id z9-20020a056602080900b007c728dd2b4emr7369969iow.13.1708286878883; Sun, 18 Feb 2024 12:07:58 -0800 (PST) Received: from localhost.localdomain ([2601:282:4300:19e0::44fb]) by smtp.gmail.com with ESMTPSA id u6-20020a02aa86000000b004741e1544b6sm549278jai.81.2024.02.18.12.07.58 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Sun, 18 Feb 
2024 12:07:58 -0800 (PST) From: Joshua Watt X-Google-Original-From: Joshua Watt To: bitbake-devel@lists.openembedded.org Cc: Joshua Watt Subject: [bitbake-devel][PATCH 4/5] hashserv: Add Client Pool Date: Sun, 18 Feb 2024 13:07:42 -0700 Message-Id: <20240218200743.2982923-5-JPEWhacker@gmail.com> X-Mailer: git-send-email 2.34.1 In-Reply-To: <20240218200743.2982923-1-JPEWhacker@gmail.com> References: <20240218200743.2982923-1-JPEWhacker@gmail.com> MIME-Version: 1.0 List-Id: X-Webhook-Received: from li982-79.members.linode.com [45.33.32.79] by aws-us-west-2-korg-lkml-1.web.codeaurora.org with HTTPS for ; Sun, 18 Feb 2024 20:08:04 -0000 X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15920 Implements a Client Pool derived from the AsyncRPC client pool that allows querying for multiple equivalent hashes in parallel Signed-off-by: Joshua Watt --- bitbake/lib/hashserv/client.py | 80 ++++++++++++++++++++++++++++++++ bitbake/lib/hashserv/tests.py | 83 ++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index daf1e128423..b269879ecfd 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -283,3 +283,83 @@ class Client(bb.asyncrpc.Client): def _get_async_client(self): return AsyncClient(self.username, self.password) + + +class ClientPool(bb.asyncrpc.ClientPool): + def __init__( + self, + address, + max_clients, + *, + username=None, + password=None, + become=None, + ): + super().__init__(max_clients) + self.address = address + self.username = username + self.password = password + self.become = become + + async def _new_client(self): + client = await create_async_client( + self.address, + username=self.username, + password=self.password, + ) + if self.become: + await client.become_user(self.become) + return client + + def _run_key_tasks(self, queries, call): + results = {key: None for key in queries.keys()} + + def make_task(key, args): + async def task(client): + nonlocal results + unihash = await call(client, args) + results[key] = unihash + + return task + + def gen_tasks(): + for key, args in queries.items(): + yield make_task(key, args) + + self.run_tasks(gen_tasks()) + return results + + def get_unihashes(self, queries): + """ + Query multiple unihashes in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a tuple of (method, taskhash). + + Returns a dictionary with a corresponding key for each input key, and + the value is the queried unihash (which might be none if the query + failed) + """ + + async def call(client, args): + method, taskhash = args + return await client.get_unihash(method, taskhash) + + return self._run_key_tasks(queries, call) + + def unihashes_exist(self, queries): + """ + Query multiple unihash existence checks in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a unihash. + + Returns a dictionary with a corresponding key for each input key, and + the value is True or False if the unihash is known by the server (or + None if there was a failure) + """ + + async def call(client, unihash): + return await client.unihash_exists(unihash) + + return self._run_key_tasks(queries, call) diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index fbbe81512a0..0809453cf87 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -8,6 +8,7 @@ from . 
import create_server, create_client from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS from bb.asyncrpc import InvokeError +from .client import ClientPool import hashlib import logging import multiprocessing @@ -554,6 +555,88 @@ class HashEquivalenceCommonTests(object): # shares a taskhash with Task 2 self.assertClientGetHash(self.client, taskhash2, unihash2) + + def test_client_pool_get_unihashes(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + + query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} + for idx, taskhash in enumerate(EXTRA_QUERIES): + query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) + + result = client_pool.get_unihashes(query) + + self.assertDictEqual(result, { + 0: "218e57509998197d570e2c98512d0105985dffc9", + 1: "218e57509998197d570e2c98512d0105985dffc9", + 2: "218e57509998197d570e2c98512d0105985dffc9", + 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", + 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", + 7: None, + }) + + def test_client_pool_unihash_exists(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. 
+ ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + result_unihashes = set() + + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + result_unihashes.add(result["unihash"]) + + query = {} + expected = {} + + for _, _, unihash in TEST_INPUT: + idx = len(query) + query[idx] = unihash + expected[idx] = unihash in result_unihashes + + + for unihash in EXTRA_QUERIES: + idx = len(query) + query[idx] = unihash + expected[idx] = False + + result = client_pool.unihashes_exist(query) + self.assertDictEqual(result, expected) + + def test_auth_read_perms(self): admin_client = self.start_auth_server() From patchwork Sun Feb 18 20:07:43 2024 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Joshua Watt X-Patchwork-Id: 39653 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from aws-us-west-2-korg-lkml-1.web.codeaurora.org (localhost.localdomain [127.0.0.1]) by smtp.lore.kernel.org (Postfix) with ESMTP id F3637C54766 for ; Sun, 18 Feb 2024 20:08:04 +0000 (UTC) Received: from mail-io1-f43.google.com (mail-io1-f43.google.com [209.85.166.43]) by mx.groups.io with SMTP id smtpd.web11.25654.1708286883375541698 for ; Sun, 18 Feb 2024 12:08:03 -0800 Authentication-Results: mx.groups.io; dkim=pass header.i=@gmail.com header.s=20230601 header.b=Yi3hQkdU; spf=pass (domain: gmail.com, ip: 209.85.166.43, mailfrom: jpewhacker@gmail.com) Received: by mail-io1-f43.google.com with SMTP id ca18e2360f4ac-7c457b8ef7cso142242939f.2 for ; Sun, 18 Feb 2024 12:08:03 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20230601; t=1708286881; x=1708891681; darn=lists.openembedded.org; h=content-transfer-encoding:mime-version:references:in-reply-to :message-id:date:subject:cc:to:from:from:to:cc:subject:date :message-id:reply-to; bh=IGm+CR07IVTaqD5fHQ5Z3Ej7Lcj09MmUmOV6eyrsNLo=; b=Yi3hQkdUS/d3GoECgKvrqznJ/9gPd5QgMmXaPNYJRWjQtq9+CFo3MRiRyKwc4JqeZA xqB66UxPnfWf2bZXEJbOD4E78mw1nPVX4FAcxj9Qu0RK5wjsj+MnzYCUER5z2dIxCqYa INbXMve9akX9pHHxueAlRr7a6H52OQ6ORRrx5uPJrMXC7bvQ0KNOSmUxe1Hh5xyN/5eA w+FpIj0xE4HHQtU07WQSl9GEzTR7NYkhiwPujcphjyy4lfTOpcXAmCYFb+G5I5oVW94P 0Zyd3pa59yKTCihTPAdAW0a0k6ClGkT8UVSjeIEaYNsB7BL4e270yhBVs1oKi4OVlPhH NLjg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20230601; t=1708286881; x=1708891681; 
From: Joshua Watt
To: bitbake-devel@lists.openembedded.org
Cc: Joshua Watt
Subject: [bitbake-devel][PATCH 5/5] siggen: Add parallel query API
Date: Sun, 18 Feb 2024 13:07:43 -0700
Message-Id: <20240218200743.2982923-6-JPEWhacker@gmail.com>
X-Mailer: git-send-email 2.34.1
In-Reply-To: <20240218200743.2982923-1-JPEWhacker@gmail.com>
References: <20240218200743.2982923-1-JPEWhacker@gmail.com>
X-Groupsio-URL: https://lists.openembedded.org/g/bitbake-devel/message/15921

Implements a new API called get_unihashes() that allows multiple
unihashes to be queried in parallel. The API is also reworked to make it
easier for derived classes to interface with it in a consistent manner.
Instead of overriding get_unihash() to add custom handling for local
hash calculation (e.g. caches), derived classes should now override
get_cached_unihash() and return the local unihash, or None if there
isn't one.
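As a rough sketch of the intended override pattern (the class name and the
"my_" attribute are hypothetical, not part of this patch; server, method and
max_parallel would still be configured the same way SignatureGeneratorTestEquivHash
does below):

    # Hypothetical sketch: a derived generator answers lookups from a local
    # store via the new get_cached_unihash() hook; only the misses reach the
    # hash equivalence server when get_unihashes() is called.
    import bb.siggen

    class MySigGen(bb.siggen.SignatureGeneratorUniHashMixIn,
                   bb.siggen.SignatureGeneratorBasicHash):
        name = "my_siggen"

        def get_cached_unihash(self, tid):
            local = getattr(self, "my_local_unihashes", {})  # hypothetical cache
            if tid in local:
                return local[tid]
            # Fall back to the mixin's setscene-cache handling; returning None
            # means get_unihashes() will ask the server for this tid.
            return super().get_cached_unihash(tid)

    # Callers can then resolve many task ids in one batched round:
    #   unihashes = siggen.get_unihashes(tids)   # -> {tid: unihash}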
Signed-off-by: Joshua Watt --- bitbake/lib/bb/siggen.py | 121 ++++++++++++++++++++++++++++----------- 1 file changed, 87 insertions(+), 34 deletions(-) diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py index 58854aee76c..e1a4fa2cdd1 100644 --- a/bitbake/lib/bb/siggen.py +++ b/bitbake/lib/bb/siggen.py @@ -102,9 +102,18 @@ class SignatureGenerator(object): if flag: self.datacaches[mc].stamp_extrainfo[mcfn][t] = flag + def get_cached_unihash(self, tid): + return None + def get_unihash(self, tid): + unihash = self.get_cached_unihash(tid) + if unihash: + return unihash return self.taskhash[tid] + def get_unihashes(self, tids): + return {tid: self.get_unihash(tid) for tid in tids} + def prep_taskhash(self, tid, deps, dataCaches): return @@ -524,28 +533,37 @@ class SignatureGeneratorUniHashMixIn(object): super().__init__(data) def get_taskdata(self): - return (self.server, self.method, self.extramethod) + super().get_taskdata() + return (self.server, self.method, self.extramethod, self.max_parallel) + super().get_taskdata() def set_taskdata(self, data): - self.server, self.method, self.extramethod = data[:3] - super().set_taskdata(data[3:]) + self.server, self.method, self.extramethod, self.max_parallel = data[:4] + super().set_taskdata(data[4:]) def client(self): if getattr(self, '_client', None) is None: self._client = hashserv.create_client(self.server) return self._client + def client_pool(self): + if getattr(self, '_client_pool', None) is None: + self._client_pool = hashserv.client.ClientPool(self.server, self.max_parallel) + return self._client_pool + def reset(self, data): - if getattr(self, '_client', None) is not None: - self._client.close() - self._client = None + self.__close_clients() return super().reset(data) def exit(self): + self.__close_clients() + return super().exit() + + def __close_clients(self): if getattr(self, '_client', None) is not None: self._client.close() self._client = None - return super().exit() + if getattr(self, '_client_pool', None) is not None: + self._client_pool.close() + self._client_pool = None def get_stampfile_hash(self, tid): if tid in self.taskhash: @@ -578,7 +596,7 @@ class SignatureGeneratorUniHashMixIn(object): return None return unihash - def get_unihash(self, tid): + def get_cached_unihash(self, tid): taskhash = self.taskhash[tid] # If its not a setscene task we can return @@ -593,40 +611,74 @@ class SignatureGeneratorUniHashMixIn(object): self.unihash[tid] = unihash return unihash - # In the absence of being able to discover a unique hash from the - # server, make it be equivalent to the taskhash. The unique "hash" only - # really needs to be a unique string (not even necessarily a hash), but - # making it match the taskhash has a few advantages: - # - # 1) All of the sstate code that assumes hashes can be the same - # 2) It provides maximal compatibility with builders that don't use - # an equivalency server - # 3) The value is easy for multiple independent builders to derive the - # same unique hash from the same input. This means that if the - # independent builders find the same taskhash, but it isn't reported - # to the server, there is a better chance that they will agree on - # the unique hash. 
- unihash = taskhash + return None - try: - method = self.method - if tid in self.extramethod: - method = method + self.extramethod[tid] - data = self.client().get_unihash(method, self.taskhash[tid]) - if data: - unihash = data + def _get_method(self, tid): + method = self.method + if tid in self.extramethod: + method = method + self.extramethod[tid] + + return method + + def get_unihash(self, tid): + return self.get_unihashes([tid])[tid] + + def get_unihashes(self, tids): + """ + For a iterable of tids, returns a dictionary that maps each tid to a + unihash + """ + result = {} + queries = {} + query_result = {} + + for tid in tids: + unihash = self.get_cached_unihash(tid) + if unihash: + result[tid] = unihash + else: + queries[tid] = (self._get_method(tid), self.taskhash[tid]) + + if len(queries) == 0: + return result + + if self.max_parallel <= 1 or len(queries) <= 1: + # No parallelism required. Make the query serially with the single client + for tid, args in queries.items(): + query_result[tid] = self.client().get_unihash(*args) + else: + query_result = self.client_pool().get_unihashes(queries) + + for tid, unihash in query_result.items(): + # In the absence of being able to discover a unique hash from the + # server, make it be equivalent to the taskhash. The unique "hash" only + # really needs to be a unique string (not even necessarily a hash), but + # making it match the taskhash has a few advantages: + # + # 1) All of the sstate code that assumes hashes can be the same + # 2) It provides maximal compatibility with builders that don't use + # an equivalency server + # 3) The value is easy for multiple independent builders to derive the + # same unique hash from the same input. This means that if the + # independent builders find the same taskhash, but it isn't reported + # to the server, there is a better chance that they will agree on + # the unique hash. + taskhash = self.taskhash[tid] + if unihash: # A unique hash equal to the taskhash is not very interesting, # so it is reported it at debug level 2. If they differ, that # is much more interesting, so it is reported at debug level 1 hashequiv_logger.bbdebug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server)) else: hashequiv_logger.debug2('No reported unihash for %s:%s from %s' % (tid, taskhash, self.server)) - except ConnectionError as e: - bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) + unihash = taskhash - self.set_unihash(tid, unihash) - self.unihash[tid] = unihash - return unihash + + self.set_unihash(tid, unihash) + self.unihash[tid] = unihash + result[tid] = unihash + + return result def report_unihash(self, path, task, d): import importlib @@ -754,6 +806,7 @@ class SignatureGeneratorTestEquivHash(SignatureGeneratorUniHashMixIn, SignatureG super().init_rundepcheck(data) self.server = data.getVar('BB_HASHSERVE') self.method = "sstate_output_hash" + self.max_parallel = 1 def clean_checksum_file_path(file_checksum_tuple): f, cs = file_checksum_tuple